eric7/Testing/Interfaces/UnittestRunner.py

branch
unittest
changeset 9066
a219ade50f7c
parent 9064
339bb8c8007d
child 9070
eab09a1ab8ce
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric7/Testing/Interfaces/UnittestRunner.py	Mon May 16 19:46:51 2022 +0200
@@ -0,0 +1,423 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the test runner script for the 'unittest' framework.
+"""
+
+import json
+import os
+import sys
+import time
+import unittest
+
+
+sys.path.insert(
+    2,
+    os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+)
+
+
+class EricTestResult(unittest.TestResult):
+    """
+    Class implementing a TestResult derivative to send the data via a network
+    connection.
+    """
+    def __init__(self, writer, failfast):
+        """
+        Constructor
+        
+        @param writer reference to the object to write the results to
+        @type EricJsonWriter
+        @param failfast flag indicating to stop at the first error
+        @type bool
+        """
+        super().__init__()
+        self.__writer = writer
+        self.failfast = failfast
+        self.__testsRun = 0
+        
+        self.__currentTestStatus = {}
+    
+    def addFailure(self, test, err):
+        """
+        Public method called if a test failed.
+        
+        @param test reference to the test object
+        @type TestCase
+        @param err tuple containing the exception data like sys.exc_info
+            (exception type, exception instance, traceback)
+        @type tuple
+        """
+        super().addFailure(test, err)
+        tracebackLines = self._exc_info_to_string(err, test)
+        
+        self.__currentTestStatus.update({
+            "status": "failure",
+            "traceback": tracebackLines,
+        })
+    
+    def addError(self, test, err):
+        """
+        Public method called if a test errored.
+        
+        @param test reference to the test object
+        @type TestCase
+        @param err tuple containing the exception data like sys.exc_info
+            (exception type, exception instance, traceback)
+        @type tuple
+        """
+        super().addError(test, err)
+        tracebackLines = self._exc_info_to_string(err, test)
+        
+        self.__currentTestStatus.update({
+            "status": "error",
+            "traceback": tracebackLines,
+        })
+    
+    def addSkip(self, test, reason):
+        """
+        Public method called if a test was skipped.
+        
+        @param test reference to the test object
+        @type TestCase
+        @param reason reason for skipping the test
+        @type str
+        """
+        super().addSkip(test, reason)
+        
+        self.__currentTestStatus.update({
+            "status": "skipped",
+            "shortmsg": reason,
+        })
+    
+    def addExpectedFailure(self, test, err):
+        """
+        Public method called if a test failed expected.
+        
+        @param test reference to the test object
+        @type TestCase
+        @param err tuple containing the exception data like sys.exc_info
+            (exception type, exception instance, traceback)
+        @type tuple
+        """
+        super().addExpectedFailure(test, err)
+        tracebackLines = self._exc_info_to_string(err, test)
+        
+        self.__currentTestStatus.update({
+            "status": "expected failure",
+            "traceback": tracebackLines,
+        })
+    
+    def addUnexpectedSuccess(self, test):
+        """
+        Public method called if a test succeeded expectedly.
+        
+        @param test reference to the test object
+        @type TestCase
+        """
+        super().addUnexpectedSuccess(test)
+        
+        self.__currentTestStatus["status"] = "unexpected success"
+    
+    def addSubTest(self, test, subtest, err):
+        """
+        Public method called for each subtest to record its result.
+        
+        @param test reference to the test object
+        @type TestCase
+        @param subtest reference to the subtest object
+        @type TestCase
+        @param err tuple containing the exception data like sys.exc_info
+            (exception type, exception instance, traceback)
+        @type tuple
+        """
+        if err is not None:
+            super().addSubTest(test, subtest, err)
+            tracebackLines = self._exc_info_to_string(err, test)
+            status = (
+                "failure"
+                if issubclass(err[0], test.failureException) else
+                "error"
+            )
+            
+            # record the last subtest fail status as the overall status
+            self.__currentTestStatus["status"] = status
+            
+            self.__writer.write({
+                "event": "result",
+                "status": status,
+                "name": str(subtest),
+                "id": subtest.id(),
+                "description": subtest.shortDescription(),
+                "traceback": tracebackLines,
+                "subtest": True,
+            })
+            
+            if self.failfast:
+                self.stop()
+        else:
+            self.__writer.write({
+                "event": "result",
+                "status": "success",
+                "name": str(subtest),
+                "id": subtest.id(),
+                "description": subtest.shortDescription(),
+                "subtest": True,
+            })
+    
+    def startTest(self, test):
+        """
+        Public method called at the start of a test.
+        
+        @param test reference to the test object
+        @type TestCase
+        """
+        super().startTest(test)
+        
+        self.__testsRun += 1
+        self.__currentTestStatus = {
+            "event": "result",
+            "status": "success",
+            "name": str(test),
+            "id": test.id(),
+            "description": test.shortDescription(),
+            "subtest": False,
+        }
+        
+        self.__writer.write({
+            "event": "started",
+            "name": str(test),
+            "id": test.id(),
+            "description": test.shortDescription(),
+        })
+        
+        self.__startTime = time.monotonic_ns()
+    
+    def stopTest(self, test):
+        """
+        Public method called at the end of a test.
+        
+        @param test reference to the test object
+        @type TestCase
+        """
+        stopTime = time.monotonic_ns()
+        duration = (stopTime - self.__startTime) / 1_000_000     # ms
+        
+        super().stopTest(test)
+        
+        self.__currentTestStatus["duration_ms"] = duration
+        self.__writer.write(self.__currentTestStatus)
+    
+    def startTestRun(self):
+        """
+        Public method called once before any tests are executed.
+        """
+        self.__totalStartTime = time.monotonic_ns()
+        self.__testsRun = 0
+    
+    def stopTestRun(self):
+        """
+        Public method called once after all tests are executed.
+        """
+        stopTime = time.monotonic_ns()
+        duration = (stopTime - self.__totalStartTime) / 1_000_000_000   # s
+        
+        self.__writer.write({
+            "event": "finished",
+            "duration_s": duration,
+            "tests": self.__testsRun,
+        })
+
+
+def _assembleTestCasesList(suite):
+    """
+    Protected function to assemble a list of test cases included in a test
+    suite.
+    
+    @param suite test suite to be inspected
+    @type unittest.TestSuite
+    @return list of tuples containing the test case ID, the string
+        representation and the short description
+    @rtype list of tuples of (str, str)
+    """
+    testCases = []
+    for test in suite:
+        if isinstance(test, unittest.TestSuite):
+            testCases.extend(_assembleTestCasesList(test))
+        else:
+            testId = test.id()
+            if (
+                "ModuleImportFailure" not in testId and
+                "LoadTestsFailure" not in testId and
+                "_FailedTest" not in testId
+            ):
+                testCases.append(
+                    (testId, str(test), test.shortDescription())
+                )
+    return testCases
+
+
+def runtest(argv):
+    """
+    Function to run the tests.
+    
+    @param argv list of command line parameters.
+    @type list of str
+    """
+    from EricNetwork.EricJsonStreamWriter import EricJsonWriter
+    writer = EricJsonWriter(argv[0], int(argv[1]))
+    del argv[:2]
+    
+    # process arguments
+    if argv[0] == "discover":
+        discover = True
+        argv.pop(0)
+        if argv[0] == "--start-directory":
+            discoveryStart = argv[1]
+            del argv[:2]
+    else:
+        discover = False
+        discoveryStart = ""
+    
+    failfast = "--failfast" in argv
+    if failfast:
+        argv.remove("--failfast")
+    
+    coverage = "--cover" in argv
+    if coverage:
+        argv.remove("--cover")
+    coverageErase = "--cover-erase" in argv
+    if coverageErase:
+        argv.remove("--cover-erase")
+    
+    if argv and argv[0] == "--failed-only":
+        if discover:
+            testFileName = ""
+            failed = argv[1:]
+        else:
+            testFileName = argv[1]
+            failed = argv[2:]
+    else:
+        failed = []
+        if discover:
+            testFileName = testName = ""
+        else:
+            testFileName, testName = argv[:2]
+            del argv[:2]
+        
+        testCases = argv[:]
+    
+    if testFileName:
+        sys.path.insert(1, os.path.dirname(os.path.abspath(testFileName)))
+    elif discoveryStart:
+        sys.path.insert(1, os.path.abspath(discoveryStart))
+    
+    try:
+        testLoader = unittest.TestLoader()
+        if discover and not failed:
+            if testCases:
+                test = testLoader.loadTestsFromNames(testCases)
+            else:
+                test = testLoader.discover(discoveryStart)
+        else:
+            if testFileName:
+                module = __import__(os.path.splitext(
+                    os.path.basename(testFileName))[0])
+            else:
+                module = None
+            if failed:
+                if module:
+                    failed = [t.split(".", 1)[1]
+                              for t in failed]
+                test = testLoader.loadTestsFromNames(
+                    failed, module)
+            else:
+                test = testLoader.loadTestsFromName(
+                    testName, module)
+    except Exception as err:
+        print("Exception:", str(err))
+        writer.write({
+            "event": "collecterror",
+            "error": str(err),
+        })
+        sys.exit(1)
+    
+    collectedTests = {
+        "event": "collected",
+        "tests": [
+            {"id": id, "name": name, "description": desc}
+            for id, name, desc in _assembleTestCasesList(test)
+        ]
+    }
+    writer.write(collectedTests)
+    
+    # setup test coverage
+    if coverage:
+        if discover:
+            covname = os.path.join(discoveryStart, "unittest")
+        elif testFileName:
+            covname = os.path.splitext(
+                os.path.abspath(testFileName))[0]
+        else:
+            covname = "unittest"
+        covDataFile = "{0}.coverage".format(covname)
+        if not os.path.isabs(covDataFile):
+            covDataFile = os.path.abspath(covDataFile)
+        
+        from DebugClients.Python.coverage import coverage as cov
+        cover = cov(data_file=covDataFile)
+        if coverageErase:
+            cover.erase()
+    else:
+        cover = None
+    
+    testResult = EricTestResult(writer, failfast)
+    startTestRun = getattr(testResult, 'startTestRun', None)
+    if startTestRun is not None:
+        startTestRun()
+    try:
+        if cover:
+            cover.start()
+        test.run(testResult)
+    finally:
+        if cover:
+            cover.stop()
+            cover.save()
+            writer.write({
+                "event": "coverage",
+                "file": covDataFile,
+            })
+        stopTestRun = getattr(testResult, 'stopTestRun', None)
+        if stopTestRun is not None:
+            stopTestRun()
+    
+    writer.close()
+    sys.exit(0)
+
+if __name__ == '__main__':
+    if len(sys.argv) > 1:
+        command = sys.argv[1]
+        if command == "installed":
+            sys.exit(0)
+        
+        elif command == "versions":
+            import platform
+            versions = {
+                "name": "unittest",
+                "version": platform.python_version(),
+                "plugins": [],
+            }
+            print(json.dumps(versions))
+            sys.exit(0)
+        
+        elif command == "runtest":
+            runtest(sys.argv[2:])
+            sys.exit(0)
+    
+    sys.exit(42)
+
+#
+# eflag: noqa = M801

eric ide

mercurial