6 """ |
6 """ |
7 Module implementing the test runner script for the 'unittest' framework. |
7 Module implementing the test runner script for the 'unittest' framework. |
8 """ |
8 """ |
9 |
9 |
10 import json |
10 import json |
|
11 import os |
11 import sys |
12 import sys |
|
13 import time |
|
14 import unittest |
|
15 |
|
16 |
|
17 sys.path.insert( |
|
18 2, |
|
19 os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) |
|
20 ) |
|
21 |
|
22 |
|
23 class EricTestResult(unittest.TestResult): |
|
24 """ |
|
25 Class implementing a TestResult derivative to send the data via a network |
|
26 connection. |
|
27 """ |
|
28 def __init__(self, writer, failfast): |
|
29 """ |
|
30 Constructor |
|
31 |
|
32 @param writer reference to the object to write the results to |
|
33 @type EricJsonWriter |
|
34 @param failfast flag indicating to stop at the first error |
|
35 @type bool |
|
36 """ |
|
37 super().__init__() |
|
38 self.__writer = writer |
|
39 self.failfast = failfast |
|
40 self.__testsRun = 0 |
|
41 |
|
42 self.__currentTestStatus = {} |
|
43 |
|
44 def addFailure(self, test, err): |
|
45 """ |
|
46 Public method called if a test failed. |
|
47 |
|
48 @param test reference to the test object |
|
49 @type TestCase |
|
50 @param err tuple containing the exception data like sys.exc_info |
|
51 (exception type, exception instance, traceback) |
|
52 @type tuple |
|
53 """ |
|
54 super().addFailure(test, err) |
|
55 tracebackLines = self._exc_info_to_string(err, test) |
|
56 |
|
57 self.__currentTestStatus.update({ |
|
58 "status": "failure", |
|
59 "traceback": tracebackLines, |
|
60 }) |
|
61 |
|
62 def addError(self, test, err): |
|
63 """ |
|
64 Public method called if a test errored. |
|
65 |
|
66 @param test reference to the test object |
|
67 @type TestCase |
|
68 @param err tuple containing the exception data like sys.exc_info |
|
69 (exception type, exception instance, traceback) |
|
70 @type tuple |
|
71 """ |
|
72 super().addError(test, err) |
|
73 tracebackLines = self._exc_info_to_string(err, test) |
|
74 |
|
75 self.__currentTestStatus.update({ |
|
76 "status": "error", |
|
77 "traceback": tracebackLines, |
|
78 }) |
|
79 |
|
80 def addSubTest(self, test, subtest, err): |
|
81 """ |
|
82 Public method called for each subtest to record its result. |
|
83 |
|
84 @param test reference to the test object |
|
85 @type TestCase |
|
86 @param subtest reference to the subtest object |
|
87 @type TestCase |
|
88 @param err tuple containing the exception data like sys.exc_info |
|
89 (exception type, exception instance, traceback) |
|
90 @type tuple |
|
91 """ |
|
92 if err is not None: |
|
93 super().addSubTest(test, subtest, err) |
|
94 tracebackLines = self._exc_info_to_string(err, test) |
|
95 status = ( |
|
96 "failure" |
|
97 if issubclass(err[0], test.failureException) else |
|
98 "error" |
|
99 ) |
|
100 |
|
101 self.__currentTestStatus.update({ |
|
102 "status": status, |
|
103 "name": str(subtest), |
|
104 "traceback": tracebackLines, |
|
105 }) |
|
106 |
|
107 if self.failfast: |
|
108 self.stop() |
|
109 |
|
110 def addSkip(self, test, reason): |
|
111 """ |
|
112 Public method called if a test was skipped. |
|
113 |
|
114 @param test reference to the test object |
|
115 @type TestCase |
|
116 @param reason reason for skipping the test |
|
117 @type str |
|
118 """ |
|
119 super().addSkip(test, reason) |
|
120 |
|
121 self.__currentTestStatus.update({ |
|
122 "status": "skipped", |
|
123 "shortmsg": reason, |
|
124 }) |
|
125 |
|
126 def addExpectedFailure(self, test, err): |
|
127 """ |
|
128 Public method called if a test failed expected. |
|
129 |
|
130 @param test reference to the test object |
|
131 @type TestCase |
|
132 @param err tuple containing the exception data like sys.exc_info |
|
133 (exception type, exception instance, traceback) |
|
134 @type tuple |
|
135 """ |
|
136 super().addExpectedFailure(test, err) |
|
137 tracebackLines = self._exc_info_to_string(err, test) |
|
138 |
|
139 self.__currentTestStatus.update({ |
|
140 "status": "expected failure", |
|
141 "traceback": tracebackLines, |
|
142 }) |
|
143 |
|
144 def addUnexpectedSuccess(self, test): |
|
145 """ |
|
146 Public method called if a test succeeded expectedly. |
|
147 |
|
148 @param test reference to the test object |
|
149 @type TestCase |
|
150 """ |
|
151 super().addUnexpectedSuccess(test) |
|
152 |
|
153 self.__currentTestStatus["status"] = "unexpected success" |
|
154 |
|
155 def startTest(self, test): |
|
156 """ |
|
157 Public method called at the start of a test. |
|
158 |
|
159 @param test reference to the test object |
|
160 @type TestCase |
|
161 """ |
|
162 super().startTest(test) |
|
163 |
|
164 self.__testsRun += 1 |
|
165 self.__currentTestStatus = { |
|
166 "event": "result", |
|
167 "status": "success", |
|
168 "name": str(test), |
|
169 "id": test.id(), |
|
170 "description": test.shortDescription(), |
|
171 } |
|
172 |
|
173 self.__writer.write({ |
|
174 "event": "started", |
|
175 "name": str(test), |
|
176 "id": test.id(), |
|
177 "description": test.shortDescription(), |
|
178 }) |
|
179 |
|
180 self.__startTime = time.monotonic_ns() |
|
181 |
|
182 def stopTest(self, test): |
|
183 """ |
|
184 Public method called at the end of a test. |
|
185 |
|
186 @param test reference to the test object |
|
187 @type TestCase |
|
188 """ |
|
189 stopTime = time.monotonic_ns() |
|
190 duration = (stopTime - self.__startTime) / 1_000_000 # ms |
|
191 |
|
192 super().stopTest(test) |
|
193 |
|
194 self.__currentTestStatus["duration_ms"] = duration |
|
195 self.__writer.write(self.__currentTestStatus) |
|
196 |
|
197 def startTestRun(self): |
|
198 """ |
|
199 Public method called once before any tests are executed. |
|
200 """ |
|
201 self.__totalStartTime = time.monotonic_ns() |
|
202 self.__testsRun = 0 |
|
203 |
|
204 def stopTestRun(self): |
|
205 """ |
|
206 Public method called once after all tests are executed. |
|
207 """ |
|
208 stopTime = time.monotonic_ns() |
|
209 duration = (stopTime - self.__totalStartTime) / 1_000_000_000 # s |
|
210 |
|
211 self.__writer.write({ |
|
212 "event": "finished", |
|
213 "duration_s": duration, |
|
214 "tests": self.__testsRun, |
|
215 }) |
|
216 |
|
217 |
|
218 def _assembleTestCasesList(suite): |
|
219 """ |
|
220 Protected function to assemble a list of test cases included in a test |
|
221 suite. |
|
222 |
|
223 @param suite test suite to be inspected |
|
224 @type unittest.TestSuite |
|
225 @return list of tuples containing the test case ID, the string |
|
226 representation and the short description |
|
227 @rtype list of tuples of (str, str) |
|
228 """ |
|
229 testCases = [] |
|
230 for test in suite: |
|
231 if isinstance(test, unittest.TestSuite): |
|
232 testCases.extend(_assembleTestCasesList(test)) |
|
233 else: |
|
234 testId = test.id() |
|
235 if ( |
|
236 "ModuleImportFailure" not in testId and |
|
237 "LoadTestsFailure" not in testId and |
|
238 "_FailedTest" not in testId |
|
239 ): |
|
240 testCases.append( |
|
241 (testId, str(test), test.shortDescription()) |
|
242 ) |
|
243 return testCases |
|
244 |
|
245 |
|
246 def runtest(argv): |
|
247 """ |
|
248 Function to run the tests. |
|
249 |
|
250 @param argv list of command line parameters. |
|
251 @type list of str |
|
252 """ |
|
253 from EricNetwork.EricJsonStreamWriter import EricJsonWriter |
|
254 writer = EricJsonWriter(argv[0], int(argv[1])) |
|
255 del argv[:2] |
|
256 |
|
257 # process arguments |
|
258 if argv[0] == "discover": |
|
259 discover = True |
|
260 argv.pop(0) |
|
261 if argv[0] == "--start-directory": |
|
262 discoveryStart = argv[1] |
|
263 del argv[:2] |
|
264 else: |
|
265 discover = False |
|
266 discoveryStart = "" |
|
267 |
|
268 failfast = "--failfast" in argv |
|
269 if failfast: |
|
270 argv.remove("--failfast") |
|
271 |
|
272 coverage = "--cover" in argv |
|
273 if coverage: |
|
274 argv.remove("--cover") |
|
275 coverageErase = "--cover-erase" in argv |
|
276 if coverageErase: |
|
277 argv.remove("--cover-erase") |
|
278 |
|
279 if not discover: |
|
280 testFileName, testName = argv[:2] |
|
281 del argv[:2] |
|
282 else: |
|
283 testFileName = testName = "" |
|
284 |
|
285 testCases = argv[:] |
|
286 |
|
287 if testFileName: |
|
288 sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName))) |
|
289 elif discoveryStart: |
|
290 sys.path.insert(1, os.path.abspath(discoveryStart)) |
|
291 |
|
292 try: |
|
293 testLoader = unittest.TestLoader() |
|
294 if discover: |
|
295 if testCases: |
|
296 test = testLoader.loadTestsFromNames(testCases) |
|
297 else: |
|
298 test = testLoader.discover(discoveryStart) |
|
299 else: |
|
300 if testFileName: |
|
301 module = __import__(os.path.splitext( |
|
302 os.path.basename(testFileName))[0]) |
|
303 else: |
|
304 module = None |
|
305 # TODO: implement 'failed only' |
|
306 # if failedOnly and self.__failedTests: |
|
307 # if module: |
|
308 # failed = [t.split(".", 1)[1] |
|
309 # for t in self.__failedTests] |
|
310 # else: |
|
311 # failed = list(self.__failedTests) |
|
312 # test = testLoader.loadTestsFromNames( |
|
313 # failed, module) |
|
314 # else: |
|
315 test = testLoader.loadTestsFromName( |
|
316 testName, module) |
|
317 except Exception as err: |
|
318 print("Exception:", str(err)) |
|
319 writer.write({ |
|
320 "event": "collecterror", |
|
321 "error": str(err), |
|
322 }) |
|
323 sys.exit(1) |
|
324 |
|
325 collectedTests = { |
|
326 "event": "collected", |
|
327 "tests": [ |
|
328 {"id": id, "name": name, "description": desc} |
|
329 for id, name, desc in _assembleTestCasesList(test) |
|
330 ] |
|
331 } |
|
332 writer.write(collectedTests) |
|
333 |
|
334 # setup test coverage |
|
335 if coverage: |
|
336 if discover: |
|
337 covname = os.path.join(discoveryStart, "unittest") |
|
338 elif testFileName: |
|
339 covname = os.path.splitext( |
|
340 os.path.abspath(testFileName))[0] |
|
341 else: |
|
342 covname = "unittest" |
|
343 covDataFile = "{0}.coverage".format(covname) |
|
344 if not os.path.isabs(covDataFile): |
|
345 covDataFile = os.path.abspath(covDataFile) |
|
346 |
|
347 from DebugClients.Python.coverage import coverage as cov |
|
348 cover = cov(data_file=covDataFile) |
|
349 if coverageErase: |
|
350 cover.erase() |
|
351 else: |
|
352 cover = None |
|
353 |
|
354 testResult = EricTestResult(writer, failfast) |
|
355 startTestRun = getattr(testResult, 'startTestRun', None) |
|
356 if startTestRun is not None: |
|
357 startTestRun() |
|
358 try: |
|
359 if cover: |
|
360 cover.start() |
|
361 test.run(testResult) |
|
362 finally: |
|
363 if cover: |
|
364 cover.stop() |
|
365 cover.save() |
|
366 writer.write({ |
|
367 "event": "coverage", |
|
368 "file": covDataFile, |
|
369 }) |
|
370 stopTestRun = getattr(testResult, 'stopTestRun', None) |
|
371 if stopTestRun is not None: |
|
372 stopTestRun() |
|
373 |
|
374 writer.close() |
|
375 sys.exit(0) |
12 |
376 |
13 if __name__ == '__main__': |
377 if __name__ == '__main__': |
14 command = sys.argv[1] |
378 if len(sys.argv) > 1: |
15 if command == "installed": |
379 command = sys.argv[1] |
16 try: |
380 if command == "installed": |
17 import unittest # __IGNORE_WARNING__ |
|
18 sys.exit(0) |
381 sys.exit(0) |
19 except ImportError: |
382 |
20 sys.exit(1) |
383 elif command == "versions": |
21 |
384 import platform |
22 elif command == "versions": |
385 versions = { |
23 import platform |
386 "name": "unittest", |
24 versions = { |
387 "version": platform.python_version(), |
25 "name": "unittest", |
388 "plugins": [], |
26 "version": platform.python_version(), |
389 } |
27 "plugins": [], |
390 print(json.dumps(versions)) |
28 } |
391 sys.exit(0) |
29 print(json.dumps(versions)) |
392 |
30 sys.exit(0) |
393 elif command == "runtest": |
|
394 runtest(sys.argv[2:]) |
|
395 sys.exit(0) |
31 |
396 |
32 sys.exit(42) |
397 sys.exit(42) |
33 |
398 |
34 # |
399 # |
35 # eflag: noqa = M801 |
400 # eflag: noqa = M801 |