src/eric7/Testing/Interfaces/PytestExecutor.py

branch
eric7-maintenance
changeset 9264
18a7312cfdb3
parent 9192
a763d57e23bc
parent 9221
bf71ee032bb4
child 9371
1da8bc75946f
diff -r d23e9854aea4 -r 18a7312cfdb3 src/eric7/Testing/Interfaces/PytestExecutor.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/Testing/Interfaces/PytestExecutor.py	Sun Jul 24 11:29:56 2022 +0200
@@ -0,0 +1,304 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the executor for the 'pytest' framework.
+"""
+
+import contextlib
+import json
+import os
+
+from PyQt6.QtCore import pyqtSlot, QProcess
+
+from EricNetwork.EricJsonStreamReader import EricJsonReader
+
+from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory
+
+
+class PytestExecutor(TestExecutorBase):
+    """
+    Class implementing the executor for the 'pytest' framework.
+    """
+
+    module = "pytest"
+    name = "pytest"
+
+    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")
+
+    def __init__(self, testWidget):
+        """
+        Constructor
+
+        @param testWidget reference to the unit test widget
+        @type TestingWidget
+        """
+        super().__init__(testWidget)
+
+        self.__statusDisplayMapping = {
+            "failed": self.tr("Failure"),
+            "skipped": self.tr("Skipped"),
+            "xfailed": self.tr("Expected Failure"),
+            "xpassed": self.tr("Unexpected Success"),
+            "passed": self.tr("Success"),
+        }
+
+        self.__config = None
+
+    def getVersions(self, interpreter):
+        """
+        Public method to get the test framework version and version information
+        of its installed plugins.
+
+        @param interpreter interpreter to be used for the test
+        @type str
+        @return dictionary containing the framework name and version and the
+            list of available plugins with name and version each
+        @rtype dict
+        """
+        proc = QProcess()
+        proc.start(interpreter, [PytestExecutor.runner, "versions"])
+        if proc.waitForFinished(3000):
+            exitCode = proc.exitCode()
+            if exitCode == 0:
+                outputLines = self.readAllOutput(proc).splitlines()
+                for line in outputLines:
+                    if line.startswith("{") and line.endswith("}"):
+                        with contextlib.suppress(json.JSONDecodeError):
+                            return json.loads(line)
+
+        return {}
+
+    def hasCoverage(self, interpreter):
+        """
+        Public method to get the test framework version and version information
+        of its installed plugins.
+
+        @param interpreter interpreter to be used for the test
+        @type str
+        @return flag indicating the availability of coverage functionality
+        @rtype bool
+        """
+        versions = self.getVersions(interpreter)
+        if "plugins" in versions:
+            return any(plugin["name"] == "pytest-cov" for plugin in versions["plugins"])
+
+        return False
+
+    def createArguments(self, config):
+        """
+        Public method to create the arguments needed to start the test process.
+
+        @param config configuration for the test execution
+        @type TestConfig
+        @return list of process arguments
+        @rtype list of str
+        """
+        #
+        # collectCoverage: --cov= + --cov-report= to suppress report generation
+        # eraseCoverage: --cov-append if eraseCoverage is False
+        # coverageFile
+        #
+        args = [
+            PytestExecutor.runner,
+            "runtest",
+            self.reader.address(),
+            str(self.reader.port()),
+            "--quiet",
+        ]
+
+        if config.failFast:
+            args.append("--exitfirst")
+
+        if config.failedOnly:
+            args.append("--last-failed")
+        else:
+            args.append("--cache-clear")
+
+        if config.collectCoverage:
+            args.extend(["--cov=.", "--cov-report="])
+            if not config.eraseCoverage:
+                args.append("--cov-append")
+
+        if config.testFilename:
+            if config.testName:
+                args.append(
+                    "{0}::{1}".format(
+                        config.testFilename, config.testName.replace(".", "::")
+                    )
+                )
+            else:
+                args.append(config.testFilename)
+
+        return args
+
+    def start(self, config, pythonpath):
+        """
+        Public method to start the testing process.
+
+        @param config configuration for the test execution
+        @type TestConfig
+        @param pythonpath list of directories to be added to the Python path
+        @type list of str
+        """
+        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
+        self.reader.dataReceived.connect(self.__processData)
+
+        self.__config = config
+
+        if config.discoveryStart:
+            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
+        elif config.testFilename:
+            pythonpath.insert(0, os.path.abspath(os.path.dirname(config.testFilename)))
+
+        if config.discover:
+            self.__rootdir = config.discoveryStart
+        elif config.testFilename:
+            self.__rootdir = os.path.dirname(config.testFilename)
+        else:
+            self.__rootdir = ""
+
+        super().start(config, pythonpath)
+
+    def finished(self):
+        """
+        Public method handling the unit test process been finished.
+
+        This method should read the results (if necessary) and emit the signal
+        testFinished.
+        """
+        if self.__config.collectCoverage:
+            self.coverageDataSaved.emit(os.path.join(self.__rootdir, ".coverage"))
+
+        self.__config = None
+
+        self.reader.close()
+
+        output = self.readAllOutput()
+        self.testFinished.emit([], output)
+
+    @pyqtSlot(object)
+    def __processData(self, data):
+        """
+        Private slot to process the received data.
+
+        @param data data object received
+        @type dict
+        """
+        # test configuration
+        if data["event"] == "config":
+            self.__rootdir = data["root"]
+
+        # error collecting tests
+        elif data["event"] == "collecterror":
+            name = self.__normalizeModuleName(data["nodeid"])
+            self.collectError.emit([(name, data["report"])])
+
+        # tests collected
+        elif data["event"] == "collected":
+            self.collected.emit(
+                [(data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")]
+            )
+
+        # test started
+        elif data["event"] == "starttest":
+            self.startTest.emit(
+                (data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")
+            )
+
+        # test result
+        elif data["event"] == "result":
+            if data["status"] in ("failed", "xpassed") or data["with_error"]:
+                category = TestResultCategory.FAIL
+            elif data["status"] in ("passed", "xfailed"):
+                category = TestResultCategory.OK
+            else:
+                category = TestResultCategory.SKIP
+
+            status = (
+                self.tr("Error")
+                if data["with_error"]
+                else self.__statusDisplayMapping[data["status"]]
+            )
+
+            message = data.get("message", "")
+            extraText = data.get("report", "")
+            reportPhase = data.get("report_phase")
+            if reportPhase in ("setup", "teardown"):
+                message = self.tr("ERROR at {0}: {1}", "phase, message").format(
+                    reportPhase, message
+                )
+                extraText = self.tr("ERROR at {0}: {1}", "phase, extra text").format(
+                    reportPhase, extraText
+                )
+            sections = data.get("sections", [])
+            if sections:
+                extraText += "\n"
+                for heading, text in sections:
+                    extraText += "----- {0} -----\n{1}".format(heading, text)
+
+            duration = data.get("duration_s", None)
+            if duration:
+                # convert to ms
+                duration *= 1000
+
+            filename = data["filename"]
+            if self.__rootdir:
+                filename = os.path.join(self.__rootdir, filename)
+
+            self.testResult.emit(
+                TestResult(
+                    category=category,
+                    status=status,
+                    name=self.__nodeid2testname(data["nodeid"]),
+                    id=data["nodeid"],
+                    description="",
+                    message=message,
+                    extra=extraText.rstrip().splitlines(),
+                    duration=duration,
+                    filename=filename,
+                    lineno=data.get("linenumber", 0) + 1,
+                    # pytest reports 0-based line numbers
+                )
+            )
+
+        # test run finished
+        elif data["event"] == "finished":
+            self.testRunFinished.emit(data["tests"], data["duration_s"])
+
+    def __normalizeModuleName(self, name):
+        r"""
+        Private method to convert a module name reported by pytest to Python
+        conventions.
+
+        This method strips the extensions '.pyw' and '.py' first and replaces
+        '/' and '\' thereafter.
+
+        @param name module name reported by pytest
+        @type str
+        @return module name iaw. Python conventions
+        @rtype str
+        """
+        return (
+            name.replace(".pyw", "")
+            .replace(".py", "")
+            .replace("/", ".")
+            .replace("\\", ".")
+        )
+
+    def __nodeid2testname(self, nodeid):
+        """
+        Private method to convert a nodeid to a test name.
+
+        @param nodeid nodeid to be converted
+        @type str
+        @return test name
+        @rtype str
+        """
+        module, name = nodeid.split("::", 1)
+        module = self.__normalizeModuleName(module)
+        name = name.replace("::", ".")
+        testname, name = "{0}.{1}".format(module, name).rsplit(".", 1)
+        return "{0} ({1})".format(name, testname)

eric ide

mercurial