Tue, 17 May 2022 17:23:07 +0200
Implemented the "Show Coverage" functionality and corrected the coverage related code in UnittestRunner.
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the executor for the standard 'unittest' framework.
"""

import contextlib
import json
import os
import re

from PyQt6.QtCore import pyqtSlot, QProcess

from EricNetwork.EricJsonStreamReader import EricJsonReader

from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory


class UnittestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the standard 'unittest' framework.

    The tests are run by the 'UnittestRunner.py' script located next to this
    module. The runner reports its progress as JSON events, which are
    received through an EricJsonReader and translated into the signals of
    the executor.
    """
    module = "unittest"
    name = "unittest"

    runner = os.path.join(os.path.dirname(__file__), "UnittestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        # maps the status string reported by the runner to a result category
        self.__statusCategoryMapping = {
            "failure": TestResultCategory.FAIL,
            "error": TestResultCategory.FAIL,
            "skipped": TestResultCategory.SKIP,
            "expected failure": TestResultCategory.OK,
            "unexpected success": TestResultCategory.FAIL,
            "success": TestResultCategory.OK,
        }

        # maps the status string reported by the runner to a translated
        # text to be shown to the user
        self.__statusDisplayMapping = {
            "failure": self.tr("Failure"),
            "error": self.tr("Error"),
            "skipped": self.tr("Skipped"),
            "expected failure": self.tr("Expected Failure"),
            "unexpected success": self.tr("Unexpected Success"),
            "success": self.tr("Success"),
        }

        self.__testWidget = testWidget

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        The runner script is started with the 'versions' command and is given
        three seconds to finish. An empty dictionary is returned, if the
        process did not finish in time, exited with a non-zero exit code or
        produced output that is not valid JSON.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [UnittestExecutor.runner, "versions"])
        if proc.waitForFinished(3000):
            exitCode = proc.exitCode()
            if exitCode == 0:
                versionsStr = self.readAllOutput(proc)
                with contextlib.suppress(json.JSONDecodeError):
                    return json.loads(versionsStr)

        return {}

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        Note: The address and port of the JSON reader are part of the
        arguments, so the reader must have been created (see 'start()')
        before this method is called.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        args = [
            UnittestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
        ]

        if config.discover:
            args.extend([
                "discover",
                "--start-directory",
                config.discoveryStart,
            ])

        if config.failFast:
            args.append("--failfast")

        if config.collectCoverage:
            args.append("--cover")
            if config.eraseCoverage:
                args.append("--cover-erase")
            if config.coverageFile:
                args.append("--cover-file")
                args.append(config.coverageFile)

        if config.failedOnly:
            args.append("--failed-only")
            if config.testFilename:
                args.append(config.testFilename)
            args.extend(self.__testWidget.getFailedTests())

        elif config.testFilename and config.testName:
            args.append(config.testFilename)
            args.append(config.testName)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        # the reader has to exist before the base class starts the process
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method should read the results (if necessary) and emit the signal
        testFinished.
        """
        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        The 'event' entry of the data selects the signal to be emitted.
        Unknown events are ignored.

        @param data data object received
        @type dict
        """
        event = data["event"]

        # error collecting tests
        if event == "collecterror":
            self.collectError.emit([("", data["error"])])

        # tests collected
        elif event == "collected":
            self.collected.emit([
                (t["id"], t["name"], t["description"])
                for t in data["tests"]
            ])

        # test started
        elif event == "started":
            self.startTest.emit(
                (data["id"], data["name"], data["description"])
            )

        # test result
        elif event == "result":
            filename, lineno = None, None
            tracebackLines = []
            if "traceback" in data:
                # get the error info
                tracebackLines = data["traceback"].splitlines()
                # find the last entry matching the pattern; iterating the
                # lines directly keeps this safe for an empty traceback
                for line in reversed(tracebackLines):
                    # at least one digit is required, so that the
                    # conversion of the line number cannot fail
                    fmatch = re.search(r'File "(.*?)", line (\d+),.*', line)
                    if fmatch:
                        filename = fmatch.group(1)
                        lineno = int(fmatch.group(2))
                        break

            if "shortmsg" in data:
                message = data["shortmsg"]
            elif tracebackLines:
                # The last line normally reads 'ExceptionType: message'.
                # An exception raised without a message yields just the
                # type name; show that one instead of failing.
                lastLine = tracebackLines[-1]
                _head, separator, tail = lastLine.partition(":")
                message = tail.strip() if separator else lastLine.strip()
            else:
                message = ""

            self.testResult.emit(TestResult(
                category=self.__statusCategoryMapping[data["status"]],
                status=self.__statusDisplayMapping[data["status"]],
                name=data["name"],
                id=data["id"],
                description=data["description"],
                message=message,
                extra=tracebackLines,
                duration=data.get("duration_ms"),
                filename=filename,
                lineno=lineno,
                subtestResult=data.get("subtest", False),
            ))

        # test run finished
        elif event == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

        # coverage data
        elif event == "coverage":
            self.coverageDataSaved.emit(data["file"])