DebugClients/Python/DebugClientBase.py

branch
maintenance
changeset 6923
d062df8f1d9f
parent 6908
a56b500d7d2d
--- a/DebugClients/Python/DebugClientBase.py	Sat Mar 02 11:17:15 2019 +0100
+++ b/DebugClients/Python/DebugClientBase.py	Fri Apr 05 19:06:39 2019 +0200
@@ -19,6 +19,7 @@
 import re
 import atexit
 import signal
+import time
 
 
 import DebugClientCapabilities
@@ -800,10 +801,15 @@
         elif method == "RequestCompletion":
             self.__completionList(params["text"])
         
-        elif method == "RequestUTPrepare":
-            sys.path.insert(
-                0, os.path.dirname(os.path.abspath(params["filename"])))
-            os.chdir(sys.path[0])
+        elif method == "RequestUTDiscover":
+            if params["syspath"]:
+                sys.path = params["syspath"] + sys.path
+            
+            discoveryStart = params["discoverystart"]
+            if not discoveryStart:
+                discoveryStart = params["workdir"]
+            
+            os.chdir(params["discoverystart"])
             
             # set the system exception handling function to ensure, that
             # we report on all unhandled exceptions
@@ -812,19 +818,75 @@
             
             try:
                 import unittest
-                utModule = imp.load_source(
-                    params["testname"], params["filename"])
-                try:
-                    if params["failed"]:
-                        self.test = unittest.defaultTestLoader\
-                            .loadTestsFromNames(params["failed"], utModule)
+                testLoader = unittest.TestLoader()
+                test = testLoader.discover(discoveryStart)
+                if hasattr(testLoader, "errors") and \
+                   bool(testLoader.errors):
+                    self.sendJsonCommand("ResponseUTDiscover", {
+                        "testCasesList": [],
+                        "exception": "DiscoveryError",
+                        "message": "\n\n".join(testLoader.errors),
+                    })
+                else:
+                    testsList = self.__assembleTestCasesList(test,
+                                                             discoveryStart)
+                    self.sendJsonCommand("ResponseUTDiscover", {
+                        "testCasesList": testsList,
+                        "exception": "",
+                        "message": "",
+                    })
+            except Exception:
+                exc_type, exc_value, exc_tb = sys.exc_info()
+                self.sendJsonCommand("ResponseUTDiscover", {
+                    "testCasesList": [],
+                    "exception": exc_type.__name__,
+                    "message": str(exc_value),
+                })
+        
+        elif method == "RequestUTPrepare":
+            if params["syspath"]:
+                sys.path = params["syspath"] + sys.path
+            sys.path.insert(
+                0, os.path.dirname(os.path.abspath(params["filename"])))
+            if params["workdir"]:
+                os.chdir(params["workdir"])
+            else:
+                os.chdir(sys.path[0])
+            
+            # set the system exception handling function to ensure, that
+            # we report on all unhandled exceptions
+            sys.excepthook = self.__unhandled_exception
+            self.__interceptSignals()
+            
+            try:
+                import unittest
+                testLoader = unittest.TestLoader()
+                if params["discover"]:
+                    discoveryStart = params["discoverystart"]
+                    if not discoveryStart:
+                        discoveryStart = params["workdir"]
+                    if params["testcases"]:
+                        self.test = testLoader.loadTestsFromNames(
+                            params["testcases"])
                     else:
-                        self.test = unittest.defaultTestLoader\
-                            .loadTestsFromName(params["testfunctionname"],
-                                               utModule)
-                except AttributeError:
-                    self.test = unittest.defaultTestLoader\
-                        .loadTestsFromModule(utModule)
+                        self.test = testLoader.discover(discoveryStart)
+                else:
+                    if params["filename"]:
+                        utModule = imp.load_source(
+                            params["testname"], params["filename"])
+                    else:
+                        utModule = None
+                    if params["failed"]:
+                        if utModule:
+                            failed = [t.split(".", 1)[1]
+                                      for t in params["failed"]]
+                        else:
+                            failed = params["failed"][:]
+                        self.test = testLoader.loadTestsFromNames(
+                            failed, utModule)
+                    else:
+                        self.test = testLoader.loadTestsFromName(
+                            params["testfunctionname"], utModule)
             except Exception:
                 exc_type, exc_value, exc_tb = sys.exc_info()
                 self.sendJsonCommand("ResponseUTPrepared", {
@@ -846,6 +908,10 @@
             else:
                 self.cover = None
             
+            if params["debug"]:
+                Breakpoint.clear_all_breaks()
+                Watch.clear_all_watches()
+            
             self.sendJsonCommand("ResponseUTPrepared", {
                 "count": self.test.countTestCases(),
                 "exception": "",
@@ -854,14 +920,27 @@
         
         elif method == "RequestUTRun":
             from DCTestResult import DCTestResult
-            self.testResult = DCTestResult(self)
+            self.testResult = DCTestResult(self, params["failfast"])
             if self.cover:
                 self.cover.start()
-            self.test.run(self.testResult)
+            self.debugging = params["debug"]
+            if params["debug"]:
+                locals_ = locals()
+                self.threads.clear()
+                self.attachThread(mainThread=True)
+                sys.setprofile(None)
+                self.mainThread.run(
+                    "result = self.test.run(self.testResult)\n",
+                    localsDict=locals_)
+                result = locals_["result"]
+            else:
+                result = self.test.run(self.testResult)
             if self.cover:
                 self.cover.stop()
                 self.cover.save()
-            self.sendJsonCommand("ResponseUTFinished", {})
+            self.sendJsonCommand("ResponseUTFinished", {
+                "status": 0 if result.wasSuccessful() else 1,
+            })
         
         elif method == "RequestUTStop":
             self.testResult.stop()
@@ -871,6 +950,37 @@
             self.fork_child = (params["target"] == 'child')
             self.eventExit = True
     
+    def __assembleTestCasesList(self, suite, start):
+        """
+        Private method to assemble a list of test cases included in a test
+        suite.
+        
+        @param suite test suite to be inspected
+        @type unittest.TestSuite
+        @param start name of directory discovery was started at
+        @type str
+        @return list of tuples containing the test case ID, a short description
+            and the path of the test file name
+        @rtype list of tuples of (str, str, str)
+        """
+        import unittest
+        testCases = []
+        for test in suite:
+            if isinstance(test, unittest.TestSuite):
+                testCases.extend(self.__assembleTestCasesList(test, start))
+            else:
+                testId = test.id()
+                if "ModuleImportFailure" not in testId and \
+                   "LoadTestsFailure" not in testId and \
+                   "_FailedTest" not in testId:
+                    filename = os.path.join(
+                        start,
+                        test.__module__.replace(".", os.sep) + ".py")
+                    testCases.append(
+                        (test.id(), test.shortDescription(), filename)
+                    )
+        return testCases
+    
     def sendJsonCommand(self, method, params):
         """
         Public method to send a single command or response to the IDE.
@@ -1987,8 +2097,17 @@
             family = socket.AF_INET
         else:
             family = socket.AF_INET6
-        return socket.getaddrinfo(host, None, family,
-                                  socket.SOCK_STREAM)[0][4][0]
+        
+        retryCount = 0
+        while retryCount < 10:
+            try:
+                addrinfo = socket.getaddrinfo(
+                    host, None, family, socket.SOCK_STREAM)
+                return addrinfo[0][4][0]
+            except Exception:
+                retryCount += 1
+                time.sleep(3)
+        return None
     
     def main(self):
         """
@@ -2073,16 +2192,15 @@
                 except (ValueError, IndexError):
                     redirect = True
             
-            try:
-                ipOrHost = sys.argv[3]
-                if ':' in ipOrHost:
-                    remoteAddress = ipOrHost
-                elif ipOrHost[0] in '0123456789':
-                    remoteAddress = ipOrHost
-                else:
-                    remoteAddress = self.__resolveHost(ipOrHost)
-            except Exception:
-                remoteAddress = None
+            ipOrHost = sys.argv[3]
+            if ':' in ipOrHost:
+                # IPv6 address
+                remoteAddress = ipOrHost
+            elif ipOrHost[0] in '0123456789':
+                # IPv4 address
+                remoteAddress = ipOrHost
+            else:
+                remoteAddress = self.__resolveHost(ipOrHost)
             
             sys.argv = ['']
             if '' not in sys.path:

eric ide

mercurial