diff -r d23e9854aea4 -r 18a7312cfdb3 src/eric7/Testing/Interfaces/UnittestExecutor.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/Testing/Interfaces/UnittestExecutor.py Sun Jul 24 11:29:56 2022 +0200 @@ -0,0 +1,232 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the executor for the standard 'unittest' framework. +""" + +import contextlib +import json +import os +import re + +from PyQt6.QtCore import pyqtSlot, QProcess + +from EricNetwork.EricJsonStreamReader import EricJsonReader + +from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory + + +class UnittestExecutor(TestExecutorBase): + """ + Class implementing the executor for the standard 'unittest' framework. + """ + + module = "unittest" + name = "unittest" + + runner = os.path.join(os.path.dirname(__file__), "UnittestRunner.py") + + def __init__(self, testWidget): + """ + Constructor + + @param testWidget reference to the unit test widget + @type TestingWidget + """ + super().__init__(testWidget) + + self.__statusCategoryMapping = { + "failure": TestResultCategory.FAIL, + "error": TestResultCategory.FAIL, + "skipped": TestResultCategory.SKIP, + "expected failure": TestResultCategory.OK, + "unexpected success": TestResultCategory.FAIL, + "success": TestResultCategory.OK, + } + + self.__statusDisplayMapping = { + "failure": self.tr("Failure"), + "error": self.tr("Error"), + "skipped": self.tr("Skipped"), + "expected failure": self.tr("Expected Failure"), + "unexpected success": self.tr("Unexpected Success"), + "success": self.tr("Success"), + } + + self.__testWidget = testWidget + + def getVersions(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return dictionary containing the framework name and version and the + list of available plugins with name and version each + @rtype dict + """ + proc = QProcess() + proc.start(interpreter, [UnittestExecutor.runner, "versions"]) + if proc.waitForFinished(3000): + exitCode = proc.exitCode() + if exitCode == 0: + versionsStr = self.readAllOutput(proc) + with contextlib.suppress(json.JSONDecodeError): + return json.loads(versionsStr) + + return {} + + def hasCoverage(self, interpreter): + """ + Public method to get the test framework version and version information + of its installed plugins. + + @param interpreter interpreter to be used for the test + @type str + @return flag indicating the availability of coverage functionality + @rtype bool + """ + return True + + def createArguments(self, config): + """ + Public method to create the arguments needed to start the test process. + + @param config configuration for the test execution + @type TestConfig + @return list of process arguments + @rtype list of str + """ + args = [ + UnittestExecutor.runner, + "runtest", + self.reader.address(), + str(self.reader.port()), + ] + + if config.discover: + args.extend( + [ + "discover", + "--start-directory", + config.discoveryStart, + ] + ) + + if config.failFast: + args.append("--failfast") + + if config.collectCoverage: + args.append("--cover") + if config.eraseCoverage: + args.append("--cover-erase") + if config.coverageFile: + args.append("--cover-file") + args.append(config.coverageFile) + + if config.failedOnly: + args.append("--failed-only") + if config.testFilename: + args.append(config.testFilename) + args.extend(self.__testWidget.getFailedTests()) + elif config.testFilename: + args.append(config.testFilename) + args.append(config.testName if config.testName else "suite") + + return args + + def start(self, config, pythonpath): + """ + Public method to start the testing process. + + @param config configuration for the test execution + @type TestConfig + @param pythonpath list of directories to be added to the Python path + @type list of str + """ + self.reader = EricJsonReader(name="Unittest Reader", parent=self) + self.reader.dataReceived.connect(self.__processData) + + super().start(config, pythonpath) + + def finished(self): + """ + Public method handling the unit test process been finished. + + This method should read the results (if necessary) and emit the signal + testFinished. + """ + self.reader.close() + + output = self.readAllOutput() + self.testFinished.emit([], output) + + @pyqtSlot(object) + def __processData(self, data): + """ + Private slot to process the received data. + + @param data data object received + @type dict + """ + # error collecting tests + if data["event"] == "collecterror": + self.collectError.emit([("", data["error"])]) + + # tests collected + elif data["event"] == "collected": + self.collected.emit( + [(t["id"], t["name"], t["description"]) for t in data["tests"]] + ) + + # test started + elif data["event"] == "started": + self.startTest.emit((data["id"], data["name"], data["description"])) + + # test result + elif data["event"] == "result": + filename, lineno = None, None + tracebackLines = data.get("traceback", "").splitlines() + if tracebackLines: + # find the last entry matching the pattern + for index in range(len(tracebackLines) - 1, -1, -1): + fmatch = re.search( + r'File "(.*?)", line (\d*?),.*', tracebackLines[index] + ) + if fmatch: + break + if fmatch: + filename = fmatch.group(1) + lineno = int(fmatch.group(2)) + + message = data.get("shortmsg", "") + if not message and tracebackLines: + message = tracebackLines[-1].split(":", 1)[1].strip() + + self.testResult.emit( + TestResult( + category=self.__statusCategoryMapping[data["status"]], + status=self.__statusDisplayMapping[data["status"]], + name=data["name"], + id=data["id"], + description=data["description"], + message=message, + extra=tracebackLines, + duration=data.get("duration_ms", None), + filename=filename, + lineno=lineno, + subtestResult=data.get("subtest", False), + ) + ) + + # test run finished + elif data["event"] == "finished": + self.testRunFinished.emit(data["tests"], data["duration_s"]) + + # coverage data + elif data["event"] == "coverage": + self.coverageDataSaved.emit(data["file"])