--- a/eric7/Testing/Interfaces/PytestExecutor.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/PytestExecutor.py Mon May 23 16:48:19 2022 +0200 @@ -11,12 +11,13 @@ import json import os -from PyQt6.QtCore import QProcess +from PyQt6.QtCore import pyqtSlot, QProcess -from .TestExecutorBase import TestExecutorBase +from EricNetwork.EricJsonStreamReader import EricJsonReader + +from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory -# TODO: implement 'pytest' support in PytestExecutor class PytestExecutor(TestExecutorBase): """ Class implementing the executor for the 'pytest' framework. @@ -26,6 +27,25 @@ runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py") + def __init__(self, testWidget): + """ + Constructor + + @param testWidget reference to the unit test widget + @type TestingWidget + """ + super().__init__(testWidget) + + self.__statusDisplayMapping = { + "failed": self.tr("Failure"), + "skipped": self.tr("Skipped"), + "xfailed": self.tr("Expected Failure"), + "xpassed": self.tr("Unexpected Success"), + "passed": self.tr("Success"), + } + + self.__config = None + def getVersions(self, interpreter): """ Public method to get the test framework version and version information @@ -49,3 +69,242 @@ return json.loads(line) return {} + + def hasCoverage(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return flag indicating the availability of coverage functionality + @rtype bool + """ + versions = self.getVersions(interpreter) + if "plugins" in versions: + return any(plugin["name"] == "pytest-cov" + for plugin in versions["plugins"]) + + return False + + def createArguments(self, config): + """ + Public method to create the arguments needed to start the test process. + + @param config configuration for the test execution + @type TestConfig + @return list of process arguments + @rtype list of str + """ + # + # collectCoverage: --cov= + --cov-report= to suppress report generation + # eraseCoverage: --cov-append if eraseCoverage is False + # coverageFile + args = [ + PytestExecutor.runner, + "runtest", + self.reader.address(), + str(self.reader.port()), + "--quiet", + ] + + if config.failFast: + args.append("--exitfirst") + + if config.failedOnly: + args.append("--last-failed") + else: + args.append("--cache-clear") + + if config.collectCoverage: + args.extend([ + "--cov=.", + "--cov-report=" + ]) + if not config.eraseCoverage: + args.append("--cov-append") + + if config.testFilename: + if config.testName: + args.append("{0}::{1}".format( + config.testFilename, + config.testName.replace(".", "::") + )) + else: + args.append(config.testFilename) + + return args + + def start(self, config, pythonpath): + """ + Public method to start the testing process. + + @param config configuration for the test execution + @type TestConfig + @param pythonpath list of directories to be added to the Python path + @type list of str + """ + self.reader = EricJsonReader(name="Unittest Reader", parent=self) + self.reader.dataReceived.connect(self.__processData) + + self.__config = config + + if config.discoveryStart: + pythonpath.insert(0, os.path.abspath(config.discoveryStart)) + elif config.testFilename: + pythonpath.insert( + 0, os.path.abspath(os.path.dirname(config.testFilename))) + + if config.discover: + self.__rootdir = config.discoveryStart + elif config.testFilename: + self.__rootdir = os.path.dirname(config.testFilename) + else: + self.__rootdir = "" + + super().start(config, pythonpath) + + def finished(self): + """ + Public method handling the unit test process been finished. + + This method should read the results (if necessary) and emit the signal + testFinished. + """ + if self.__config.collectCoverage: + self.coverageDataSaved.emit( + os.path.join(self.__rootdir, ".coverage")) + + self.__config = None + + self.reader.close() + + output = self.readAllOutput() + self.testFinished.emit([], output) + + @pyqtSlot(object) + def __processData(self, data): + """ + Private slot to process the received data. + + @param data data object received + @type dict + """ + # test configuration + if data["event"] == "config": + self.__rootdir = data["root"] + + # error collecting tests + elif data["event"] == "collecterror": + name = self.__normalizeModuleName(data["nodeid"]) + self.collectError.emit([(name, data["report"])]) + + # tests collected + elif data["event"] == "collected": + self.collected.emit([ + (data["nodeid"], + self.__nodeid2testname(data["nodeid"]), + "") + ]) + + # test started + elif data["event"] == "starttest": + self.startTest.emit( + (data["nodeid"], + self.__nodeid2testname(data["nodeid"]), + "") + ) + + # test result + elif data["event"] == "result": + if data["status"] in ("failed", "xpassed") or data["with_error"]: + category = TestResultCategory.FAIL + elif data["status"] in ("passed", "xfailed"): + category = TestResultCategory.OK + else: + category = TestResultCategory.SKIP + + status = ( + self.tr("Error") + if data["with_error"] else + self.__statusDisplayMapping[data["status"]] + ) + + message = data.get("message", "") + extraText = data.get("report", "") + reportPhase = data.get("report_phase") + if reportPhase in ("setup", "teardown"): + message = ( + self.tr("ERROR at {0}: {1}", "phase, message") + .format(reportPhase, message) + ) + extraText = ( + self.tr("ERROR at {0}: {1}", "phase, extra text") + .format(reportPhase, extraText) + ) + sections = data.get("sections", []) + if sections: + extraText += "\n" + for heading, text in sections: + extraText += "----- {0} -----\n{1}".format(heading, text) + + duration = data.get("duration_s", None) + if duration: + # convert to ms + duration *= 1000 + + filename = data["filename"] + if self.__rootdir: + filename = os.path.join(self.__rootdir, filename) + + self.testResult.emit(TestResult( + category=category, + status=status, + name=self.__nodeid2testname(data["nodeid"]), + id=data["nodeid"], + description="", + message=message, + extra=extraText.rstrip().splitlines(), + duration=duration, + filename=filename, + lineno=data.get("linenumber", 0) + 1, + # pytest reports 0-based line numbers + )) + + # test run finished + elif data["event"] == "finished": + self.testRunFinished.emit(data["tests"], data["duration_s"]) + + def __normalizeModuleName(self, name): + r""" + Private method to convert a module name reported by pytest to Python + conventions. + + This method strips the extensions '.pyw' and '.py' first and replaces + '/' and '\' thereafter. + + @param name module name reported by pytest + @type str + @return module name iaw. Python conventions + @rtype str + """ + return (name + .replace(".pyw", "") + .replace(".py", "") + .replace("/", ".") + .replace("\\", ".")) + + def __nodeid2testname(self, nodeid): + """ + Private method to convert a nodeid to a test name. + + @param nodeid nodeid to be converted + @type str + @return test name + @rtype str + """ + module, name = nodeid.split("::", 1) + module = self.__normalizeModuleName(module) + name = name.replace("::", ".") + testname, name = "{0}.{1}".format(module, name).rsplit(".", 1) + return "{0} ({1})".format(name, testname)