|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the executor for the 'pytest' framework. |
|
8 """ |
|
9 |
|
10 import contextlib |
|
11 import json |
|
12 import os |
|
13 |
|
14 from PyQt6.QtCore import pyqtSlot, QProcess |
|
15 |
|
16 from EricNetwork.EricJsonStreamReader import EricJsonReader |
|
17 |
|
18 from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory |
|
19 |
|
20 |
|
class PytestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the 'pytest' framework.

    The tests are executed by the 'PytestRunner.py' script in a separate
    process. The runner is given the address and port of an EricJsonReader;
    the events received through that reader are handled by the private slot
    '__processData'.
    """

    module = "pytest"
    name = "pytest"

    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        # maps the status strings sent by the runner to translated texts
        self.__statusDisplayMapping = {
            "failed": self.tr("Failure"),
            "skipped": self.tr("Skipped"),
            "xfailed": self.tr("Expected Failure"),
            "xpassed": self.tr("Unexpected Success"),
            "passed": self.tr("Success"),
        }

        self.__config = None
        # initialized here so that it exists before 'start()' was called
        self.__rootdir = ""

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        The runner script is started with the 'versions' command and the first
        output line containing a valid JSON object is returned.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each (empty, if
            the process timed out, failed or did not report anything)
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [PytestExecutor.runner, "versions"])
        # wait at most three seconds for the helper process
        if proc.waitForFinished(3000) and proc.exitCode() == 0:
            for line in self.readAllOutput(proc).splitlines():
                if line.startswith("{") and line.endswith("}"):
                    # skip lines that only look like JSON
                    with contextlib.suppress(json.JSONDecodeError):
                        return json.loads(line)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to check, if the collection of coverage data is
        available.

        Coverage is considered available, if the 'pytest-cov' plugin is
        contained in the plugins list reported by 'getVersions()'.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        versions = self.getVersions(interpreter)
        return any(
            plugin["name"] == "pytest-cov"
            for plugin in versions.get("plugins", [])
        )

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        Note: 'self.reader' must have been created (see 'start()') before this
        method is called because its address and port are passed to the runner.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        args = [
            PytestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
            "--quiet",
        ]

        if config.failFast:
            args.append("--exitfirst")

        # rerun only the failed tests or start with a clean cache
        args.append("--last-failed" if config.failedOnly else "--cache-clear")

        if config.collectCoverage:
            # the empty '--cov-report=' suppresses the report generation
            args.extend(["--cov=.", "--cov-report="])
            if not config.eraseCoverage:
                # keep the previously collected coverage data
                args.append("--cov-append")
            # NOTE(review): a 'coverageFile' setting is not handled here

        if config.testFilename:
            if config.testName:
                # convert the dotted test name to a pytest node id
                args.append("{0}::{1}".format(
                    config.testFilename,
                    config.testName.replace(".", "::")
                ))
            else:
                args.append(config.testFilename)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        The given 'pythonpath' list is not modified; a copy extended by the
        discovery start directory or the directory of the test file is passed
        to the base class implementation.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        self.__config = config

        # build a new list instead of inserting into the caller's list
        if config.discoveryStart:
            pythonpath = (
                [os.path.abspath(config.discoveryStart)] + list(pythonpath)
            )
        elif config.testFilename:
            pythonpath = (
                [os.path.abspath(os.path.dirname(config.testFilename))]
                + list(pythonpath)
            )

        # preliminary root directory; replaced by the value of a 'config'
        # event, if the runner sends one
        if config.discover:
            self.__rootdir = config.discoveryStart
        elif config.testFilename:
            self.__rootdir = os.path.dirname(config.testFilename)
        else:
            self.__rootdir = ""

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method should read the results (if necessary) and emit the signal
        testFinished.
        """
        # guard against a missing configuration (e.g. called twice)
        if self.__config is not None and self.__config.collectCoverage:
            self.coverageDataSaved.emit(
                os.path.join(self.__rootdir, ".coverage"))

        self.__config = None

        self.reader.close()

        # the results were reported via the reader already; only the process
        # output is passed on
        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        The entry 'event' of the data determines the action to be taken.
        Unknown events are ignored.

        @param data data object received
        @type dict
        """
        event = data["event"]

        # test configuration
        if event == "config":
            self.__rootdir = data["root"]

        # error collecting tests
        elif event == "collecterror":
            name = self.__normalizeModuleName(data["nodeid"])
            self.collectError.emit([(name, data["report"])])

        # tests collected
        elif event == "collected":
            self.collected.emit([
                (data["nodeid"],
                 self.__nodeid2testname(data["nodeid"]),
                 "")
            ])

        # test started
        elif event == "starttest":
            self.startTest.emit(
                (data["nodeid"],
                 self.__nodeid2testname(data["nodeid"]),
                 "")
            )

        # test result
        elif event == "result":
            self.__handleTestResult(data)

        # test run finished
        elif event == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

    def __handleTestResult(self, data):
        """
        Private method to convert a 'result' event to a TestResult object and
        emit it via the 'testResult' signal.

        @param data data object of the 'result' event
        @type dict
        """
        status = data["status"]
        withError = data["with_error"]

        if withError or status in ("failed", "xpassed"):
            category = TestResultCategory.FAIL
        elif status in ("passed", "xfailed"):
            category = TestResultCategory.OK
        else:
            category = TestResultCategory.SKIP

        # show an unknown status as is instead of raising a KeyError
        statusText = (
            self.tr("Error")
            if withError else
            self.__statusDisplayMapping.get(status, status)
        )

        message = data.get("message", "")
        extraText = data.get("report", "")
        reportPhase = data.get("report_phase")
        if reportPhase in ("setup", "teardown"):
            # mark problems that did not occur in the test itself
            message = (
                self.tr("ERROR at {0}: {1}", "phase, message")
                .format(reportPhase, message)
            )
            extraText = (
                self.tr("ERROR at {0}: {1}", "phase, extra text")
                .format(reportPhase, extraText)
            )
        sections = data.get("sections", [])
        if sections:
            # append the additional report sections below a heading each
            extraText += "\n" + "".join(
                "----- {0} -----\n{1}".format(heading, text)
                for heading, text in sections
            )

        duration = data.get("duration_s", None)
        if duration:
            # convert to ms
            duration *= 1000

        filename = data["filename"]
        if self.__rootdir:
            filename = os.path.join(self.__rootdir, filename)

        self.testResult.emit(TestResult(
            category=category,
            status=statusText,
            name=self.__nodeid2testname(data["nodeid"]),
            id=data["nodeid"],
            description="",
            message=message,
            extra=extraText.rstrip().splitlines(),
            duration=duration,
            filename=filename,
            # pytest reports 0-based line numbers; a missing or None line
            # number is treated as 0
            lineno=(data.get("linenumber") or 0) + 1,
        ))

    def __normalizeModuleName(self, name):
        r"""
        Private method to convert a module name reported by pytest to Python
        conventions.

        This method strips a trailing extension '.pyw' or '.py' from the file
        path first and replaces '/' and '\' by '.' thereafter. Only the part
        in front of the first '::' is treated as the file path; anything
        following it is kept unchanged.

        @param name module name reported by pytest
        @type str
        @return module name iaw. Python conventions
        @rtype str
        """
        path, separator, remainder = name.partition("::")
        # strip the extension at the end only, so that directory or file
        # names containing '.py' somewhere else stay intact
        for extension in (".pyw", ".py"):
            if path.endswith(extension):
                path = path[:-len(extension)]
                break

        path = path.replace("/", ".").replace("\\", ".")
        return path + separator + remainder

    def __nodeid2testname(self, nodeid):
        """
        Private method to convert a nodeid to a test name.

        The result has the form 'test (module.Class)'. The parameter part of a
        parametrized test (everything starting at the first '[') is kept
        unchanged as part of the test name, even if it contains '.' or '::'.
        A nodeid without '::' yields the normalized module name only.

        @param nodeid nodeid to be converted
        @type str
        @return test name
        @rtype str
        """
        module, separator, name = nodeid.partition("::")
        module = self.__normalizeModuleName(module)
        if not separator:
            # nodeid refers to a module only
            return module

        # split off the parameter id before looking for the last '.'
        base, bracket, parameters = name.partition("[")
        qualifiedName = "{0}.{1}".format(module, base.replace("::", "."))
        container, _, shortName = qualifiedName.rpartition(".")
        return "{0}{1}{2} ({3})".format(
            shortName, bracket, parameters, container)