|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the test runner script for the 'unittest' framework. |
|
8 """ |
|
9 |
|
10 import json |
|
11 import os |
|
12 import sys |
|
13 import time |
|
14 import unittest |
|
15 |
|
# Make the directory two levels above this script importable (presumably the
# project root holding the packages imported inside runtest() - verify).
sys.path.insert(
    2,
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)),
)
|
17 |
|
18 |
|
class EricTestResult(unittest.TestResult):
    """
    Class implementing a TestResult derivative to send the data via a network
    connection.

    Every event is handed to the writer object as a dictionary carrying an
    "event" key ("started", "result" or "finished").
    """

    def __init__(self, writer, failfast):
        """
        Constructor

        @param writer reference to the object to write the results to
        @type EricJsonWriter
        @param failfast flag indicating to stop at the first error
        @type bool
        """
        super().__init__()
        self.__writer = writer
        self.failfast = failfast
        self.__testsRun = 0

        self.__currentTestStatus = {}

        # Initialise both timers here, so that stopTest() and stopTestRun()
        # never access an undefined attribute when they are called without
        # their start counterpart.
        now = time.monotonic_ns()
        self.__startTime = now
        self.__totalStartTime = now

    def addFailure(self, test, err):
        """
        Public method called if a test failed.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__currentTestStatus.update(
            {
                "status": "failure",
                "traceback": tracebackLines,
            }
        )

    def addError(self, test, err):
        """
        Public method called if a test errored.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addError(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__currentTestStatus.update(
            {
                "status": "error",
                "traceback": tracebackLines,
            }
        )

    def addSkip(self, test, reason):
        """
        Public method called if a test was skipped.

        @param test reference to the test object
        @type TestCase
        @param reason reason for skipping the test
        @type str
        """
        super().addSkip(test, reason)

        self.__currentTestStatus.update(
            {
                "status": "skipped",
                "shortmsg": reason,
            }
        )

    def addExpectedFailure(self, test, err):
        """
        Public method called if a test failed expected.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addExpectedFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__currentTestStatus.update(
            {
                "status": "expected failure",
                "traceback": tracebackLines,
            }
        )

    def addUnexpectedSuccess(self, test):
        """
        Public method called if a test succeeded unexpectedly.

        @param test reference to the test object
        @type TestCase
        """
        super().addUnexpectedSuccess(test)

        self.__currentTestStatus["status"] = "unexpected success"

    def addSubTest(self, test, subtest, err):
        """
        Public method called for each subtest to record its result.

        The subtest result is written immediately as a separate "result"
        event flagged with "subtest": True.

        @param test reference to the test object
        @type TestCase
        @param subtest reference to the subtest object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback) or None, if the
            subtest succeeded
        @type tuple
        """
        if err is not None:
            super().addSubTest(test, subtest, err)
            tracebackLines = self._exc_info_to_string(err, test)
            status = "failure" if issubclass(err[0], test.failureException) else "error"

            # record the last subtest fail status as the overall status
            self.__currentTestStatus["status"] = status

            self.__writer.write(
                {
                    "event": "result",
                    "status": status,
                    "name": str(subtest),
                    "id": subtest.id(),
                    "description": subtest.shortDescription(),
                    "traceback": tracebackLines,
                    "subtest": True,
                }
            )

            if self.failfast:
                self.stop()
        else:
            self.__writer.write(
                {
                    "event": "result",
                    "status": "success",
                    "name": str(subtest),
                    "id": subtest.id(),
                    "description": subtest.shortDescription(),
                    "subtest": True,
                }
            )

    def startTest(self, test):
        """
        Public method called at the start of a test.

        @param test reference to the test object
        @type TestCase
        """
        super().startTest(test)

        self.__testsRun += 1
        # Assume success; the add*() callbacks overwrite the status before
        # stopTest() sends this record.
        self.__currentTestStatus = {
            "event": "result",
            "status": "success",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
            "subtest": False,
        }

        self.__writer.write(
            {
                "event": "started",
                "name": str(test),
                "id": test.id(),
                "description": test.shortDescription(),
            }
        )

        # take the time stamp last to keep the reporting out of the duration
        self.__startTime = time.monotonic_ns()

    def stopTest(self, test):
        """
        Public method called at the end of a test.

        @param test reference to the test object
        @type TestCase
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__startTime) / 1_000_000  # ms

        super().stopTest(test)

        self.__currentTestStatus["duration_ms"] = duration
        self.__writer.write(self.__currentTestStatus)

    def startTestRun(self):
        """
        Public method called once before any tests are executed.
        """
        self.__totalStartTime = time.monotonic_ns()
        self.__testsRun = 0

    def stopTestRun(self):
        """
        Public method called once after all tests are executed.
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__totalStartTime) / 1_000_000_000  # s

        self.__writer.write(
            {
                "event": "finished",
                "duration_s": duration,
                "tests": self.__testsRun,
            }
        )
|
242 |
|
243 |
|
def _assembleTestCasesList(suite):
    """
    Protected function to flatten a (possibly nested) test suite into a list
    of its test cases.

    Entries whose ID contains one of the unittest loader failure markers are
    left out.

    @param suite test suite to be inspected
    @type unittest.TestSuite
    @return list of tuples containing the test case ID, the string
        representation and the short description
    @rtype list of tuples of (str, str, str)
    """
    ignoredMarkers = ("ModuleImportFailure", "LoadTestsFailure", "_FailedTest")

    collected = []
    for entry in suite:
        # nested suites are resolved recursively, keeping the original order
        if isinstance(entry, unittest.TestSuite):
            collected += _assembleTestCasesList(entry)
            continue

        identifier = entry.id()
        if any(marker in identifier for marker in ignoredMarkers):
            continue

        collected.append((identifier, str(entry), entry.shortDescription()))

    return collected
|
268 |
|
269 |
|
def runtest(argv):
    """
    Function to run the tests.

    The parameter list is consumed in this order:
    <ul>
    <li>two values handed to the EricJsonWriter constructor, the second one
        converted to int (presumably host and port - verify against
        EricJsonWriter)</li>
    <li>optionally "discover", optionally followed by
        "--start-directory" and the directory to start discovery at</li>
    <li>the optional flags "--failfast", "--cover", "--cover-erase" and
        "--cover-file" followed by the coverage data file name</li>
    <li>either "--failed-only" followed by the test file name (not in
        discovery mode) and the IDs of the tests to be rerun, or the test
        file name and test name (not in discovery mode) followed by the
        names of the test cases to be loaded</li>
    </ul>

    This function does not return; it terminates the process via sys.exit()
    with code 0 after the test run or code 1, if collecting the tests failed.

    @param argv list of command line parameters.
    @type list of str
    """
    from EricNetwork.EricJsonStreamWriter import EricJsonWriter

    writer = EricJsonWriter(argv[0], int(argv[1]))
    del argv[:2]

    # process arguments
    # The start directory is optional; default to the empty string, so the
    # name is always bound (unittest resolves it to the current directory).
    discoveryStart = ""
    if argv[0] == "discover":
        discover = True
        argv.pop(0)
        if argv and argv[0] == "--start-directory":
            discoveryStart = argv[1]
            del argv[:2]
    else:
        discover = False

    failfast = "--failfast" in argv
    if failfast:
        argv.remove("--failfast")

    collectCoverage = "--cover" in argv
    if collectCoverage:
        argv.remove("--cover")
    coverageErase = "--cover-erase" in argv
    if coverageErase:
        argv.remove("--cover-erase")
    if "--cover-file" in argv:
        index = argv.index("--cover-file")
        covDataFile = argv[index + 1]
        del argv[index : index + 2]
    else:
        covDataFile = ""

    if argv and argv[0] == "--failed-only":
        if discover:
            testFileName = ""
            failed = argv[1:]
        else:
            testFileName = argv[1]
            failed = argv[2:]
    else:
        failed = []
        if discover:
            testFileName = testName = ""
        else:
            testFileName, testName = argv[:2]
            del argv[:2]

    # whatever is left over names the test cases to be loaded
    testCases = argv[:]

    # make the tests importable
    if testFileName:
        sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName)))
    elif discoveryStart:
        sys.path.insert(1, os.path.abspath(discoveryStart))

    # setup test coverage
    if collectCoverage:
        if not covDataFile:
            # derive the data file name from the test location
            if discover:
                covname = os.path.join(discoveryStart, "test")
            elif testFileName:
                covname = os.path.splitext(os.path.abspath(testFileName))[0]
            else:
                covname = "test"
            covDataFile = "{0}.coverage".format(covname)
        if not os.path.isabs(covDataFile):
            covDataFile = os.path.abspath(covDataFile)

        sys.path.insert(
            2,
            os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__), "..", "..", "DebugClients", "Python"
                )
            ),
        )
        from DebugClients.Python.coverage import Coverage

        cover = Coverage(data_file=covDataFile)
        if coverageErase:
            cover.erase()
        cover.start()
    else:
        cover = None

    # collect the tests
    try:
        testLoader = unittest.TestLoader()
        if discover and not failed:
            if testCases:
                test = testLoader.loadTestsFromNames(testCases)
            else:
                test = testLoader.discover(discoveryStart)
        else:
            if testFileName:
                module = __import__(os.path.splitext(os.path.basename(testFileName))[0])
            else:
                module = None
            if failed:
                if module:
                    # strip the leading module name, the names are resolved
                    # relative to the imported module
                    failed = [t.split(".", 1)[1] for t in failed]
                test = testLoader.loadTestsFromNames(failed, module)
            else:
                test = testLoader.loadTestsFromName(testName, module)
    except Exception as err:
        print("Exception:", str(err))
        writer.write(
            {
                "event": "collecterror",
                "error": str(err),
            }
        )
        # close the writer like the regular exit path does
        writer.close()
        sys.exit(1)

    collectedTests = {
        "event": "collected",
        "tests": [
            {"id": testId, "name": name, "description": desc}
            for testId, name, desc in _assembleTestCasesList(test)
        ],
    }
    writer.write(collectedTests)

    # run the tests
    testResult = EricTestResult(writer, failfast)
    startTestRun = getattr(testResult, "startTestRun", None)
    if startTestRun is not None:
        startTestRun()
    try:
        test.run(testResult)
    finally:
        if cover:
            cover.stop()
            cover.save()
            writer.write(
                {
                    "event": "coverage",
                    "file": covDataFile,
                }
            )
        stopTestRun = getattr(testResult, "stopTestRun", None)
        if stopTestRun is not None:
            stopTestRun()

    writer.close()
    sys.exit(0)
|
421 |
|
422 |
|
if __name__ == "__main__":
    # The first command line argument selects the action; anything missing
    # or unknown ends in exit code 42.
    command = sys.argv[1] if len(sys.argv) > 1 else None

    if command == "installed":
        # nothing to check, just signal success
        sys.exit(0)

    if command == "versions":
        import platform

        # report the framework name and the Python version as JSON
        print(
            json.dumps(
                {
                    "name": "unittest",
                    "version": platform.python_version(),
                    "plugins": [],
                }
            )
        )
        sys.exit(0)

    if command == "runtest":
        # the remaining arguments are handed to the test runner
        runtest(sys.argv[2:])
        sys.exit(0)

    sys.exit(42)
|
445 |
|
446 # |
|
447 # eflag: noqa = M801 |