|
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the executor for the 'pytest' framework.
"""
|
9 |
|
10 import contextlib |
|
11 import json |
|
12 import os |
|
13 |
|
14 from PyQt6.QtCore import pyqtSlot, QProcess |
|
15 |
|
16 from EricNetwork.EricJsonStreamReader import EricJsonReader |
|
17 |
|
18 from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory |
|
19 |
|
20 |
|
class PytestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the 'pytest' framework.

    Test runs are delegated to the helper script 'PytestRunner.py', which is
    started with the selected interpreter. The helper is given the address
    and port of an EricJsonReader; the data objects received through that
    reader are dispatched by their "event" entry.
    """

    module = "pytest"
    name = "pytest"

    # helper script executed by the interpreter under test
    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        # maps the status strings sent by the runner to display strings
        self.__statusDisplayMapping = {
            "failed": self.tr("Failure"),
            "skipped": self.tr("Skipped"),
            "xfailed": self.tr("Expected Failure"),
            "xpassed": self.tr("Unexpected Success"),
            "passed": self.tr("Success"),
        }

        self.__config = None
        # Initialized here so that 'finished()' and the data slot can rely on
        # the attribute even if they run before 'start()' has set it.
        self.__rootdir = ""

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        The runner script is executed with the 'versions' command and the
        first output line that parses as a JSON object is returned.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each (empty, if
            the information could not be determined)
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [PytestExecutor.runner, "versions"])
        if not proc.waitForFinished(3000):
            # Timed out or never started: terminate the helper explicitly
            # instead of leaving that to the QProcess destructor.
            proc.kill()
            proc.waitForFinished(1000)
            return {}

        if proc.exitCode() != 0:
            return {}

        for line in self.readAllOutput(proc).splitlines():
            if line.startswith("{") and line.endswith("}"):
                # lines that merely look like JSON are skipped
                with contextlib.suppress(json.JSONDecodeError):
                    return json.loads(line)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to check, if the 'pytest-cov' plugin is available for
        the given interpreter.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        plugins = self.getVersions(interpreter).get("plugins") or []
        return any(plugin.get("name") == "pytest-cov" for plugin in plugins)

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        Note: This method uses 'self.reader', which is created by 'start()'.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        args = [
            PytestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
            "--quiet",
        ]

        if config.failFast:
            args.append("--exitfirst")

        # rerun only the failures of the last run or start with a clean cache
        args.append("--last-failed" if config.failedOnly else "--cache-clear")

        if config.collectCoverage:
            # the empty '--cov-report=' suppresses the report generation
            args.extend(["--cov=.", "--cov-report="])
            if not config.eraseCoverage:
                # keep previously recorded coverage data
                args.append("--cov-append")

        if config.testFilename:
            if config.testName:
                # dotted test name -> pytest node id notation
                args.append(
                    "{0}::{1}".format(
                        config.testFilename, config.testName.replace(".", "::")
                    )
                )
            else:
                args.append(config.testFilename)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        Note: The given 'pythonpath' list is modified in place; the discovery
        start directory or the directory of the test file is inserted at its
        front.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        self.__config = config

        if config.discoveryStart:
            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
        elif config.testFilename:
            pythonpath.insert(0, os.path.abspath(os.path.dirname(config.testFilename)))

        # Preliminary root directory; it is replaced by the value sent with
        # the 'config' event of the runner.
        if config.discover:
            self.__rootdir = config.discoveryStart
        elif config.testFilename:
            self.__rootdir = os.path.dirname(config.testFilename)
        else:
            self.__rootdir = ""

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method emits 'coverageDataSaved' (if coverage data was
        collected), closes the reader and emits the signal testFinished.
        """
        # take over and reset the configuration in one step
        config, self.__config = self.__config, None
        if config is not None and config.collectCoverage:
            self.coverageDataSaved.emit(os.path.join(self.__rootdir, ".coverage"))

        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        The data is dispatched by the value of its "event" entry; unknown
        events are ignored.

        @param data data object received
        @type dict
        """
        event = data["event"]

        # test configuration
        if event == "config":
            self.__rootdir = data["root"]

        # error collecting tests
        elif event == "collecterror":
            name = self.__normalizeModuleName(data["nodeid"])
            self.collectError.emit([(name, data["report"])])

        # tests collected
        elif event == "collected":
            self.collected.emit(
                [(data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")]
            )

        # test started
        elif event == "starttest":
            self.startTest.emit(
                (data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")
            )

        # test result
        elif event == "result":
            self.__processResult(data)

        # test run finished
        elif event == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

    def __processResult(self, data):
        """
        Private method to convert a 'result' event into a TestResult and emit
        it via the testResult signal.

        @param data data object of the 'result' event
        @type dict
        """
        status = data["status"]
        withError = data["with_error"]

        if withError or status in ("failed", "xpassed"):
            category = TestResultCategory.FAIL
        elif status in ("passed", "xfailed"):
            category = TestResultCategory.OK
        else:
            category = TestResultCategory.SKIP

        # An unknown status is shown verbatim instead of raising a KeyError.
        displayStatus = (
            self.tr("Error")
            if withError
            else self.__statusDisplayMapping.get(status, status)
        )

        message = data.get("message", "")
        extraText = data.get("report", "")
        reportPhase = data.get("report_phase")
        if reportPhase in ("setup", "teardown"):
            message = self.tr("ERROR at {0}: {1}", "phase, message").format(
                reportPhase, message
            )
            extraText = self.tr("ERROR at {0}: {1}", "phase, extra text").format(
                reportPhase, extraText
            )

        sections = data.get("sections", [])
        if sections:
            parts = [extraText, "\n"]
            for heading, text in sections:
                section = "----- {0} -----\n{1}".format(heading, text)
                if not section.endswith("\n"):
                    # keep the next heading from being glued to this text
                    section += "\n"
                parts.append(section)
            extraText = "".join(parts)

        duration = data.get("duration_s", None)
        if duration is not None:
            # convert to ms
            duration *= 1000

        filename = data["filename"]
        if self.__rootdir:
            filename = os.path.join(self.__rootdir, filename)

        # The received line number is treated as 0-based; a missing or None
        # value is handled like 0.
        lineno = (data.get("linenumber") or 0) + 1

        self.testResult.emit(
            TestResult(
                category=category,
                status=displayStatus,
                name=self.__nodeid2testname(data["nodeid"]),
                id=data["nodeid"],
                description="",
                message=message,
                extra=extraText.rstrip().splitlines(),
                duration=duration,
                filename=filename,
                lineno=lineno,
            )
        )

    def __normalizeModuleName(self, name):
        r"""
        Private method to convert a module name reported by pytest to Python
        conventions.

        This method strips a trailing extension '.pyw' or '.py' from the
        module path and replaces '/' and '\' by '.' thereafter. Only the part
        in front of the first '::' is treated as the module path; anything
        following it is kept unchanged.

        @param name module name reported by pytest
        @type str
        @return module name iaw. Python conventions
        @rtype str
        """
        module, separator, remainder = name.partition("::")

        # Strip the extension only at the end of the path; '.py' occurring
        # elsewhere (e.g. in a directory name) belongs to the name.
        for extension in (".pyw", ".py"):
            if module.endswith(extension):
                module = module[: -len(extension)]
                break

        module = module.replace("/", ".").replace("\\", ".")
        return module + separator + remainder

    def __nodeid2testname(self, nodeid):
        """
        Private method to convert a nodeid to a test name.

        The result has the form 'test (package.module.Class)'. A
        parametrization suffix ('[...]') stays attached to the test part.

        @param nodeid nodeid to be converted
        @type str
        @return test name
        @rtype str
        """
        module, _separator, name = nodeid.partition("::")
        module = self.__normalizeModuleName(module)

        # Split off the parametrization suffix first: parameter ids may
        # contain '.' or '::' and must not take part in the name splitting.
        name, bracket, params = name.partition("[")

        qualified = module
        if name:
            qualified = "{0}.{1}".format(module, name.replace("::", "."))

        testname, _dot, shortname = qualified.rpartition(".")
        shortname += bracket + params
        if not testname:
            # nothing to qualify the name with (e.g. a bare module name)
            return shortname

        return "{0} ({1})".format(shortname, testname)