|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the executor for the 'pytest' framework. |
|
8 """ |
|
9 |
|
10 import contextlib |
|
11 import json |
|
12 import os |
|
13 |
|
14 from PyQt6.QtCore import pyqtSlot, QProcess |
|
15 |
|
16 from EricNetwork.EricJsonStreamReader import EricJsonReader |
|
17 |
|
18 from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory |
|
19 |
|
20 |
|
class PytestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the 'pytest' framework.

    The tests are run by the helper script 'PytestRunner.py' in a separate
    process. That process reports its progress as JSON objects, which are
    received via an EricJsonReader and translated into the signals of the
    executor base class.
    """
    # name of the Python module implementing the framework
    module = "pytest"
    # name of the framework
    name = "pytest"

    # path of the runner script executed by the selected interpreter
    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        # translated display strings for the status values reported by the
        # runner
        self.__statusDisplayMapping = {
            "failed": self.tr("Failure"),
            "skipped": self.tr("Skipped"),
            "xfailed": self.tr("Expected Failure"),
            "xpassed": self.tr("Unexpected Success"),
            "passed": self.tr("Success"),
        }

        self.__config = None
        # initialized here so that it exists before 'start()' was called
        self.__rootdir = ""

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each (empty, if
            the runner did not finish within three seconds, exited with an
            error or did not print a JSON object)
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [PytestExecutor.runner, "versions"])
        if not proc.waitForFinished(3000) or proc.exitCode() != 0:
            return {}

        # the first output line that is a valid JSON object is the result
        for line in self.readAllOutput(proc).splitlines():
            if line.startswith("{") and line.endswith("}"):
                with contextlib.suppress(json.JSONDecodeError):
                    return json.loads(line)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to check, if the 'pytest-cov' plugin is installed for
        the given interpreter.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        versions = self.getVersions(interpreter)
        return any(
            plugin["name"] == "pytest-cov"
            for plugin in versions.get("plugins", [])
        )

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        Note: This method uses the address and port of the JSON reader
        created by 'start()'.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        args = [
            PytestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
            "--quiet",
        ]

        if config.failFast:
            args.append("--exitfirst")

        # rerun only the failed tests or start with a clean cache
        args.append("--last-failed" if config.failedOnly else "--cache-clear")

        if config.collectCoverage:
            # collect coverage data of the current directory; the empty
            # '--cov-report=' suppresses the generation of a report
            args.extend([
                "--cov=.",
                "--cov-report="
            ])
            if not config.eraseCoverage:
                # keep the already recorded coverage data
                args.append("--cov-append")

        if config.testFilename:
            if config.testName:
                # pytest separates the parts of a test name with '::'
                args.append("{0}::{1}".format(
                    config.testFilename,
                    config.testName.replace(".", "::")
                ))
            else:
                args.append(config.testFilename)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
            (the given list is not modified)
        @type list of str
        """
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        self.__config = config

        # work on a copy in order not to modify the list of the caller
        pythonpath = list(pythonpath or [])
        if config.discoveryStart:
            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
        elif config.testFilename:
            pythonpath.insert(
                0, os.path.abspath(os.path.dirname(config.testFilename)))

        # preliminary root directory; it is replaced by the one sent with
        # the 'config' event of the runner
        if config.discover:
            self.__rootdir = config.discoveryStart
        elif config.testFilename:
            self.__rootdir = os.path.dirname(config.testFilename)
        else:
            self.__rootdir = ""

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method emits the signal coverageDataSaved (if coverage data was
        collected), closes the JSON reader and emits the signal testFinished
        with the output of the process.
        """
        if self.__config is not None and self.__config.collectCoverage:
            self.coverageDataSaved.emit(
                os.path.join(self.__rootdir, ".coverage"))

        self.__config = None

        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        @param data data object received
        @type dict
        """
        event = data["event"]

        # test configuration
        if event == "config":
            self.__rootdir = data["root"]

        # error collecting tests
        elif event == "collecterror":
            name = self.__normalizeModuleName(data["nodeid"])
            self.collectError.emit([(name, data["report"])])

        # tests collected
        elif event == "collected":
            self.collected.emit([
                (data["nodeid"],
                 self.__nodeid2testname(data["nodeid"]),
                 "")
            ])

        # test started
        elif event == "starttest":
            self.startTest.emit(
                (data["nodeid"],
                 self.__nodeid2testname(data["nodeid"]),
                 "")
            )

        # test result
        elif event == "result":
            self.__processResult(data)

        # test run finished
        elif event == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

    def __processResult(self, data):
        """
        Private method to convert a 'result' event to a test result and emit
        it with the signal testResult.

        @param data data object of the 'result' event
        @type dict
        """
        withError = data["with_error"]
        rawStatus = data["status"]

        if rawStatus in ("failed", "xpassed") or withError:
            category = TestResultCategory.FAIL
        elif rawStatus in ("passed", "xfailed"):
            category = TestResultCategory.OK
        else:
            category = TestResultCategory.SKIP

        # an unknown status is shown as reported instead of raising a
        # KeyError (its category is 'SKIP' as determined above)
        status = (
            self.tr("Error")
            if withError else
            self.__statusDisplayMapping.get(rawStatus, rawStatus)
        )

        message = data.get("message", "")
        extraText = data.get("report", "")
        reportPhase = data.get("report_phase")
        if reportPhase in ("setup", "teardown"):
            message = (
                self.tr("ERROR at {0}: {1}", "phase, message")
                .format(reportPhase, message)
            )
            extraText = (
                self.tr("ERROR at {0}: {1}", "phase, extra text")
                .format(reportPhase, extraText)
            )

        sections = data.get("sections", [])
        if sections:
            parts = [extraText, "\n"]
            for heading, text in sections:
                section = "----- {0} -----\n{1}".format(heading, text)
                parts.append(section)
                if not section.endswith("\n"):
                    # keep the next heading on a line of its own
                    parts.append("\n")
            extraText = "".join(parts)

        duration = data.get("duration_s", None)
        if duration:
            # convert to ms
            duration *= 1000

        filename = data["filename"]
        if self.__rootdir:
            filename = os.path.join(self.__rootdir, filename)

        self.testResult.emit(TestResult(
            category=category,
            status=status,
            name=self.__nodeid2testname(rawNodeid := data["nodeid"])
            if False else self.__nodeid2testname(data["nodeid"]),
            id=data["nodeid"],
            description="",
            message=message,
            extra=extraText.rstrip().splitlines(),
            duration=duration,
            filename=filename,
            # pytest reports 0-based line numbers; a missing or 'None' line
            # number is treated like 0
            lineno=(data.get("linenumber") or 0) + 1,
        ))

    def __normalizeModuleName(self, name):
        r"""
        Private method to convert a module name reported by pytest to Python
        conventions.

        This method strips a trailing extension '.pyw' or '.py' from the path
        first and replaces '/' and '\' by '.' thereafter. If the given name
        contains a '::', only the part before it is converted; the rest is
        kept unchanged.

        @param name module name reported by pytest
        @type str
        @return module name iaw. Python conventions
        @rtype str
        """
        path, separator, remainder = name.partition("::")

        # remove the extension at the end only; a plain 'replace()' would
        # also damage directory or file names containing '.py'
        for extension in (".pyw", ".py"):
            if path.endswith(extension):
                path = path[:-len(extension)]
                break

        path = path.replace("/", ".").replace("\\", ".")
        return path + separator + remainder

    def __nodeid2testname(self, nodeid):
        """
        Private method to convert a nodeid to a test name.

        The result has the form 'test (module.Class)'. A parametrization
        suffix ('[...]') is kept with the test unchanged. A nodeid without
        a '::' is returned as a normalized module name.

        @param nodeid nodeid to be converted
        @type str
        @return test name
        @rtype str
        """
        module, _, name = nodeid.partition("::")
        module = self.__normalizeModuleName(module)
        if not name:
            # nothing but a module was given
            return module

        # split off the parametrization; it may contain '.' or '::', which
        # must not be interpreted as name separators
        name, bracket, parameters = name.partition("[")

        qualifiedName = "{0}.{1}".format(module, name.replace("::", "."))
        testname, name = qualifiedName.rsplit(".", 1)
        return "{0}{1}{2} ({3})".format(name, bracket, parameters, testname)