src/eric7/Testing/Interfaces/PytestExecutor.py

branch
eric7-maintenance
changeset 9264
18a7312cfdb3
parent 9192
a763d57e23bc
parent 9221
bf71ee032bb4
child 9371
1da8bc75946f
equal deleted inserted replaced
9241:d23e9854aea4 9264:18a7312cfdb3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the executor for the 'pytest' framework.
8 """
9
10 import contextlib
11 import json
12 import os
13
14 from PyQt6.QtCore import pyqtSlot, QProcess
15
16 from EricNetwork.EricJsonStreamReader import EricJsonReader
17
18 from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory
19
20
21 class PytestExecutor(TestExecutorBase):
22 """
23 Class implementing the executor for the 'pytest' framework.
24 """
25
26 module = "pytest"
27 name = "pytest"
28
29 runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")
30
31 def __init__(self, testWidget):
32 """
33 Constructor
34
35 @param testWidget reference to the unit test widget
36 @type TestingWidget
37 """
38 super().__init__(testWidget)
39
40 self.__statusDisplayMapping = {
41 "failed": self.tr("Failure"),
42 "skipped": self.tr("Skipped"),
43 "xfailed": self.tr("Expected Failure"),
44 "xpassed": self.tr("Unexpected Success"),
45 "passed": self.tr("Success"),
46 }
47
48 self.__config = None
49
50 def getVersions(self, interpreter):
51 """
52 Public method to get the test framework version and version information
53 of its installed plugins.
54
55 @param interpreter interpreter to be used for the test
56 @type str
57 @return dictionary containing the framework name and version and the
58 list of available plugins with name and version each
59 @rtype dict
60 """
61 proc = QProcess()
62 proc.start(interpreter, [PytestExecutor.runner, "versions"])
63 if proc.waitForFinished(3000):
64 exitCode = proc.exitCode()
65 if exitCode == 0:
66 outputLines = self.readAllOutput(proc).splitlines()
67 for line in outputLines:
68 if line.startswith("{") and line.endswith("}"):
69 with contextlib.suppress(json.JSONDecodeError):
70 return json.loads(line)
71
72 return {}
73
74 def hasCoverage(self, interpreter):
75 """
76 Public method to get the test framework version and version information
77 of its installed plugins.
78
79 @param interpreter interpreter to be used for the test
80 @type str
81 @return flag indicating the availability of coverage functionality
82 @rtype bool
83 """
84 versions = self.getVersions(interpreter)
85 if "plugins" in versions:
86 return any(plugin["name"] == "pytest-cov" for plugin in versions["plugins"])
87
88 return False
89
90 def createArguments(self, config):
91 """
92 Public method to create the arguments needed to start the test process.
93
94 @param config configuration for the test execution
95 @type TestConfig
96 @return list of process arguments
97 @rtype list of str
98 """
99 #
100 # collectCoverage: --cov= + --cov-report= to suppress report generation
101 # eraseCoverage: --cov-append if eraseCoverage is False
102 # coverageFile
103 #
104 args = [
105 PytestExecutor.runner,
106 "runtest",
107 self.reader.address(),
108 str(self.reader.port()),
109 "--quiet",
110 ]
111
112 if config.failFast:
113 args.append("--exitfirst")
114
115 if config.failedOnly:
116 args.append("--last-failed")
117 else:
118 args.append("--cache-clear")
119
120 if config.collectCoverage:
121 args.extend(["--cov=.", "--cov-report="])
122 if not config.eraseCoverage:
123 args.append("--cov-append")
124
125 if config.testFilename:
126 if config.testName:
127 args.append(
128 "{0}::{1}".format(
129 config.testFilename, config.testName.replace(".", "::")
130 )
131 )
132 else:
133 args.append(config.testFilename)
134
135 return args
136
137 def start(self, config, pythonpath):
138 """
139 Public method to start the testing process.
140
141 @param config configuration for the test execution
142 @type TestConfig
143 @param pythonpath list of directories to be added to the Python path
144 @type list of str
145 """
146 self.reader = EricJsonReader(name="Unittest Reader", parent=self)
147 self.reader.dataReceived.connect(self.__processData)
148
149 self.__config = config
150
151 if config.discoveryStart:
152 pythonpath.insert(0, os.path.abspath(config.discoveryStart))
153 elif config.testFilename:
154 pythonpath.insert(0, os.path.abspath(os.path.dirname(config.testFilename)))
155
156 if config.discover:
157 self.__rootdir = config.discoveryStart
158 elif config.testFilename:
159 self.__rootdir = os.path.dirname(config.testFilename)
160 else:
161 self.__rootdir = ""
162
163 super().start(config, pythonpath)
164
165 def finished(self):
166 """
167 Public method handling the unit test process been finished.
168
169 This method should read the results (if necessary) and emit the signal
170 testFinished.
171 """
172 if self.__config.collectCoverage:
173 self.coverageDataSaved.emit(os.path.join(self.__rootdir, ".coverage"))
174
175 self.__config = None
176
177 self.reader.close()
178
179 output = self.readAllOutput()
180 self.testFinished.emit([], output)
181
182 @pyqtSlot(object)
183 def __processData(self, data):
184 """
185 Private slot to process the received data.
186
187 @param data data object received
188 @type dict
189 """
190 # test configuration
191 if data["event"] == "config":
192 self.__rootdir = data["root"]
193
194 # error collecting tests
195 elif data["event"] == "collecterror":
196 name = self.__normalizeModuleName(data["nodeid"])
197 self.collectError.emit([(name, data["report"])])
198
199 # tests collected
200 elif data["event"] == "collected":
201 self.collected.emit(
202 [(data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")]
203 )
204
205 # test started
206 elif data["event"] == "starttest":
207 self.startTest.emit(
208 (data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")
209 )
210
211 # test result
212 elif data["event"] == "result":
213 if data["status"] in ("failed", "xpassed") or data["with_error"]:
214 category = TestResultCategory.FAIL
215 elif data["status"] in ("passed", "xfailed"):
216 category = TestResultCategory.OK
217 else:
218 category = TestResultCategory.SKIP
219
220 status = (
221 self.tr("Error")
222 if data["with_error"]
223 else self.__statusDisplayMapping[data["status"]]
224 )
225
226 message = data.get("message", "")
227 extraText = data.get("report", "")
228 reportPhase = data.get("report_phase")
229 if reportPhase in ("setup", "teardown"):
230 message = self.tr("ERROR at {0}: {1}", "phase, message").format(
231 reportPhase, message
232 )
233 extraText = self.tr("ERROR at {0}: {1}", "phase, extra text").format(
234 reportPhase, extraText
235 )
236 sections = data.get("sections", [])
237 if sections:
238 extraText += "\n"
239 for heading, text in sections:
240 extraText += "----- {0} -----\n{1}".format(heading, text)
241
242 duration = data.get("duration_s", None)
243 if duration:
244 # convert to ms
245 duration *= 1000
246
247 filename = data["filename"]
248 if self.__rootdir:
249 filename = os.path.join(self.__rootdir, filename)
250
251 self.testResult.emit(
252 TestResult(
253 category=category,
254 status=status,
255 name=self.__nodeid2testname(data["nodeid"]),
256 id=data["nodeid"],
257 description="",
258 message=message,
259 extra=extraText.rstrip().splitlines(),
260 duration=duration,
261 filename=filename,
262 lineno=data.get("linenumber", 0) + 1,
263 # pytest reports 0-based line numbers
264 )
265 )
266
267 # test run finished
268 elif data["event"] == "finished":
269 self.testRunFinished.emit(data["tests"], data["duration_s"])
270
271 def __normalizeModuleName(self, name):
272 r"""
273 Private method to convert a module name reported by pytest to Python
274 conventions.
275
276 This method strips the extensions '.pyw' and '.py' first and replaces
277 '/' and '\' thereafter.
278
279 @param name module name reported by pytest
280 @type str
281 @return module name iaw. Python conventions
282 @rtype str
283 """
284 return (
285 name.replace(".pyw", "")
286 .replace(".py", "")
287 .replace("/", ".")
288 .replace("\\", ".")
289 )
290
291 def __nodeid2testname(self, nodeid):
292 """
293 Private method to convert a nodeid to a test name.
294
295 @param nodeid nodeid to be converted
296 @type str
297 @return test name
298 @rtype str
299 """
300 module, name = nodeid.split("::", 1)
301 module = self.__normalizeModuleName(module)
302 name = name.replace("::", ".")
303 testname, name = "{0}.{1}".format(module, name).rsplit(".", 1)
304 return "{0} ({1})".format(name, testname)

eric ide

mercurial