src/eric7/Testing/Interfaces/UnittestExecutor.py

Mon, 22 May 2023 09:07:37 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 22 May 2023 09:07:37 +0200
branch
eric7
changeset 10050
3750abc45d5e
parent 9653
e67609152c5e
child 10069
435cc5875135
permissions
-rw-r--r--

Corrected some code style issues detected by the extended checkers.

# -*- coding: utf-8 -*-

# Copyright (c) 2022 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the executor for the standard 'unittest' framework.
"""

import contextlib
import json
import os
import re

from PyQt6.QtCore import QProcess, pyqtSlot

from eric7.EricNetwork.EricJsonStreamReader import EricJsonReader

from .TestExecutorBase import TestExecutorBase, TestResult, TestResultCategory


class UnittestExecutor(TestExecutorBase):
    """
    Class implementing the executor for the standard 'unittest' framework.
    """

    module = "unittest"
    name = "unittest"

    runner = os.path.join(os.path.dirname(__file__), "UnittestRunner.py")

    def __init__(self, testWidget):
        """
        Constructor

        @param testWidget reference to the unit test widget
        @type TestingWidget
        """
        super().__init__(testWidget)

        self.__statusCategoryMapping = {
            "failure": TestResultCategory.FAIL,
            "error": TestResultCategory.FAIL,
            "skipped": TestResultCategory.SKIP,
            "expected failure": TestResultCategory.OK,
            "unexpected success": TestResultCategory.FAIL,
            "success": TestResultCategory.OK,
        }

        self.__statusDisplayMapping = {
            "failure": self.tr("Failure"),
            "error": self.tr("Error"),
            "skipped": self.tr("Skipped"),
            "expected failure": self.tr("Expected Failure"),
            "unexpected success": self.tr("Unexpected Success"),
            "success": self.tr("Success"),
        }

        self.__testWidget = testWidget

    def getVersions(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return dictionary containing the framework name and version and the
            list of available plugins with name and version each
        @rtype dict
        """
        proc = QProcess()
        proc.start(interpreter, [UnittestExecutor.runner, "versions"])
        if proc.waitForFinished(3000):
            exitCode = proc.exitCode()
            if exitCode == 0:
                versionsStr = self.readAllOutput(proc)
                with contextlib.suppress(json.JSONDecodeError):
                    return json.loads(versionsStr)

        return {}

    def hasCoverage(self, interpreter):
        """
        Public method to get the test framework version and version information
        of its installed plugins.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating the availability of coverage functionality
        @rtype bool
        """
        return True

    def supportsPatterns(self, interpreter):
        """
        Public method to indicate the support for test filtering using test name
        patterns or a test name pattern expression.

        @param interpreter interpreter to be used for the test
        @type str
        @return flag indicating support of markers
        @rtype bool
        """
        return True

    def createArguments(self, config):
        """
        Public method to create the arguments needed to start the test process.

        @param config configuration for the test execution
        @type TestConfig
        @return list of process arguments
        @rtype list of str
        """
        args = [
            UnittestExecutor.runner,
            "runtest",
            self.reader.address(),
            str(self.reader.port()),
        ]

        if config.discover:
            args.extend(
                [
                    "discover",
                    "--start-directory",
                    config.discoveryStart,
                ]
            )

        if config.failFast:
            args.append("--failfast")

        if config.collectCoverage:
            args.append("--cover")
            if config.eraseCoverage:
                args.append("--cover-erase")
            if config.coverageFile:
                args.append("--cover-file")
                args.append(config.coverageFile)

        if config.testNamePattern:
            args.append("--pattern")
            args.append(config.testNamePattern)

        if config.failedOnly:
            args.append("--failed-only")
            if config.testFilename:
                args.append(config.testFilename)
            args.extend(self.__testWidget.getFailedTests())
        elif config.testFilename:
            args.append(config.testFilename)
            args.append(config.testName if config.testName else "@NONE@")
            # @NONE@ is just a marker for no test name given

        return args

    def start(self, config, pythonpath):
        """
        Public method to start the testing process.

        @param config configuration for the test execution
        @type TestConfig
        @param pythonpath list of directories to be added to the Python path
        @type list of str
        """
        self.reader = EricJsonReader(name="Unittest Reader", parent=self)
        self.reader.dataReceived.connect(self.__processData)

        super().start(config, pythonpath)

    def finished(self):
        """
        Public method handling the unit test process been finished.

        This method should read the results (if necessary) and emit the signal
        testFinished.
        """
        self.reader.close()

        output = self.readAllOutput()
        self.testFinished.emit([], output)

    @pyqtSlot(object)
    def __processData(self, data):
        """
        Private slot to process the received data.

        @param data data object received
        @type dict
        """
        # error collecting tests
        if data["event"] == "collecterror":
            self.collectError.emit([("", data["error"])])

        # tests collected
        elif data["event"] == "collected":
            self.collected.emit(
                [(t["id"], t["name"], t["description"]) for t in data["tests"]]
            )

        # test started
        elif data["event"] == "started":
            self.startTest.emit((data["id"], data["name"], data["description"]))

        # test result
        elif data["event"] == "result":
            filename, lineno = None, None
            tracebackLines = data.get("traceback", "").splitlines()
            if tracebackLines:
                # find the last entry matching the pattern
                for index in range(len(tracebackLines) - 1, -1, -1):
                    fmatch = re.search(
                        r'File "(.*?)", line (\d*?),.*', tracebackLines[index]
                    )
                    if fmatch:
                        break
                if fmatch:
                    filename = fmatch.group(1)
                    lineno = int(fmatch.group(2))

            message = data.get("shortmsg", "")
            if not message and tracebackLines:
                # search the line containing the assertion error
                for index in range(len(tracebackLines) - 1, -1, -1):
                    line = tracebackLines[index].strip()
                    if line.startswith("AssertionError:"):
                        message = line.replace("AssertionError:", "").strip()
                        break

            self.testResult.emit(
                TestResult(
                    category=self.__statusCategoryMapping[data["status"]],
                    status=self.__statusDisplayMapping[data["status"]],
                    name=data["name"],
                    id=data["id"],
                    description=data["description"],
                    message=message,
                    extra=tracebackLines,
                    duration=data.get("duration_ms"),
                    filename=filename,
                    lineno=lineno,
                    subtestResult=data.get("subtest", False),
                )
            )

        # test run finished
        elif data["event"] == "finished":
            self.testRunFinished.emit(data["tests"], data["duration_s"])

        # coverage data
        elif data["event"] == "coverage":
            self.coverageDataSaved.emit(data["file"])

eric ide

mercurial