eric7/Testing/Interfaces/UnittestRunner.py

Tue, 17 May 2022 17:23:07 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Tue, 17 May 2022 17:23:07 +0200
branch
unittest
changeset 9070
eab09a1ab8ce
parent 9066
a219ade50f7c
child 9089
b48a6d0f6309
permissions
-rw-r--r--

Implemented the "Show Coverage" functionality and corrected the coverage related code in UnittestRunner.

# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the test runner script for the 'unittest' framework.
"""

import json
import os
import sys
import time
import unittest


sys.path.insert(
    2,
    os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
)


class EricTestResult(unittest.TestResult):
    """
    Class implementing a TestResult derivative to send the data via a network
    connection.
    """
    def __init__(self, writer, failfast):
        """
        Constructor
        
        @param writer reference to the object to write the results to
        @type EricJsonWriter
        @param failfast flag indicating to stop at the first error
        @type bool
        """
        super().__init__()
        self.__writer = writer
        self.failfast = failfast
        self.__testsRun = 0
        
        self.__currentTestStatus = {}
    
    def addFailure(self, test, err):
        """
        Public method called if a test failed.
        
        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)
        
        self.__currentTestStatus.update({
            "status": "failure",
            "traceback": tracebackLines,
        })
    
    def addError(self, test, err):
        """
        Public method called if a test errored.
        
        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addError(test, err)
        tracebackLines = self._exc_info_to_string(err, test)
        
        self.__currentTestStatus.update({
            "status": "error",
            "traceback": tracebackLines,
        })
    
    def addSkip(self, test, reason):
        """
        Public method called if a test was skipped.
        
        @param test reference to the test object
        @type TestCase
        @param reason reason for skipping the test
        @type str
        """
        super().addSkip(test, reason)
        
        self.__currentTestStatus.update({
            "status": "skipped",
            "shortmsg": reason,
        })
    
    def addExpectedFailure(self, test, err):
        """
        Public method called if a test failed expected.
        
        @param test reference to the test object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        super().addExpectedFailure(test, err)
        tracebackLines = self._exc_info_to_string(err, test)
        
        self.__currentTestStatus.update({
            "status": "expected failure",
            "traceback": tracebackLines,
        })
    
    def addUnexpectedSuccess(self, test):
        """
        Public method called if a test succeeded expectedly.
        
        @param test reference to the test object
        @type TestCase
        """
        super().addUnexpectedSuccess(test)
        
        self.__currentTestStatus["status"] = "unexpected success"
    
    def addSubTest(self, test, subtest, err):
        """
        Public method called for each subtest to record its result.
        
        @param test reference to the test object
        @type TestCase
        @param subtest reference to the subtest object
        @type TestCase
        @param err tuple containing the exception data like sys.exc_info
            (exception type, exception instance, traceback)
        @type tuple
        """
        if err is not None:
            super().addSubTest(test, subtest, err)
            tracebackLines = self._exc_info_to_string(err, test)
            status = (
                "failure"
                if issubclass(err[0], test.failureException) else
                "error"
            )
            
            # record the last subtest fail status as the overall status
            self.__currentTestStatus["status"] = status
            
            self.__writer.write({
                "event": "result",
                "status": status,
                "name": str(subtest),
                "id": subtest.id(),
                "description": subtest.shortDescription(),
                "traceback": tracebackLines,
                "subtest": True,
            })
            
            if self.failfast:
                self.stop()
        else:
            self.__writer.write({
                "event": "result",
                "status": "success",
                "name": str(subtest),
                "id": subtest.id(),
                "description": subtest.shortDescription(),
                "subtest": True,
            })
    
    def startTest(self, test):
        """
        Public method called at the start of a test.
        
        @param test reference to the test object
        @type TestCase
        """
        super().startTest(test)
        
        self.__testsRun += 1
        self.__currentTestStatus = {
            "event": "result",
            "status": "success",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
            "subtest": False,
        }
        
        self.__writer.write({
            "event": "started",
            "name": str(test),
            "id": test.id(),
            "description": test.shortDescription(),
        })
        
        self.__startTime = time.monotonic_ns()
    
    def stopTest(self, test):
        """
        Public method called at the end of a test.
        
        @param test reference to the test object
        @type TestCase
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__startTime) / 1_000_000     # ms
        
        super().stopTest(test)
        
        self.__currentTestStatus["duration_ms"] = duration
        self.__writer.write(self.__currentTestStatus)
    
    def startTestRun(self):
        """
        Public method called once before any tests are executed.
        """
        self.__totalStartTime = time.monotonic_ns()
        self.__testsRun = 0
    
    def stopTestRun(self):
        """
        Public method called once after all tests are executed.
        """
        stopTime = time.monotonic_ns()
        duration = (stopTime - self.__totalStartTime) / 1_000_000_000   # s
        
        self.__writer.write({
            "event": "finished",
            "duration_s": duration,
            "tests": self.__testsRun,
        })


def _assembleTestCasesList(suite):
    """
    Protected function to assemble a list of test cases included in a test
    suite.
    
    @param suite test suite to be inspected
    @type unittest.TestSuite
    @return list of tuples containing the test case ID, the string
        representation and the short description
    @rtype list of tuples of (str, str)
    """
    testCases = []
    for test in suite:
        if isinstance(test, unittest.TestSuite):
            testCases.extend(_assembleTestCasesList(test))
        else:
            testId = test.id()
            if (
                "ModuleImportFailure" not in testId and
                "LoadTestsFailure" not in testId and
                "_FailedTest" not in testId
            ):
                testCases.append(
                    (testId, str(test), test.shortDescription())
                )
    return testCases


def runtest(argv):
    """
    Function to run the tests.
    
    @param argv list of command line parameters.
    @type list of str
    """
    from EricNetwork.EricJsonStreamWriter import EricJsonWriter
    writer = EricJsonWriter(argv[0], int(argv[1]))
    del argv[:2]
    
    # process arguments
    if argv[0] == "discover":
        discover = True
        argv.pop(0)
        if argv[0] == "--start-directory":
            discoveryStart = argv[1]
            del argv[:2]
    else:
        discover = False
        discoveryStart = ""
    
    failfast = "--failfast" in argv
    if failfast:
        argv.remove("--failfast")
    
    collectCoverage = "--cover" in argv
    if collectCoverage:
        argv.remove("--cover")
    coverageErase = "--cover-erase" in argv
    if coverageErase:
        argv.remove("--cover-erase")
    if "--cover-file" in argv:
        index = argv.index("--cover-file")
        covDataFile = argv[index + 1]
        del argv[index:index + 2]
    else:
        covDataFile = ""
    
    if argv and argv[0] == "--failed-only":
        if discover:
            testFileName = ""
            failed = argv[1:]
        else:
            testFileName = argv[1]
            failed = argv[2:]
    else:
        failed = []
        if discover:
            testFileName = testName = ""
        else:
            testFileName, testName = argv[:2]
            del argv[:2]
        
        testCases = argv[:]
    
    if testFileName:
        sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName)))
    elif discoveryStart:
        sys.path.insert(1, os.path.abspath(discoveryStart))
    
    # setup test coverage
    if collectCoverage:
        if not covDataFile:
            if discover:
                covname = os.path.join(discoveryStart, "test")
            elif testFileName:
                covname = os.path.splitext(
                    os.path.abspath(testFileName))[0]
            else:
                covname = "test"
            covDataFile = "{0}.coverage".format(covname)
        if not os.path.isabs(covDataFile):
            covDataFile = os.path.abspath(covDataFile)
        
        sys.path.insert(
            2,
            os.path.abspath(os.path.join(
                os.path.dirname(__file__), "..", "..", "DebugClients", "Python"
            ))
        )
        from DebugClients.Python.coverage import Coverage
        cover = Coverage(data_file=covDataFile)
        if coverageErase:
            cover.erase()
        cover.start()
    else:
        cover = None
    
    try:
        testLoader = unittest.TestLoader()
        if discover and not failed:
            if testCases:
                test = testLoader.loadTestsFromNames(testCases)
            else:
                test = testLoader.discover(discoveryStart)
        else:
            if testFileName:
                module = __import__(os.path.splitext(
                    os.path.basename(testFileName))[0])
            else:
                module = None
            if failed:
                if module:
                    failed = [t.split(".", 1)[1]
                              for t in failed]
                test = testLoader.loadTestsFromNames(
                    failed, module)
            else:
                test = testLoader.loadTestsFromName(
                    testName, module)
    except Exception as err:
        print("Exception:", str(err))
        writer.write({
            "event": "collecterror",
            "error": str(err),
        })
        sys.exit(1)
    
    collectedTests = {
        "event": "collected",
        "tests": [
            {"id": id, "name": name, "description": desc}
            for id, name, desc in _assembleTestCasesList(test)
        ]
    }
    writer.write(collectedTests)
    
    testResult = EricTestResult(writer, failfast)
    startTestRun = getattr(testResult, 'startTestRun', None)
    if startTestRun is not None:
        startTestRun()
    try:
        test.run(testResult)
    finally:
        if cover:
            cover.stop()
            cover.save()
            writer.write({
                "event": "coverage",
                "file": covDataFile,
            })
        stopTestRun = getattr(testResult, 'stopTestRun', None)
        if stopTestRun is not None:
            stopTestRun()
    
    writer.close()
    sys.exit(0)

if __name__ == '__main__':
    if len(sys.argv) > 1:
        command = sys.argv[1]
        if command == "installed":
            sys.exit(0)
        
        elif command == "versions":
            import platform
            versions = {
                "name": "unittest",
                "version": platform.python_version(),
                "plugins": [],
            }
            print(json.dumps(versions))
            sys.exit(0)
        
        elif command == "runtest":
            runtest(sys.argv[2:])
            sys.exit(0)
    
    sys.exit(42)

#
# eflag: noqa = M801

eric ide

mercurial