src/eric7/Testing/Interfaces/PytestExecutor.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 9175
21e2be5f0b41
child 9221
bf71ee032bb4
equal deleted inserted replaced
9208:3fc8dfeb6ebe 9209:b99e7fd55fd3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the executor for the 'pytest' framework.
8 """
9
10 import contextlib
11 import json
12 import os
13
14 from PyQt6.QtCore import pyqtSlot, QProcess
15
16 from EricNetwork.EricJsonStreamReader import EricJsonReader
17
18 from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory
19
20
21 class PytestExecutor(TestExecutorBase):
22 """
23 Class implementing the executor for the 'pytest' framework.
24 """
25 module = "pytest"
26 name = "pytest"
27
28 runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")
29
30 def __init__(self, testWidget):
31 """
32 Constructor
33
34 @param testWidget reference to the unit test widget
35 @type TestingWidget
36 """
37 super().__init__(testWidget)
38
39 self.__statusDisplayMapping = {
40 "failed": self.tr("Failure"),
41 "skipped": self.tr("Skipped"),
42 "xfailed": self.tr("Expected Failure"),
43 "xpassed": self.tr("Unexpected Success"),
44 "passed": self.tr("Success"),
45 }
46
47 self.__config = None
48
49 def getVersions(self, interpreter):
50 """
51 Public method to get the test framework version and version information
52 of its installed plugins.
53
54 @param interpreter interpreter to be used for the test
55 @type str
56 @return dictionary containing the framework name and version and the
57 list of available plugins with name and version each
58 @rtype dict
59 """
60 proc = QProcess()
61 proc.start(interpreter, [PytestExecutor.runner, "versions"])
62 if proc.waitForFinished(3000):
63 exitCode = proc.exitCode()
64 if exitCode == 0:
65 outputLines = self.readAllOutput(proc).splitlines()
66 for line in outputLines:
67 if line.startswith("{") and line.endswith("}"):
68 with contextlib.suppress(json.JSONDecodeError):
69 return json.loads(line)
70
71 return {}
72
73 def hasCoverage(self, interpreter):
74 """
75 Public method to get the test framework version and version information
76 of its installed plugins.
77
78 @param interpreter interpreter to be used for the test
79 @type str
80 @return flag indicating the availability of coverage functionality
81 @rtype bool
82 """
83 versions = self.getVersions(interpreter)
84 if "plugins" in versions:
85 return any(plugin["name"] == "pytest-cov"
86 for plugin in versions["plugins"])
87
88 return False
89
90 def createArguments(self, config):
91 """
92 Public method to create the arguments needed to start the test process.
93
94 @param config configuration for the test execution
95 @type TestConfig
96 @return list of process arguments
97 @rtype list of str
98 """
99 #
100 # collectCoverage: --cov= + --cov-report= to suppress report generation
101 # eraseCoverage: --cov-append if eraseCoverage is False
102 # coverageFile
103 #
104 args = [
105 PytestExecutor.runner,
106 "runtest",
107 self.reader.address(),
108 str(self.reader.port()),
109 "--quiet",
110 ]
111
112 if config.failFast:
113 args.append("--exitfirst")
114
115 if config.failedOnly:
116 args.append("--last-failed")
117 else:
118 args.append("--cache-clear")
119
120 if config.collectCoverage:
121 args.extend([
122 "--cov=.",
123 "--cov-report="
124 ])
125 if not config.eraseCoverage:
126 args.append("--cov-append")
127
128 if config.testFilename:
129 if config.testName:
130 args.append("{0}::{1}".format(
131 config.testFilename,
132 config.testName.replace(".", "::")
133 ))
134 else:
135 args.append(config.testFilename)
136
137 return args
138
139 def start(self, config, pythonpath):
140 """
141 Public method to start the testing process.
142
143 @param config configuration for the test execution
144 @type TestConfig
145 @param pythonpath list of directories to be added to the Python path
146 @type list of str
147 """
148 self.reader = EricJsonReader(name="Unittest Reader", parent=self)
149 self.reader.dataReceived.connect(self.__processData)
150
151 self.__config = config
152
153 if config.discoveryStart:
154 pythonpath.insert(0, os.path.abspath(config.discoveryStart))
155 elif config.testFilename:
156 pythonpath.insert(
157 0, os.path.abspath(os.path.dirname(config.testFilename)))
158
159 if config.discover:
160 self.__rootdir = config.discoveryStart
161 elif config.testFilename:
162 self.__rootdir = os.path.dirname(config.testFilename)
163 else:
164 self.__rootdir = ""
165
166 super().start(config, pythonpath)
167
168 def finished(self):
169 """
170 Public method handling the unit test process been finished.
171
172 This method should read the results (if necessary) and emit the signal
173 testFinished.
174 """
175 if self.__config.collectCoverage:
176 self.coverageDataSaved.emit(
177 os.path.join(self.__rootdir, ".coverage"))
178
179 self.__config = None
180
181 self.reader.close()
182
183 output = self.readAllOutput()
184 self.testFinished.emit([], output)
185
186 @pyqtSlot(object)
187 def __processData(self, data):
188 """
189 Private slot to process the received data.
190
191 @param data data object received
192 @type dict
193 """
194 # test configuration
195 if data["event"] == "config":
196 self.__rootdir = data["root"]
197
198 # error collecting tests
199 elif data["event"] == "collecterror":
200 name = self.__normalizeModuleName(data["nodeid"])
201 self.collectError.emit([(name, data["report"])])
202
203 # tests collected
204 elif data["event"] == "collected":
205 self.collected.emit([
206 (data["nodeid"],
207 self.__nodeid2testname(data["nodeid"]),
208 "")
209 ])
210
211 # test started
212 elif data["event"] == "starttest":
213 self.startTest.emit(
214 (data["nodeid"],
215 self.__nodeid2testname(data["nodeid"]),
216 "")
217 )
218
219 # test result
220 elif data["event"] == "result":
221 if data["status"] in ("failed", "xpassed") or data["with_error"]:
222 category = TestResultCategory.FAIL
223 elif data["status"] in ("passed", "xfailed"):
224 category = TestResultCategory.OK
225 else:
226 category = TestResultCategory.SKIP
227
228 status = (
229 self.tr("Error")
230 if data["with_error"] else
231 self.__statusDisplayMapping[data["status"]]
232 )
233
234 message = data.get("message", "")
235 extraText = data.get("report", "")
236 reportPhase = data.get("report_phase")
237 if reportPhase in ("setup", "teardown"):
238 message = (
239 self.tr("ERROR at {0}: {1}", "phase, message")
240 .format(reportPhase, message)
241 )
242 extraText = (
243 self.tr("ERROR at {0}: {1}", "phase, extra text")
244 .format(reportPhase, extraText)
245 )
246 sections = data.get("sections", [])
247 if sections:
248 extraText += "\n"
249 for heading, text in sections:
250 extraText += "----- {0} -----\n{1}".format(heading, text)
251
252 duration = data.get("duration_s", None)
253 if duration:
254 # convert to ms
255 duration *= 1000
256
257 filename = data["filename"]
258 if self.__rootdir:
259 filename = os.path.join(self.__rootdir, filename)
260
261 self.testResult.emit(TestResult(
262 category=category,
263 status=status,
264 name=self.__nodeid2testname(data["nodeid"]),
265 id=data["nodeid"],
266 description="",
267 message=message,
268 extra=extraText.rstrip().splitlines(),
269 duration=duration,
270 filename=filename,
271 lineno=data.get("linenumber", 0) + 1,
272 # pytest reports 0-based line numbers
273 ))
274
275 # test run finished
276 elif data["event"] == "finished":
277 self.testRunFinished.emit(data["tests"], data["duration_s"])
278
279 def __normalizeModuleName(self, name):
280 r"""
281 Private method to convert a module name reported by pytest to Python
282 conventions.
283
284 This method strips the extensions '.pyw' and '.py' first and replaces
285 '/' and '\' thereafter.
286
287 @param name module name reported by pytest
288 @type str
289 @return module name iaw. Python conventions
290 @rtype str
291 """
292 return (name
293 .replace(".pyw", "")
294 .replace(".py", "")
295 .replace("/", ".")
296 .replace("\\", "."))
297
298 def __nodeid2testname(self, nodeid):
299 """
300 Private method to convert a nodeid to a test name.
301
302 @param nodeid nodeid to be converted
303 @type str
304 @return test name
305 @rtype str
306 """
307 module, name = nodeid.split("::", 1)
308 module = self.__normalizeModuleName(module)
309 name = name.replace("::", ".")
310 testname, name = "{0}.{1}".format(module, name).rsplit(".", 1)
311 return "{0} ({1})".format(name, testname)

eric ide

mercurial