|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the test runner script for the 'unittest' framework. |
|
8 """ |
|
9 |
|
10 import json |
|
11 import os |
|
12 import sys |
|
13 import time |
|
14 import unittest |
|
15 |
|
# Make the directory two levels above this script importable; the runner
# imports project packages (e.g. EricNetwork) from there later on.
sys.path.insert(2, os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "..")
))
|
20 |
|
21 |
|
22 class EricTestResult(unittest.TestResult): |
|
23 """ |
|
24 Class implementing a TestResult derivative to send the data via a network |
|
25 connection. |
|
26 """ |
|
27 def __init__(self, writer, failfast): |
|
28 """ |
|
29 Constructor |
|
30 |
|
31 @param writer reference to the object to write the results to |
|
32 @type EricJsonWriter |
|
33 @param failfast flag indicating to stop at the first error |
|
34 @type bool |
|
35 """ |
|
36 super().__init__() |
|
37 self.__writer = writer |
|
38 self.failfast = failfast |
|
39 self.__testsRun = 0 |
|
40 |
|
41 self.__currentTestStatus = {} |
|
42 |
|
43 def addFailure(self, test, err): |
|
44 """ |
|
45 Public method called if a test failed. |
|
46 |
|
47 @param test reference to the test object |
|
48 @type TestCase |
|
49 @param err tuple containing the exception data like sys.exc_info |
|
50 (exception type, exception instance, traceback) |
|
51 @type tuple |
|
52 """ |
|
53 super().addFailure(test, err) |
|
54 tracebackLines = self._exc_info_to_string(err, test) |
|
55 |
|
56 self.__currentTestStatus.update({ |
|
57 "status": "failure", |
|
58 "traceback": tracebackLines, |
|
59 }) |
|
60 |
|
61 def addError(self, test, err): |
|
62 """ |
|
63 Public method called if a test errored. |
|
64 |
|
65 @param test reference to the test object |
|
66 @type TestCase |
|
67 @param err tuple containing the exception data like sys.exc_info |
|
68 (exception type, exception instance, traceback) |
|
69 @type tuple |
|
70 """ |
|
71 super().addError(test, err) |
|
72 tracebackLines = self._exc_info_to_string(err, test) |
|
73 |
|
74 self.__currentTestStatus.update({ |
|
75 "status": "error", |
|
76 "traceback": tracebackLines, |
|
77 }) |
|
78 |
|
79 def addSkip(self, test, reason): |
|
80 """ |
|
81 Public method called if a test was skipped. |
|
82 |
|
83 @param test reference to the test object |
|
84 @type TestCase |
|
85 @param reason reason for skipping the test |
|
86 @type str |
|
87 """ |
|
88 super().addSkip(test, reason) |
|
89 |
|
90 self.__currentTestStatus.update({ |
|
91 "status": "skipped", |
|
92 "shortmsg": reason, |
|
93 }) |
|
94 |
|
95 def addExpectedFailure(self, test, err): |
|
96 """ |
|
97 Public method called if a test failed expected. |
|
98 |
|
99 @param test reference to the test object |
|
100 @type TestCase |
|
101 @param err tuple containing the exception data like sys.exc_info |
|
102 (exception type, exception instance, traceback) |
|
103 @type tuple |
|
104 """ |
|
105 super().addExpectedFailure(test, err) |
|
106 tracebackLines = self._exc_info_to_string(err, test) |
|
107 |
|
108 self.__currentTestStatus.update({ |
|
109 "status": "expected failure", |
|
110 "traceback": tracebackLines, |
|
111 }) |
|
112 |
|
113 def addUnexpectedSuccess(self, test): |
|
114 """ |
|
115 Public method called if a test succeeded expectedly. |
|
116 |
|
117 @param test reference to the test object |
|
118 @type TestCase |
|
119 """ |
|
120 super().addUnexpectedSuccess(test) |
|
121 |
|
122 self.__currentTestStatus["status"] = "unexpected success" |
|
123 |
|
124 def addSubTest(self, test, subtest, err): |
|
125 """ |
|
126 Public method called for each subtest to record its result. |
|
127 |
|
128 @param test reference to the test object |
|
129 @type TestCase |
|
130 @param subtest reference to the subtest object |
|
131 @type TestCase |
|
132 @param err tuple containing the exception data like sys.exc_info |
|
133 (exception type, exception instance, traceback) |
|
134 @type tuple |
|
135 """ |
|
136 if err is not None: |
|
137 super().addSubTest(test, subtest, err) |
|
138 tracebackLines = self._exc_info_to_string(err, test) |
|
139 status = ( |
|
140 "failure" |
|
141 if issubclass(err[0], test.failureException) else |
|
142 "error" |
|
143 ) |
|
144 |
|
145 # record the last subtest fail status as the overall status |
|
146 self.__currentTestStatus["status"] = status |
|
147 |
|
148 self.__writer.write({ |
|
149 "event": "result", |
|
150 "status": status, |
|
151 "name": str(subtest), |
|
152 "id": subtest.id(), |
|
153 "description": subtest.shortDescription(), |
|
154 "traceback": tracebackLines, |
|
155 "subtest": True, |
|
156 }) |
|
157 |
|
158 if self.failfast: |
|
159 self.stop() |
|
160 else: |
|
161 self.__writer.write({ |
|
162 "event": "result", |
|
163 "status": "success", |
|
164 "name": str(subtest), |
|
165 "id": subtest.id(), |
|
166 "description": subtest.shortDescription(), |
|
167 "subtest": True, |
|
168 }) |
|
169 |
|
170 def startTest(self, test): |
|
171 """ |
|
172 Public method called at the start of a test. |
|
173 |
|
174 @param test reference to the test object |
|
175 @type TestCase |
|
176 """ |
|
177 super().startTest(test) |
|
178 |
|
179 self.__testsRun += 1 |
|
180 self.__currentTestStatus = { |
|
181 "event": "result", |
|
182 "status": "success", |
|
183 "name": str(test), |
|
184 "id": test.id(), |
|
185 "description": test.shortDescription(), |
|
186 "subtest": False, |
|
187 } |
|
188 |
|
189 self.__writer.write({ |
|
190 "event": "started", |
|
191 "name": str(test), |
|
192 "id": test.id(), |
|
193 "description": test.shortDescription(), |
|
194 }) |
|
195 |
|
196 self.__startTime = time.monotonic_ns() |
|
197 |
|
198 def stopTest(self, test): |
|
199 """ |
|
200 Public method called at the end of a test. |
|
201 |
|
202 @param test reference to the test object |
|
203 @type TestCase |
|
204 """ |
|
205 stopTime = time.monotonic_ns() |
|
206 duration = (stopTime - self.__startTime) / 1_000_000 # ms |
|
207 |
|
208 super().stopTest(test) |
|
209 |
|
210 self.__currentTestStatus["duration_ms"] = duration |
|
211 self.__writer.write(self.__currentTestStatus) |
|
212 |
|
213 def startTestRun(self): |
|
214 """ |
|
215 Public method called once before any tests are executed. |
|
216 """ |
|
217 self.__totalStartTime = time.monotonic_ns() |
|
218 self.__testsRun = 0 |
|
219 |
|
220 def stopTestRun(self): |
|
221 """ |
|
222 Public method called once after all tests are executed. |
|
223 """ |
|
224 stopTime = time.monotonic_ns() |
|
225 duration = (stopTime - self.__totalStartTime) / 1_000_000_000 # s |
|
226 |
|
227 self.__writer.write({ |
|
228 "event": "finished", |
|
229 "duration_s": duration, |
|
230 "tests": self.__testsRun, |
|
231 }) |
|
232 |
|
233 |
|
234 def _assembleTestCasesList(suite): |
|
235 """ |
|
236 Protected function to assemble a list of test cases included in a test |
|
237 suite. |
|
238 |
|
239 @param suite test suite to be inspected |
|
240 @type unittest.TestSuite |
|
241 @return list of tuples containing the test case ID, the string |
|
242 representation and the short description |
|
243 @rtype list of tuples of (str, str) |
|
244 """ |
|
245 testCases = [] |
|
246 for test in suite: |
|
247 if isinstance(test, unittest.TestSuite): |
|
248 testCases.extend(_assembleTestCasesList(test)) |
|
249 else: |
|
250 testId = test.id() |
|
251 if ( |
|
252 "ModuleImportFailure" not in testId and |
|
253 "LoadTestsFailure" not in testId and |
|
254 "_FailedTest" not in testId |
|
255 ): |
|
256 testCases.append( |
|
257 (testId, str(test), test.shortDescription()) |
|
258 ) |
|
259 return testCases |
|
260 |
|
261 |
|
def runtest(argv):
    """
    Function to run the tests.

    The first two entries of argv are handed to EricJsonWriter (the second
    one converted to int; presumably host and port of the result receiver).
    The remaining entries are processed in this order:
    <ul>
    <li>optional "discover", optionally followed by "--start-directory"
        and the directory</li>
    <li>the switches "--failfast", "--cover", "--cover-erase" and
        "--cover-file" plus file name (anywhere in the list)</li>
    <li>either "--failed-only" followed by the test file name (not in
        discovery mode) and the IDs of the tests to be rerun, or the test
        file name and test name (not in discovery mode) followed by the
        names of the test cases to be loaded</li>
    </ul>

    The function does not return; it terminates the process with exit code
    0 after the test run or 1 if the tests could not be collected.

    @param argv list of command line parameters.
    @type list of str
    """
    from EricNetwork.EricJsonStreamWriter import EricJsonWriter
    writer = EricJsonWriter(argv[0], int(argv[1]))
    del argv[:2]

    # process arguments
    discover = argv[:1] == ["discover"]
    # Always bound: "discover" may be given without "--start-directory".
    discoveryStart = ""
    if discover:
        argv.pop(0)
        if argv[:1] == ["--start-directory"]:
            discoveryStart = argv[1]
            del argv[:2]

    failfast = "--failfast" in argv
    if failfast:
        argv.remove("--failfast")

    collectCoverage = "--cover" in argv
    if collectCoverage:
        argv.remove("--cover")
    coverageErase = "--cover-erase" in argv
    if coverageErase:
        argv.remove("--cover-erase")
    if "--cover-file" in argv:
        index = argv.index("--cover-file")
        covDataFile = argv[index + 1]
        del argv[index:index + 2]
    else:
        covDataFile = ""

    # Bound on every path; it is only meaningful if no rerun of failed
    # tests was requested.
    testName = ""
    if argv and argv[0] == "--failed-only":
        if discover:
            testFileName = ""
            failed = argv[1:]
        else:
            testFileName = argv[1]
            failed = argv[2:]
    else:
        failed = []
        if discover:
            testFileName = ""
        else:
            testFileName, testName = argv[:2]
            del argv[:2]

    testCases = argv[:]

    # make the tests importable
    if testFileName:
        sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName)))
    elif discoveryStart:
        sys.path.insert(1, os.path.abspath(discoveryStart))

    # setup test coverage
    if collectCoverage:
        if not covDataFile:
            # derive a data file name from what is being tested
            if discover:
                covname = os.path.join(discoveryStart, "test")
            elif testFileName:
                covname = os.path.splitext(
                    os.path.abspath(testFileName))[0]
            else:
                covname = "test"
            covDataFile = "{0}.coverage".format(covname)
        if not os.path.isabs(covDataFile):
            covDataFile = os.path.abspath(covDataFile)

        sys.path.insert(
            2,
            os.path.abspath(os.path.join(
                os.path.dirname(__file__), "..", "..", "DebugClients", "Python"
            ))
        )
        from DebugClients.Python.coverage import Coverage
        cover = Coverage(data_file=covDataFile)
        if coverageErase:
            cover.erase()
        cover.start()
    else:
        cover = None

    # collect the tests
    try:
        testLoader = unittest.TestLoader()
        if discover and not failed:
            if testCases:
                test = testLoader.loadTestsFromNames(testCases)
            else:
                test = testLoader.discover(discoveryStart)
        else:
            if testFileName:
                module = __import__(os.path.splitext(
                    os.path.basename(testFileName))[0])
            else:
                module = None
            if failed:
                if module:
                    # the names are resolved relative to the module, so
                    # strip the leading module part of each test ID
                    failed = [t.split(".", 1)[1]
                              for t in failed]
                test = testLoader.loadTestsFromNames(
                    failed, module)
            else:
                test = testLoader.loadTestsFromName(
                    testName, module)
    except Exception as err:
        print("Exception:", str(err))
        writer.write({
            "event": "collecterror",
            "error": str(err),
        })
        # release the connection before terminating
        writer.close()
        sys.exit(1)

    collectedTests = {
        "event": "collected",
        "tests": [
            {"id": testId, "name": name, "description": desc}
            for testId, name, desc in _assembleTestCasesList(test)
        ]
    }
    writer.write(collectedTests)

    # run the tests
    testResult = EricTestResult(writer, failfast)
    testResult.startTestRun()
    try:
        test.run(testResult)
    finally:
        if cover:
            cover.stop()
            cover.save()
            writer.write({
                "event": "coverage",
                "file": covDataFile,
            })
        testResult.stopTestRun()

    writer.close()
    sys.exit(0)
|
410 |
|
if __name__ == '__main__':
    # The first command line argument selects the action; anything that is
    # missing or unknown ends in exit code 42.
    command = sys.argv[1] if len(sys.argv) > 1 else None

    if command == "installed":
        # 'unittest' is part of the standard library
        sys.exit(0)

    if command == "versions":
        import platform
        print(json.dumps({
            "name": "unittest",
            "version": platform.python_version(),
            "plugins": [],
        }))
        sys.exit(0)

    if command == "runtest":
        runtest(sys.argv[2:])
        sys.exit(0)

    sys.exit(42)
|
432 |
|
433 # |
|
434 # eflag: noqa = M801 |