Mon, 23 May 2022 16:48:19 +0200
Implemented support for the 'pytest' framework.
--- a/eric7/Testing/Interfaces/PytestExecutor.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/PytestExecutor.py Mon May 23 16:48:19 2022 +0200 @@ -11,12 +11,13 @@ import json import os -from PyQt6.QtCore import QProcess +from PyQt6.QtCore import pyqtSlot, QProcess -from .TestExecutorBase import TestExecutorBase +from EricNetwork.EricJsonStreamReader import EricJsonReader + +from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory -# TODO: implement 'pytest' support in PytestExecutor class PytestExecutor(TestExecutorBase): """ Class implementing the executor for the 'pytest' framework. @@ -26,6 +27,25 @@ runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py") + def __init__(self, testWidget): + """ + Constructor + + @param testWidget reference to the unit test widget + @type TestingWidget + """ + super().__init__(testWidget) + + self.__statusDisplayMapping = { + "failed": self.tr("Failure"), + "skipped": self.tr("Skipped"), + "xfailed": self.tr("Expected Failure"), + "xpassed": self.tr("Unexpected Success"), + "passed": self.tr("Success"), + } + + self.__config = None + def getVersions(self, interpreter): """ Public method to get the test framework version and version information @@ -49,3 +69,242 @@ return json.loads(line) return {} + + def hasCoverage(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return flag indicating the availability of coverage functionality + @rtype bool + """ + versions = self.getVersions(interpreter) + if "plugins" in versions: + return any(plugin["name"] == "pytest-cov" + for plugin in versions["plugins"]) + + return False + + def createArguments(self, config): + """ + Public method to create the arguments needed to start the test process. + + @param config configuration for the test execution + @type TestConfig + @return list of process arguments + @rtype list of str + """ + # + # collectCoverage: --cov= + --cov-report= to suppress report generation + # eraseCoverage: --cov-append if eraseCoverage is False + # coverageFile + args = [ + PytestExecutor.runner, + "runtest", + self.reader.address(), + str(self.reader.port()), + "--quiet", + ] + + if config.failFast: + args.append("--exitfirst") + + if config.failedOnly: + args.append("--last-failed") + else: + args.append("--cache-clear") + + if config.collectCoverage: + args.extend([ + "--cov=.", + "--cov-report=" + ]) + if not config.eraseCoverage: + args.append("--cov-append") + + if config.testFilename: + if config.testName: + args.append("{0}::{1}".format( + config.testFilename, + config.testName.replace(".", "::") + )) + else: + args.append(config.testFilename) + + return args + + def start(self, config, pythonpath): + """ + Public method to start the testing process. + + @param config configuration for the test execution + @type TestConfig + @param pythonpath list of directories to be added to the Python path + @type list of str + """ + self.reader = EricJsonReader(name="Unittest Reader", parent=self) + self.reader.dataReceived.connect(self.__processData) + + self.__config = config + + if config.discoveryStart: + pythonpath.insert(0, os.path.abspath(config.discoveryStart)) + elif config.testFilename: + pythonpath.insert( + 0, os.path.abspath(os.path.dirname(config.testFilename))) + + if config.discover: + self.__rootdir = config.discoveryStart + elif config.testFilename: + self.__rootdir = os.path.dirname(config.testFilename) + else: + self.__rootdir = "" + + super().start(config, pythonpath) + + def finished(self): + """ + Public method handling the unit test process been finished. + + This method should read the results (if necessary) and emit the signal + testFinished. + """ + if self.__config.collectCoverage: + self.coverageDataSaved.emit( + os.path.join(self.__rootdir, ".coverage")) + + self.__config = None + + self.reader.close() + + output = self.readAllOutput() + self.testFinished.emit([], output) + + @pyqtSlot(object) + def __processData(self, data): + """ + Private slot to process the received data. + + @param data data object received + @type dict + """ + # test configuration + if data["event"] == "config": + self.__rootdir = data["root"] + + # error collecting tests + elif data["event"] == "collecterror": + name = self.__normalizeModuleName(data["nodeid"]) + self.collectError.emit([(name, data["report"])]) + + # tests collected + elif data["event"] == "collected": + self.collected.emit([ + (data["nodeid"], + self.__nodeid2testname(data["nodeid"]), + "") + ]) + + # test started + elif data["event"] == "starttest": + self.startTest.emit( + (data["nodeid"], + self.__nodeid2testname(data["nodeid"]), + "") + ) + + # test result + elif data["event"] == "result": + if data["status"] in ("failed", "xpassed") or data["with_error"]: + category = TestResultCategory.FAIL + elif data["status"] in ("passed", "xfailed"): + category = TestResultCategory.OK + else: + category = TestResultCategory.SKIP + + status = ( + self.tr("Error") + if data["with_error"] else + self.__statusDisplayMapping[data["status"]] + ) + + message = data.get("message", "") + extraText = data.get("report", "") + reportPhase = data.get("report_phase") + if reportPhase in ("setup", "teardown"): + message = ( + self.tr("ERROR at {0}: {1}", "phase, message") + .format(reportPhase, message) + ) + extraText = ( + self.tr("ERROR at {0}: {1}", "phase, extra text") + .format(reportPhase, extraText) + ) + sections = data.get("sections", []) + if sections: + extraText += "\n" + for heading, text in sections: + extraText += "----- {0} -----\n{1}".format(heading, text) + + duration = data.get("duration_s", None) + if duration: + # convert to ms + duration *= 1000 + + filename = data["filename"] + if self.__rootdir: + filename = os.path.join(self.__rootdir, filename) + + self.testResult.emit(TestResult( + category=category, + status=status, + name=self.__nodeid2testname(data["nodeid"]), + id=data["nodeid"], + description="", + message=message, + extra=extraText.rstrip().splitlines(), + duration=duration, + filename=filename, + lineno=data.get("linenumber", 0) + 1, + # pytest reports 0-based line numbers + )) + + # test run finished + elif data["event"] == "finished": + self.testRunFinished.emit(data["tests"], data["duration_s"]) + + def __normalizeModuleName(self, name): + r""" + Private method to convert a module name reported by pytest to Python + conventions. + + This method strips the extensions '.pyw' and '.py' first and replaces + '/' and '\' thereafter. + + @param name module name reported by pytest + @type str + @return module name iaw. Python conventions + @rtype str + """ + return (name + .replace(".pyw", "") + .replace(".py", "") + .replace("/", ".") + .replace("\\", ".")) + + def __nodeid2testname(self, nodeid): + """ + Private method to convert a nodeid to a test name. + + @param nodeid nodeid to be converted + @type str + @return test name + @rtype str + """ + module, name = nodeid.split("::", 1) + module = self.__normalizeModuleName(module) + name = name.replace("::", ".") + testname, name = "{0}.{1}".format(module, name).rsplit(".", 1) + return "{0} ({1})".format(name, testname)
--- a/eric7/Testing/Interfaces/PytestRunner.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/PytestRunner.py Mon May 23 16:48:19 2022 +0200 @@ -8,9 +8,14 @@ """ import json +import os import sys +import time -# TODO: implement 'pytest' support in PytestRunner +sys.path.insert( + 2, + os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +) class GetPluginVersionsPlugin(): @@ -51,13 +56,208 @@ return self.versions +class EricPlugin(): + """ + Class implementing a pytest plugin which reports the data in a format + suitable for the PytestExecutor. + """ + def __init__(self, writer): + """ + Constructor + + @param writer reference to the object to write the results to + @type EricJsonWriter + """ + self.__writer = writer + + self.__testsRun = 0 + + def __initializeReportData(self): + """ + Private method to initialize attributes for data collection. + """ + self.__status = '---' + self.__duration = 0 + self.__report = [] + self.__reportPhase = "" + self.__sections = [] + self.__hadError = False + self.__wasSkipped = False + self.__wasXfail = False + + def pytest_report_header(self, config, startdir): + """ + Public method called by pytest before any reporting. + + @param config reference to the configuration object + @type Config + @param startdir starting directory + @type LocalPath + """ + self.__writer.write({ + "event": "config", + "root": str(config.rootdir) + }) + + def pytest_collectreport(self, report): + """ + Public method called by pytest after the tests have been collected. + + @param report reference to the report object + @type CollectReport + """ + if report.outcome == "failed": + self.__writer.write({ + "event": "collecterror", + "nodeid": report.nodeid, + "report": str(report.longrepr), + }) + + def pytest_itemcollected(self, item): + """ + Public malled by pytest after a test item has been collected. + + @param item reference to the collected test item + @type Item + """ + self.__writer.write({ + "event": "collected", + "nodeid": item.nodeid, + "name": item.name, + }) + + def pytest_runtest_logstart(self, nodeid, location): + """ + Public method called by pytest before running a test. + + @param nodeid node id of the test item + @type str + @param location tuple containing the file name, the line number and + the test name + @type tuple of (str, int, str) + """ + self.__testsRun += 1 + + self.__writer.write({ + "event": "starttest", + "nodeid": nodeid, + }) + + self.__initializeReportData() + + def pytest_runtest_logreport(self, report): + """ + Public method called by pytest when a test phase (setup, call and + teardown) has been completed. + + @param report reference to the test report object + @type TestReport + """ + if report.when == "call": + self.__status = report.outcome + self.__duration = report.duration + else: + if report.outcome == "failed": + self.__hadError = True + elif report.outcome == "skipped": + self.__wasSkipped = True + + if hasattr(report, "wasxfail"): + self.__wasXfail = True + self.__report.append(report.wasxfail) + self.__reportPhase = report.when + + self.__sections = report.sections + + if report.longrepr: + self.__reportPhase = report.when + if ( + hasattr(report.longrepr, "reprcrash") and + report.longrepr.reprcrash is not None + ): + self.__report.append( + report.longrepr.reprcrash.message) + if isinstance(report.longrepr, tuple): + self.__report.append(report.longrepr[2]) + elif isinstance(report.longrepr, str): + self.__report.append(report.longrepr) + else: + self.__report.append(str(report.longrepr)) + + def pytest_runtest_logfinish(self, nodeid, location): + """ + Public method called by pytest after a test has been completed. + + @param nodeid node id of the test item + @type str + @param location tuple containing the file name, the line number and + the test name + @type tuple of (str, int, str) + """ + if self.__wasXfail: + self.__status = ( + "xpassed" + if self.__status == "passed" else + "xfailed" + ) + elif self.__wasSkipped: + self.__status = "skipped" + + data = { + "event": "result", + "status": self.__status, + "with_error": self.__hadError, + "sections": self.__sections, + "duration_s": self.__duration, + "nodeid": nodeid, + "filename": location[0], + "linenumber": location[1], + "report_phase": self.__reportPhase, + } + if self.__report: + messageLines = self.__report[0].rstrip().splitlines() + data["message"] = messageLines[0] + data["report"] = "\n".join(self.__report) + + self.__writer.write(data) + + def pytest_sessionstart(self, session): + """ + Public method called by pytest before performing collection and + entering the run test loop. + + @param session reference to the session object + @type Session + """ + self.__totalStartTime = time.monotonic_ns() + self.__testsRun = 0 + + def pytest_sessionfinish(self, session, exitstatus): + """ + Public method called by pytest after the whole test run finished. + + @param session reference to the session object + @type Session + @param exitstatus exit status + @type int or ExitCode + """ + stopTime = time.monotonic_ns() + duration = (stopTime - self.__totalStartTime) / 1_000_000_000 # s + + self.__writer.write({ + "event": "finished", + "duration_s": duration, + "tests": self.__testsRun, + }) + + def getVersions(): """ Function to determine the framework version and versions of all available plugins. """ try: - import pytest # __IGNORE_WARNING__ + import pytest versions = { "name": "pytest", "version": pytest.__version__, @@ -88,6 +288,14 @@ elif command == "versions": getVersions() + elif command == "runtest": + import pytest + from EricNetwork.EricJsonStreamWriter import EricJsonWriter + writer = EricJsonWriter(sys.argv[2], int(sys.argv[3])) + pytest.main(sys.argv[4:], plugins=[EricPlugin(writer)]) + writer.close() + sys.exit(0) + sys.exit(42) #
--- a/eric7/Testing/Interfaces/TestExecutorBase.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/TestExecutorBase.py Mon May 23 16:48:19 2022 +0200 @@ -150,6 +150,22 @@ return {} + def hasCoverage(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return flag indicating the availability of coverage functionality + @rtype bool + @exception NotImplementedError this method needs to be implemented by + derived classes + """ + raise NotImplementedError + + return False + def createArguments(self, config): """ Public method to create the arguments needed to start the test process.
--- a/eric7/Testing/Interfaces/UnittestExecutor.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/UnittestExecutor.py Mon May 23 16:48:19 2022 +0200 @@ -79,6 +79,18 @@ return {} + def hasCoverage(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return flag indicating the availability of coverage functionality + @rtype bool + """ + return True + def createArguments(self, config): """ Public method to create the arguments needed to start the test process. @@ -118,10 +130,9 @@ if config.testFilename: args.append(config.testFilename) args.extend(self.__testWidget.getFailedTests()) - - elif config.testFilename and config.testName: + elif config.testFilename: args.append(config.testFilename) - args.append(config.testName) + args.append(config.testName if config.testName else "suite") return args @@ -178,10 +189,8 @@ # test result elif data["event"] == "result": filename, lineno = None, None - tracebackLines = [] - if "traceback" in data: - # get the error info - tracebackLines = data["traceback"].splitlines() + tracebackLines = data.get("traceback", "").splitlines() + if tracebackLines: # find the last entry matching the pattern for index in range(len(tracebackLines) - 1, -1, -1): fmatch = re.search(r'File "(.*?)", line (\d*?),.*', @@ -192,12 +201,9 @@ filename = fmatch.group(1) lineno = int(fmatch.group(2)) - if "shortmsg" in data: - message = data["shortmsg"] - elif tracebackLines: + message = data.get("shortmsg", "") + if not message and tracebackLines: message = tracebackLines[-1].split(":", 1)[1].strip() - else: - message = "" self.testResult.emit(TestResult( category=self.__statusCategoryMapping[data["status"]], @@ -207,12 +213,10 @@ description=data["description"], message=message, extra=tracebackLines, - duration=( - data["duration_ms"] if "duration_ms" in data else None - ), + duration=data.get("duration_ms", None), filename=filename, lineno=lineno, - subtestResult=data["subtest"] if "subtest" in data else False + subtestResult=data.get("subtest", False) )) # test run finished
--- a/eric7/Testing/Interfaces/UnittestRunner.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/UnittestRunner.py Mon May 23 16:48:19 2022 +0200 @@ -13,7 +13,6 @@ import time import unittest - sys.path.insert( 2, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
--- a/eric7/Testing/Interfaces/__init__.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/Interfaces/__init__.py Mon May 23 16:48:19 2022 +0200 @@ -7,21 +7,21 @@ Package containg the various test framework interfaces. """ -#from .PytestExecutor import PytestExecutor +from .PytestExecutor import PytestExecutor from .UnittestExecutor import UnittestExecutor Frameworks = ( UnittestExecutor, -# PytestExecutor, + PytestExecutor, ) FrameworkNames = { "MicroPython": ( UnittestExecutor.name, -# PytestExecutor.name, + PytestExecutor.name, ), "Python3": ( UnittestExecutor.name, -# PytestExecutor.name, + PytestExecutor.name, ), }
--- a/eric7/Testing/TestingWidget.py Fri May 20 11:31:18 2022 +0200 +++ b/eric7/Testing/TestingWidget.py Mon May 23 16:48:19 2022 +0200 @@ -158,6 +158,7 @@ self.__populateVenvComboBox) self.__venvManager.virtualEnvironmentChanged.connect( self.__populateVenvComboBox) + ericApp().registerObject("VirtualEnvManager", self.__venvManager) self.__project = None @@ -175,8 +176,6 @@ self.__testExecutor = None # connect some signals - self.frameworkComboBox.currentIndexChanged.connect( - self.__resetResults) self.discoveryPicker.editTextChanged.connect( self.__resetResults) self.testsuitePicker.editTextChanged.connect( @@ -330,6 +329,9 @@ self.discoverCheckBox.setChecked(forProject or not bool(testFile)) + if forProject: + self.__projectOpened() + self.tabWidget.setCurrentIndex(0) @pyqtSlot(str) @@ -495,11 +497,11 @@ self.__showCoverageButton.setEnabled( self.__mode == TestingWidgetModes.STOPPED and bool(self.__coverageFile) and - ( - (self.discoverCheckBox.isChecked() and - bool(self.discoveryPicker.currentText())) or - bool(self.testsuitePicker.currentText()) - ) + ( + (self.discoverCheckBox.isChecked() and + bool(self.discoveryPicker.currentText())) or + bool(self.testsuitePicker.currentText()) + ) ) # Close button @@ -644,6 +646,41 @@ self.__updateButtonBoxButtons() self.versionsButton.setEnabled(bool(self.venvComboBox.currentText())) + + self.__updateCoverage() + + @pyqtSlot(int) + def on_frameworkComboBox_currentIndexChanged(self, index): + """ + Private slot handling the selection of a test framework. + + @param index index of the selected framework + @type int + """ + self.__resetResults() + self.__updateCoverage() + + @pyqtSlot() + def __updateCoverage(self): + """ + Private slot to update the state of the coverage checkbox depending on + the selected framework's capabilities. + """ + hasCoverage = False + + venvName = self.venvComboBox.currentText() + if venvName: + framework = self.frameworkComboBox.currentText() + if framework: + interpreter = self.__venvManager.getVirtualenvInterpreter( + venvName) + executor = self.__frameworkRegistry.createExecutor( + framework, self) + hasCoverage = executor.hasCoverage(interpreter) + + self.coverageCheckBox.setEnabled(hasCoverage) + if not hasCoverage: + self.coverageCheckBox.setChecked(False) @pyqtSlot() def on_versionsButton_clicked(self): @@ -723,8 +760,6 @@ testName = self.testComboBox.currentText() if testName: self.__insertTestName(testName) - if testFileName and not testName: - testName = "suite" self.sbLabel.setText(self.tr("Preparing Testsuite")) QCoreApplication.processEvents() @@ -781,9 +816,9 @@ """ Private slot handling the 'collected' signal of the executor. - @param testNames list of tuples containing the test id and test name - of collected tests - @type list of tuple of (str, str) + @param testNames list of tuples containing the test id, the test name + and a description of collected tests + @type list of tuple of (str, str, str) """ testResults = [ TestResult( @@ -794,9 +829,9 @@ message=desc, ) for id, name, desc in testNames ] - self.__resultsModel.setTestResults(testResults) + self.__resultsModel.addTestResults(testResults) - self.__totalCount = len(testResults) + self.__totalCount += len(testResults) self.__updateProgress() @pyqtSlot(list) @@ -879,6 +914,8 @@ """ self.__setStoppedMode() self.__testExecutor = None + + self.__adjustPendingState() @pyqtSlot(int, float) def __testRunFinished(self, noTests, duration): @@ -915,6 +952,21 @@ """ self.__resultsModel.clear() + def __adjustPendingState(self): + """ + Private method to change the status indicator of all still pending + tests to "not run". + """ + newResults = [] + for result in self.__resultsModel.getTestResults(): + if result.category == TestResultCategory.PENDING: + result.category = TestResultCategory.SKIP + result.status = self.tr("not run") + newResults.append(result) + + if newResults: + self.__resultsModel.updateTestResults(newResults) + @pyqtSlot(str) def __coverageData(self, coverageFile): """ @@ -936,10 +988,11 @@ self.__coverageDialog = PyCoverageDialog(self) self.__coverageDialog.openFile.connect(self.__openEditor) - if self.discoverCheckBox.isChecked(): - testDir = self.discoveryPicker.currentText() - else: - testDir = os.path.dirname(self.testsuitePicker.currentText()) + testDir = ( + self.discoveryPicker.currentText() + if self.discoverCheckBox.isChecked() else + os.path.dirname(self.testsuitePicker.currentText()) + ) if testDir: self.__coverageDialog.show() self.__coverageDialog.start(self.__coverageFile, testDir)