|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the test runner script for the 'unittest' framework. |
|
8 """ |
|
9 |
|
10 import json |
|
11 import os |
|
12 import sys |
|
13 import time |
|
14 import unittest |
|
15 |
|
16 |
|
# Make the directory two levels above this script importable; it is placed at
# index 2 of the module search path.
# NOTE(review): presumably this is the package root providing the
# 'EricNetwork' and 'DebugClients' packages imported by runtest() - confirm.
sys.path.insert(
    2,
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
    ),
)
|
21 |
|
22 |
|
class EricTestResult(unittest.TestResult):
    """
    Class implementing a TestResult derivative that reports all test events
    as dictionaries to a writer object.

    For each test a "started" event is written by startTest() and a "result"
    event by stopTest(). Subtests are reported individually as soon as they
    are recorded. Outcomes reported while no test is running (e.g. errors or
    skips raised by class or module level fixtures) are written immediately
    as their own "result" event instead of being dropped.
    """
    def __init__(self, writer, failfast):
        """
        Constructor

        @param writer reference to the object to write the results to
        @type EricJsonWriter
        @param failfast flag indicating to stop at the first error
        @type bool
        """
        super().__init__()
        self.__writer = writer
        self.failfast = failfast
        self.__testsRun = 0

        # status record of the test currently being executed; it is only
        # valid while self.__testActive is True
        self.__currentTestStatus = {}
        self.__testActive = False

        # Initialize the timers here so stopTest() / stopTestRun() never hit
        # an undefined attribute if the matching start method was not called.
        now = time.monotonic_ns()
        self.__startTime = now
        self.__totalStartTime = now

    def __recordOutcome(self, test, outcome):
        """
        Private method to record the outcome of a test.

        If a test is currently running, the outcome is merged into its status
        record, which is written by stopTest(). Otherwise the outcome is
        written immediately as a separate result event.

        @param test reference to the test object
        @type TestCase
        @param outcome dictionary containing the outcome data (at least the
            'status' key)
        @type dict
        """
        if self.__testActive:
            self.__currentTestStatus.update(outcome)
            return

        report = {
            "event": "result",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
            "subtest": False,
        }
        report.update(outcome)
        self.__writer.write(report)

    def addFailure(self, test, err):
        """
        Public method called if a test failed.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__recordOutcome(test, {
            "status": "failure",
            "traceback": tracebackLines,
        })

    def addError(self, test, err):
        """
        Public method called if a test errored.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addError(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__recordOutcome(test, {
            "status": "error",
            "traceback": tracebackLines,
        })

    def addSkip(self, test, reason):
        """
        Public method called if a test was skipped.

        @param test reference to the test object
        @type TestCase
        @param reason reason for skipping the test
        @type str
        """
        super().addSkip(test, reason)

        self.__recordOutcome(test, {
            "status": "skipped",
            "shortmsg": reason,
        })

    def addExpectedFailure(self, test, err):
        """
        Public method called if a test failed expected.

        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addExpectedFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)

        self.__recordOutcome(test, {
            "status": "expected failure",
            "traceback": tracebackLines,
        })

    def addUnexpectedSuccess(self, test):
        """
        Public method called if a test succeeded unexpectedly.

        @param test reference to the test object
        @type TestCase
        """
        super().addUnexpectedSuccess(test)

        self.__recordOutcome(test, {"status": "unexpected success"})

    def addSubTest(self, test, subtest, err):
        """
        Public method called for each subtest to record its result.

        The subtest result is written immediately. A failed or errored
        subtest also determines the overall status of the enclosing test.

        @param test reference to the test object
        @type TestCase
        @param subtest reference to the subtest object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        report = {
            "event": "result",
            "status": "success",
            "name": str(subtest),
            "id": subtest.id(),
            "description": subtest.shortDescription(),
            "subtest": True,
        }

        if err is None:
            self.__writer.write(report)
            return

        super().addSubTest(test, subtest, err)
        status = (
            "failure"
            if issubclass(err[0], test.failureException) else
            "error"
        )
        report["status"] = status
        report["traceback"] = self._exc_info_to_string(err, test)

        # record the last subtest fail status as the overall status
        if self.__testActive:
            self.__currentTestStatus["status"] = status

        self.__writer.write(report)

        if self.failfast:
            self.stop()

    def startTest(self, test):
        """
        Public method called at the start of a test.

        @param test reference to the test object
        @type TestCase
        """
        super().startTest(test)

        self.__testsRun += 1
        # a test counts as successful until an add*() method says otherwise
        self.__currentTestStatus = {
            "event": "result",
            "status": "success",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
            "subtest": False,
        }
        self.__testActive = True

        self.__writer.write({
            "event": "started",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
        })

        self.__startTime = time.monotonic_ns()

    def stopTest(self, test):
        """
        Public method called at the end of a test.

        @param test reference to the test object
        @type TestCase
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__startTime) / 1_000_000     # ms

        super().stopTest(test)

        self.__currentTestStatus["duration_ms"] = duration
        self.__writer.write(self.__currentTestStatus)

        # Detach the record that was just written so later outcomes reported
        # outside of a test cannot modify it.
        self.__currentTestStatus = {}
        self.__testActive = False

    def startTestRun(self):
        """
        Public method called once before any tests are executed.
        """
        self.__totalStartTime = time.monotonic_ns()
        self.__testsRun = 0

    def stopTestRun(self):
        """
        Public method called once after all tests are executed.
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__totalStartTime) / 1_000_000_000   # s

        self.__writer.write({
            "event": "finished",
            "duration_s": duration,
            "tests": self.__testsRun,
        })
|
233 |
|
234 |
|
def _assembleTestCasesList(suite):
    """
    Protected function to flatten a (possibly nested) test suite into a list
    of its test cases.

    Nested suites are traversed recursively. Entries whose ID contains one of
    the markers "ModuleImportFailure", "LoadTestsFailure" or "_FailedTest"
    are left out.

    @param suite test suite to be inspected
    @type unittest.TestSuite
    @return list of tuples containing the test case ID, the string
        representation and the short description
    @rtype list of tuples of (str, str, str)
    """
    excludedMarkers = (
        "ModuleImportFailure",
        "LoadTestsFailure",
        "_FailedTest",
    )

    collected = []
    for entry in suite:
        if isinstance(entry, unittest.TestSuite):
            collected += _assembleTestCasesList(entry)
            continue

        ident = entry.id()
        if any(marker in ident for marker in excludedMarkers):
            continue

        collected.append((ident, str(entry), entry.shortDescription()))

    return collected
|
261 |
|
262 |
|
def runtest(argv):
    """
    Function to run the tests.

    The parameter list is consumed in this order:
    <ul>
    <li>two entries passed to the EricJsonWriter constructor (the second one
        converted to int)</li>
    <li>optionally "discover", optionally followed by
        "--start-directory" and the directory to start discovery in</li>
    <li>the optional flags "--failfast", "--cover" and "--cover-erase"</li>
    <li>either "--failed-only" followed by the names of the tests to be
        rerun (preceded by the test file name if not in discovery mode) or
        the test file name and the test name (not in discovery mode only)
        followed by the test case names to be loaded (used in discovery mode
        only)</li>
    </ul>

    Note: This function does not return; it terminates the interpreter via
    sys.exit() with code 0 after the run or code 1 if collecting the tests
    failed.

    @param argv list of command line parameters.
    @type list of str
    """
    from EricNetwork.EricJsonStreamWriter import EricJsonWriter
    writer = EricJsonWriter(argv[0], int(argv[1]))
    del argv[:2]

    # process arguments
    discover = bool(argv) and argv[0] == "discover"
    # always bound, even if "discover" is given without a start directory
    discoveryStart = ""
    if discover:
        argv.pop(0)
        if argv and argv[0] == "--start-directory":
            discoveryStart = argv[1]
            del argv[:2]

    failfast = "--failfast" in argv
    if failfast:
        argv.remove("--failfast")

    coverage = "--cover" in argv
    if coverage:
        argv.remove("--cover")
    coverageErase = "--cover-erase" in argv
    if coverageErase:
        argv.remove("--cover-erase")

    failedOnly = bool(argv) and argv[0] == "--failed-only"
    if failedOnly:
        # no test name is given in this mode
        testName = ""
        if discover:
            testFileName = ""
            failed = argv[1:]
        else:
            testFileName = argv[1]
            failed = argv[2:]
    else:
        failed = []
        if discover:
            testFileName = testName = ""
        else:
            testFileName, testName = argv[:2]
            del argv[:2]

    testCases = argv[:]

    if testFileName:
        sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName)))
    elif discoveryStart:
        sys.path.insert(1, os.path.abspath(discoveryStart))

    try:
        if failedOnly and not failed:
            # Report this explicitly instead of trying to load a test from
            # the leftover option string or an undefined test name.
            raise ValueError(
                "'--failed-only' was given without any test names"
            )

        testLoader = unittest.TestLoader()
        if discover and not failed:
            if testCases:
                test = testLoader.loadTestsFromNames(testCases)
            else:
                test = testLoader.discover(discoveryStart)
        else:
            if testFileName:
                module = __import__(os.path.splitext(
                    os.path.basename(testFileName))[0])
            else:
                module = None
            if failed:
                if module:
                    # the names are resolved relative to the module, so
                    # strip the leading module name component
                    failed = [t.split(".", 1)[1]
                              for t in failed]
                test = testLoader.loadTestsFromNames(
                    failed, module)
            else:
                test = testLoader.loadTestsFromName(
                    testName, module)
    except Exception as err:
        print("Exception:", str(err))
        writer.write({
            "event": "collecterror",
            "error": str(err),
        })
        sys.exit(1)

    collectedTests = {
        "event": "collected",
        "tests": [
            {"id": testId, "name": name, "description": desc}
            for testId, name, desc in _assembleTestCasesList(test)
        ]
    }
    writer.write(collectedTests)

    # setup test coverage
    if coverage:
        if discover:
            covname = os.path.join(discoveryStart, "unittest")
        elif testFileName:
            covname = os.path.splitext(
                os.path.abspath(testFileName))[0]
        else:
            covname = "unittest"
        covDataFile = "{0}.coverage".format(covname)
        if not os.path.isabs(covDataFile):
            covDataFile = os.path.abspath(covDataFile)

        from DebugClients.Python.coverage import coverage as cov
        cover = cov(data_file=covDataFile)
        if coverageErase:
            cover.erase()
    else:
        cover = None

    testResult = EricTestResult(writer, failfast)
    testResult.startTestRun()
    try:
        if cover:
            cover.start()
        test.run(testResult)
    finally:
        # always save the coverage data and send the "finished" event, even
        # if the run was aborted by an exception
        if cover:
            cover.stop()
            cover.save()
            writer.write({
                "event": "coverage",
                "file": covDataFile,
            })
        testResult.stopTestRun()

    writer.close()
    sys.exit(0)
|
399 |
|
if __name__ == '__main__':
    # Command dispatch: the first command line argument selects the action.
    # Exit code 42 signals a missing or unknown command.
    command = sys.argv[1] if len(sys.argv) > 1 else None

    if command == "installed":
        # nothing to check here, just report success
        sys.exit(0)

    if command == "versions":
        import platform
        # print a JSON object naming the framework and the version of the
        # running Python interpreter
        print(json.dumps({
            "name": "unittest",
            "version": platform.python_version(),
            "plugins": [],
        }))
        sys.exit(0)

    if command == "runtest":
        runtest(sys.argv[2:])
        sys.exit(0)

    sys.exit(42)
|
421 |
|
422 # |
|
423 # eflag: noqa = M801 |