src/eric7/Testing/Interfaces/PytestExecutor.py

Sun, 24 Jul 2022 11:29:56 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 24 Jul 2022 11:29:56 +0200
branch
eric7-maintenance
changeset 9264
18a7312cfdb3
parent 9192
eric7/Testing/Interfaces/PytestExecutor.py@a763d57e23bc
parent 9221
eric7/Testing/Interfaces/PytestExecutor.py@bf71ee032bb4
child 9371
1da8bc75946f
permissions
-rw-r--r--

Merged with branch 'eric7' to prepare a new release.

# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the executor for the 'pytest' framework.
"""

import contextlib
import json
import os

from PyQt6.QtCore import pyqtSlot, QProcess

from EricNetwork.EricJsonStreamReader import EricJsonReader

from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory


class PytestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the 'pytest' framework.
    """

    module = "pytest"
    name = "pytest"

    runner = os.path.join(os.path.dirname(__file__), "PytestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        self.__statusDisplayMapping = {
            "failed": self.tr("Failure"),
            "skipped": self.tr("Skipped"),
            "xfailed": self.tr("Expected Failure"),
            "xpassed": self.tr("Unexpected Success"),
            "passed": self.tr("Success"),
        }

        self.__config = None

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [PytestExecutor.runner, "versions"])
        if proc.waitForFinished(3000):
            exitCode = proc.exitCode()
            if exitCode == 0:
                outputLines = self.readAllOutput(proc).splitlines()
                for line in outputLines:
                    if line.startswith("{") and line.endswith("}"):
                        with contextlib.suppress(json.JSONDecodeError):
                            return json.loads(line)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        versions = self.getVersions(interpreter)
        if "plugins" in versions:
            return any(plugin["name"] == "pytest-cov" for plugin in versions["plugins"])

        return False

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        #
        # collectCoverage: --cov= + --cov-report= to suppress report generation
        # eraseCoverage: --cov-append if eraseCoverage is False
        # coverageFile
        #
        args = [
            PytestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
            "--quiet",
        ]

        if config.failFast:
            args.append("--exitfirst")

        if config.failedOnly:
            args.append("--last-failed")
        else:
            args.append("--cache-clear")

        if config.collectCoverage:
            args.extend(["--cov=.", "--cov-report="])
            if not config.eraseCoverage:
                args.append("--cov-append")

        if config.testFilename:
            if config.testName:
                args.append(
                    "{0}::{1}".format(
                        config.testFilename, config.testName.replace(".", "::")
                    )
                )
            else:
                args.append(config.testFilename)

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        self.__config = config

        if config.discoveryStart:
            pythonpath.insert(0, os.path.abspath(config.discoveryStart))
        elif config.testFilename:
            pythonpath.insert(0, os.path.abspath(os.path.dirname(config.testFilename)))

        if config.discover:
            self.__rootdir = config.discoveryStart
        elif config.testFilename:
            self.__rootdir = os.path.dirname(config.testFilename)
        else:
            self.__rootdir = ""

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method should read the results (if necessary) and emit the signal
        testFinished.
        """
        if self.__config.collectCoverage:
            self.coverageDataSaved.emit(os.path.join(self.__rootdir, ".coverage"))

        self.__config = None

        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        @param data data object received
        @type dict
        """
        # test configuration
        if data["event"] == "config":
            self.__rootdir = data["root"]

        # error collecting tests
        elif data["event"] == "collecterror":
            name = self.__normalizeModuleName(data["nodeid"])
            self.collectError.emit([(name, data["report"])])

        # tests collected
        elif data["event"] == "collected":
            self.collected.emit(
                [(data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")]
            )

        # test started
        elif data["event"] == "starttest":
            self.startTest.emit(
                (data["nodeid"], self.__nodeid2testname(data["nodeid"]), "")
            )

        # test result
        elif data["event"] == "result":
            if data["status"] in ("failed", "xpassed") or data["with_error"]:
                category = TestResultCategory.FAIL
            elif data["status"] in ("passed", "xfailed"):
                category = TestResultCategory.OK
            else:
                category = TestResultCategory.SKIP

            status = (
                self.tr("Error")
                if data["with_error"]
                else self.__statusDisplayMapping[data["status"]]
            )

            message = data.get("message", "")
            extraText = data.get("report", "")
            reportPhase = data.get("report_phase")
            if reportPhase in ("setup", "teardown"):
                message = self.tr("ERROR at {0}: {1}", "phase, message").format(
                    reportPhase, message
                )
                extraText = self.tr("ERROR at {0}: {1}", "phase, extra text").format(
                    reportPhase, extraText
                )
            sections = data.get("sections", [])
            if sections:
                extraText += "\n"
                for heading, text in sections:
                    extraText += "----- {0} -----\n{1}".format(heading, text)

            duration = data.get("duration_s", None)
            if duration:
                # convert to ms
                duration *= 1000

            filename = data["filename"]
            if self.__rootdir:
                filename = os.path.join(self.__rootdir, filename)

            self.testResult.emit(
                TestResult(
                    category=category,
                    status=status,
                    name=self.__nodeid2testname(data["nodeid"]),
                    id=data["nodeid"],
                    description="",
                    message=message,
                    extra=extraText.rstrip().splitlines(),
                    duration=duration,
                    filename=filename,
                    lineno=data.get("linenumber", 0) + 1,
                    # pytest reports 0-based line numbers
                )
            )

        # test run finished
        elif data["event"] == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

    def __normalizeModuleName(self, name):
        r"""
        Private method to convert a module name reported by pytest to Python
        conventions.

        This method strips the extensions '.pyw' and '.py' first and replaces
        '/' and '\' thereafter.

        @param name module name reported by pytest
        @type str
        @return module name iaw. Python conventions
        @rtype str
        """
        return (
            name.replace(".pyw", "")
            .replace(".py", "")
            .replace("/", ".")
            .replace("\\", ".")
        )

    def __nodeid2testname(self, nodeid):
        """
        Private method to convert a nodeid to a test name.

        @param nodeid nodeid to be converted
        @type str
        @return test name
        @rtype str
        """
        module, name = nodeid.split("::", 1)
        module = self.__normalizeModuleName(module)
        name = name.replace("::", ".")
        testname, name = "{0}.{1}".format(module, name).rsplit(".", 1)
        return "{0} ({1})".format(name, testname)

eric ide

mercurial