eric7/Testing/Interfaces/PytestExecutor.py

branch
unittest
changeset 9089
b48a6d0f6309
parent 9066
a219ade50f7c
child 9175
21e2be5f0b41
--- a/eric7/Testing/Interfaces/PytestExecutor.py	Fri May 20 11:31:18 2022 +0200
+++ b/eric7/Testing/Interfaces/PytestExecutor.py	Mon May 23 16:48:19 2022 +0200
@@ -11,12 +11,13 @@
 import json
 import os
 
-from PyQt6.QtCore import QProcess
+from PyQt6.QtCore import pyqtSlot, QProcess
 
-from .TestExecutorBase import TestExecutorBase
+from EricNetwork.EricJsonStreamReader import EricJsonReader
+
+from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory
 
 
-# TODO: implement 'pytest' support in PytestExecutor
 class PytestExecutor(TestExecutorBase):
     """
     Class implementing the executor for the 'pytest' framework.
@@ -26,6 +27,25 @@
     
     runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")
     
+    def __init__(self, testWidget):
+        """
+        Constructor
+        
+        @param testWidget reference to the unit test widget
+        @type TestingWidget
+        """
+        super().__init__(testWidget)
+        
+        self.__statusDisplayMapping = {
+            "failed": self.tr("Failure"),
+            "skipped": self.tr("Skipped"),
+            "xfailed": self.tr("Expected Failure"),
+            "xpassed": self.tr("Unexpected Success"),
+            "passed": self.tr("Success"),
+        }
+        
+        self.__config = None
+    
     def getVersions(self, interpreter):
         """
         Public method to get the test framework version and version information
@@ -49,3 +69,242 @@
                             return json.loads(line)
         
         return {}
+    
+    def hasCoverage(self, interpreter):
+        """
+        Public method to get the test framework version and version information
+        of its installed plugins.
+        
+        @param interpreter interpreter to be used for the test
+        @type str
+        @return flag indicating the availability of coverage functionality
+        @rtype bool
+        """
+        versions = self.getVersions(interpreter)
+        if "plugins" in versions:
+            return any(plugin["name"] == "pytest-cov"
+                       for plugin in versions["plugins"])
+        
+        return False
+    
+    def createArguments(self, config):
+        """
+        Public method to create the arguments needed to start the test process.
+        
+        @param config configuration for the test execution
+        @type TestConfig
+        @return list of process arguments
+        @rtype list of str
+        """
+        #
+        # collectCoverage: --cov= + --cov-report= to suppress report generation
+        # eraseCoverage: --cov-append if eraseCoverage is False
+        # coverageFile
+        args = [
+            PytestExecutor.runner,
+            "runtest",
+            self.reader.address(),
+            str(self.reader.port()),
+            "--quiet",
+        ]
+        
+        if config.failFast:
+            args.append("--exitfirst")
+        
+        if config.failedOnly:
+            args.append("--last-failed")
+        else:
+            args.append("--cache-clear")
+        
+        if config.collectCoverage:
+            args.extend([
+                "--cov=.",
+                "--cov-report="
+            ])
+            if not config.eraseCoverage:
+                args.append("--cov-append")
+        
+        if config.testFilename:
+            if config.testName:
+                args.append("{0}::{1}".format(
+                    config.testFilename,
+                    config.testName.replace(".", "::")
+                ))
+            else:
+                args.append(config.testFilename)
+        
+        return args
+    
+    def start(self, config, pythonpath):
+        """
+        Public method to start the testing process.
+        
+        @param config configuration for the test execution
+        @type TestConfig
+        @param pythonpath list of directories to be added to the Python path
+        @type list of str
+        """
+        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
+        self.reader.dataReceived.connect(self.__processData)
+        
+        self.__config = config
+        
+        if config.discoveryStart:
+            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
+        elif config.testFilename:
+            pythonpath.insert(
+                0, os.path.abspath(os.path.dirname(config.testFilename)))
+        
+        if config.discover:
+            self.__rootdir = config.discoveryStart
+        elif config.testFilename:
+            self.__rootdir = os.path.dirname(config.testFilename)
+        else:
+            self.__rootdir = ""
+        
+        super().start(config, pythonpath)
+    
+    def finished(self):
+        """
+        Public method handling the unit test process been finished.
+        
+        This method should read the results (if necessary) and emit the signal
+        testFinished.
+        """
+        if self.__config.collectCoverage:
+            self.coverageDataSaved.emit(
+                os.path.join(self.__rootdir, ".coverage"))
+        
+        self.__config = None
+        
+        self.reader.close()
+        
+        output = self.readAllOutput()
+        self.testFinished.emit([], output)
+    
+    @pyqtSlot(object)
+    def __processData(self, data):
+        """
+        Private slot to process the received data.
+        
+        @param data data object received
+        @type dict
+        """
+        # test configuration
+        if data["event"] == "config":
+            self.__rootdir = data["root"]
+        
+        # error collecting tests
+        elif data["event"] == "collecterror":
+            name = self.__normalizeModuleName(data["nodeid"])
+            self.collectError.emit([(name, data["report"])])
+        
+        # tests collected
+        elif data["event"] == "collected":
+            self.collected.emit([
+                (data["nodeid"],
+                 self.__nodeid2testname(data["nodeid"]),
+                 "")
+            ])
+        
+        # test started
+        elif data["event"] == "starttest":
+            self.startTest.emit(
+                (data["nodeid"],
+                 self.__nodeid2testname(data["nodeid"]),
+                 "")
+            )
+        
+        # test result
+        elif data["event"] == "result":
+            if data["status"] in ("failed", "xpassed") or data["with_error"]:
+                category = TestResultCategory.FAIL
+            elif data["status"] in ("passed", "xfailed"):
+                category = TestResultCategory.OK
+            else:
+                category = TestResultCategory.SKIP
+            
+            status = (
+                self.tr("Error")
+                if data["with_error"] else
+                self.__statusDisplayMapping[data["status"]]
+            )
+            
+            message = data.get("message", "")
+            extraText = data.get("report", "")
+            reportPhase = data.get("report_phase")
+            if reportPhase in ("setup", "teardown"):
+                message = (
+                    self.tr("ERROR at {0}: {1}", "phase, message")
+                    .format(reportPhase, message)
+                )
+                extraText = (
+                    self.tr("ERROR at {0}: {1}", "phase, extra text")
+                    .format(reportPhase, extraText)
+                )
+            sections = data.get("sections", [])
+            if sections:
+                extraText += "\n"
+                for heading, text in sections:
+                    extraText += "----- {0} -----\n{1}".format(heading, text)
+            
+            duration = data.get("duration_s", None)
+            if duration:
+                # convert to ms
+                duration *= 1000
+            
+            filename = data["filename"]
+            if self.__rootdir:
+                filename = os.path.join(self.__rootdir, filename)
+            
+            self.testResult.emit(TestResult(
+                category=category,
+                status=status,
+                name=self.__nodeid2testname(data["nodeid"]),
+                id=data["nodeid"],
+                description="",
+                message=message,
+                extra=extraText.rstrip().splitlines(),
+                duration=duration,
+                filename=filename,
+                lineno=data.get("linenumber", 0) + 1,
+                # pytest reports 0-based line numbers
+            ))
+        
+        # test run finished
+        elif data["event"] == "finished":
+            self.testRunFinished.emit(data["tests"], data["duration_s"])
+    
+    def __normalizeModuleName(self, name):
+        r"""
+        Private method to convert a module name reported by pytest to Python
+        conventions.
+        
+        This method strips the extensions '.pyw' and '.py' first and replaces
+        '/' and '\' thereafter.
+        
+        @param name module name reported by pytest
+        @type str
+        @return module name iaw. Python conventions
+        @rtype str
+        """
+        return (name
+                .replace(".pyw", "")
+                .replace(".py", "")
+                .replace("/", ".")
+                .replace("\\", "."))
+    
+    def __nodeid2testname(self, nodeid):
+        """
+        Private method to convert a nodeid to a test name.
+        
+        @param nodeid nodeid to be converted
+        @type str
+        @return test name
+        @rtype str
+        """
+        module, name = nodeid.split("::", 1)
+        module = self.__normalizeModuleName(module)
+        name = name.replace("::", ".")
+        testname, name = "{0}.{1}".format(module, name).rsplit(".", 1)
+        return "{0} ({1})".format(name, testname)

eric ide

mercurial