Wed, 13 Jul 2022 14:55:47 +0200
Reformatted the source code using the 'Black' utility.
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the executor for the 'pytest' framework.
"""

import contextlib
import json
import os

from PyQt6.QtCore import pyqtSlot, QProcess

from EricNetwork.EricJsonStreamReader import EricJsonReader

from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory


class PytestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the 'pytest' framework.
    """

    module = "pytest"
    name = "pytest"

    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        # mapping of the status reported by the runner to the display string
        self.__statusDisplayMapping = {
            "failed": self.tr("Failure"),
            "skipped": self.tr("Skipped"),
            "xfailed": self.tr("Expected Failure"),
            "xpassed": self.tr("Unexpected Success"),
            "passed": self.tr("Success"),
        }

        self.__config = None
        # set by start() and updated by the 'config' event of the runner
        self.__rootdir = ""

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each (empty, if
            the runner did not finish successfully within three seconds or
            did not print a JSON object)
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [PytestExecutor.runner, "versions"])
        if proc.waitForFinished(3000):
            exitCode = proc.exitCode()
            if exitCode == 0:
                outputLines = self.readAllOutput(proc).splitlines()
                for line in outputLines:
                    # only lines looking like a JSON object are candidates;
                    # lines failing to decode are skipped
                    if line.startswith("{") and line.endswith("}"):
                        with contextlib.suppress(json.JSONDecodeError):
                            return json.loads(line)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to check, if the collection of coverage data is
        available, i.e. the 'pytest-cov' plugin is reported by the runner.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        versions = self.getVersions(interpreter)
        if "plugins" in versions:
            return any(
                plugin["name"] == "pytest-cov" for plugin in versions["plugins"]
            )

        return False

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        #
        # collectCoverage: --cov= + --cov-report= to suppress report generation
        # eraseCoverage: --cov-append if eraseCoverage is False
        # coverageFile
        #
        args = [
            PytestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
            "--quiet",
        ]

        if config.failFast:
            args.append("--exitfirst")

        if config.failedOnly:
            args.append("--last-failed")
        else:
            args.append("--cache-clear")

        if config.collectCoverage:
            args.extend(["--cov=.", "--cov-report="])
            if not config.eraseCoverage:
                args.append("--cov-append")

        if config.testFilename:
            if config.testName:
                # pytest separates the parts of a node ID with '::'
                args.append(
                    "{0}::{1}".format(
                        config.testFilename, config.testName.replace(".", "::")
                    )
                )
            else:
                args.append(config.testFilename)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        Note: The given pythonpath list is extended in place.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        # the reader must exist before createArguments() is called because
        # its address and port are passed to the runner
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        self.__config = config

        if config.discoveryStart:
            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
        elif config.testFilename:
            pythonpath.insert(0, os.path.abspath(os.path.dirname(config.testFilename)))

        if config.discover:
            self.__rootdir = config.discoveryStart
        elif config.testFilename:
            self.__rootdir = os.path.dirname(config.testFilename)
        else:
            self.__rootdir = ""

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method should read the results (if necessary) and emit the signal
        testFinished.
        """
        if self.__config.collectCoverage:
            self.coverageDataSaved.emit(os.path.join(self.__rootdir, ".coverage"))

        self.__config = None

        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        @param data data object received
        @type dict
        """
        event = data["event"]

        # test configuration
        if event == "config":
            self.__rootdir = data["root"]

        # error collecting tests
        elif event == "collecterror":
            name = self.__normalizeModuleName(data["nodeid"])
            self.collectError.emit([(name, data["report"])])

        # tests collected
        elif event == "collected":
            self.collected.emit(
                [(data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")]
            )

        # test started
        elif event == "starttest":
            self.startTest.emit(
                (data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")
            )

        # test result
        elif event == "result":
            if data["status"] in ("failed", "xpassed") or data["with_error"]:
                category = TestResultCategory.FAIL
            elif data["status"] in ("passed", "xfailed"):
                category = TestResultCategory.OK
            else:
                category = TestResultCategory.SKIP

            status = (
                self.tr("Error")
                if data["with_error"]
                else self.__statusDisplayMapping[data["status"]]
            )

            message = data.get("message", "")
            extraText = data.get("report", "")
            reportPhase = data.get("report_phase")
            if reportPhase in ("setup", "teardown"):
                message = self.tr("ERROR at {0}: {1}", "phase, message").format(
                    reportPhase, message
                )
                extraText = self.tr("ERROR at {0}: {1}", "phase, extra text").format(
                    reportPhase, extraText
                )
            sections = data.get("sections", [])
            if sections:
                extraText += "\n"
                for heading, text in sections:
                    extraText += "----- {0} -----\n{1}".format(heading, text)

            duration = data.get("duration_s", None)
            if duration:
                # convert to ms
                duration *= 1000

            filename = data["filename"]
            if self.__rootdir:
                filename = os.path.join(self.__rootdir, filename)

            self.testResult.emit(
                TestResult(
                    category=category,
                    status=status,
                    name=self.__nodeid2testname(data["nodeid"]),
                    id=data["nodeid"],
                    description="",
                    message=message,
                    extra=extraText.rstrip().splitlines(),
                    duration=duration,
                    filename=filename,
                    lineno=data.get("linenumber", 0) + 1,
                    # pytest reports 0-based line numbers
                )
            )

        # test run finished
        elif event == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

    def __normalizeModuleName(self, name):
        r"""
        Private method to convert a module name reported by pytest to Python
        conventions.

        This method strips a trailing extension '.pyw' or '.py' from the path
        part of the name first and replaces '/' and '\' of the path part
        thereafter. Anything following a '::' separator is kept unchanged.

        @param name module name reported by pytest
        @type str
        @return module name iaw. Python conventions
        @rtype str
        """
        path, separator, remainder = name.partition("::")

        # strip the extension only at the end of the path; a plain replace()
        # would mangle names merely containing '.py' (e.g. 'tests.python/')
        for extension in (".pyw", ".py"):
            if path.endswith(extension):
                path = path[: -len(extension)]
                break

        path = path.replace("/", ".").replace("\\", ".")
        return path + separator + remainder

    def __nodeid2testname(self, nodeid):
        """
        Private method to convert a nodeid to a test name.

        @param nodeid nodeid to be converted
        @type str
        @return test name in the form 'name (dotted.container)' or just the
            normalized module name, if the nodeid contains no '::' separator
        @rtype str
        """
        module, separator, name = nodeid.partition("::")
        module = self.__normalizeModuleName(module)
        if not separator:
            # the nodeid references a module only
            return module

        # split off the parametrization ID (e.g. '[1.5-abc]') so dots or
        # '::' contained therein are not mistaken for name separators
        base, bracket, parameters = name.partition("[")
        base = base.replace("::", ".")
        testname, name = "{0}.{1}".format(module, base).rsplit(".", 1)
        name += bracket + parameters
        return "{0} ({1})".format(name, testname)