src/eric7/Plugins/VcsPlugins/vcsMercurial/HgClient.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9473
3f23dbf37dbe
diff -r e9e7eca7efee -r bf71ee032bb4 src/eric7/Plugins/VcsPlugins/vcsMercurial/HgClient.py
--- a/src/eric7/Plugins/VcsPlugins/vcsMercurial/HgClient.py	Wed Jul 13 11:16:20 2022 +0200
+++ b/src/eric7/Plugins/VcsPlugins/vcsMercurial/HgClient.py	Wed Jul 13 14:55:47 2022 +0200
@@ -10,9 +10,7 @@
 import struct
 import io
 
-from PyQt6.QtCore import (
-    QProcess, QObject, QByteArray, QCoreApplication, QThread
-)
+from PyQt6.QtCore import QProcess, QObject, QByteArray, QCoreApplication, QThread
 from PyQt6.QtWidgets import QDialog
 
 from .HgUtilities import prepareProcess, getHgExecutable
@@ -22,17 +20,18 @@
     """
     Class implementing the Mercurial command server interface.
     """
+
     InputFormat = ">I"
     OutputFormat = ">cI"
     OutputFormatSize = struct.calcsize(OutputFormat)
     ReturnFormat = ">i"
-    
+
     Channels = (b"I", b"L", b"o", b"e", b"r", b"d")
-    
+
     def __init__(self, repoPath, encoding, vcs, parent=None):
         """
         Constructor
-        
+
         @param repoPath root directory of the repository
         @type str
         @param encoding encoding to be used by the command server
@@ -43,7 +42,7 @@
         @type QObject
         """
         super().__init__(parent)
-        
+
         self.__server = None
         self.__started = False
         self.__version = None
@@ -51,7 +50,7 @@
         self.__cancel = False
         self.__commandRunning = False
         self.__repoPath = repoPath
-        
+
         # generate command line and environment
         self.__serverArgs = vcs.initCommand("serve")
         self.__serverArgs.append("--cmdserver")
@@ -61,44 +60,44 @@
         if repoPath:
             self.__serverArgs.append("--repository")
             self.__serverArgs.append(repoPath)
-        
+
         if encoding:
             self.__encoding = encoding
             if "--encoding" in self.__serverArgs:
                 # use the defined encoding via the environment
                 index = self.__serverArgs.index("--encoding")
-                del self.__serverArgs[index:index + 2]
-    
+                del self.__serverArgs[index : index + 2]
+
     def startServer(self):
         """
         Public method to start the command server.
-        
+
         @return tuple of flag indicating a successful start and an error
             message in case of failure
         @rtype tuple of (bool, str)
         """
         self.__server = QProcess()
         self.__server.setWorkingDirectory(self.__repoPath)
-        
+
         # connect signals
         self.__server.finished.connect(self.__serverFinished)
-        
+
         prepareProcess(self.__server, self.__encoding)
-        
+
         exe = getHgExecutable()
         self.__server.start(exe, self.__serverArgs)
         serverStarted = self.__server.waitForStarted(15000)
         if not serverStarted:
             return False, self.tr(
-                'The process {0} could not be started. '
-                'Ensure, that it is in the search path.'
+                "The process {0} could not be started. "
+                "Ensure, that it is in the search path."
             ).format(exe)
-        
+
         self.__server.setReadChannel(QProcess.ProcessChannel.StandardOutput)
         ok, error = self.__readHello()
         self.__started = ok
         return ok, error
-    
+
     def stopServer(self):
         """
         Public method to stop the command server.
@@ -112,26 +111,26 @@
                 if not res:
                     self.__server.kill()
                     self.__server.waitForFinished(3000)
-            
+
             self.__started = False
             self.__server.deleteLater()
             self.__server = None
-    
+
     def restartServer(self):
         """
         Public method to restart the command server.
-        
+
         @return tuple of flag indicating a successful start and an error
             message in case of failure
         @rtype tuple of (bool, str)
         """
         self.stopServer()
         return self.startServer()
-    
+
     def __readHello(self):
         """
         Private method to read the hello message sent by the command server.
-        
+
         @return tuple of flag indicating success and an error message in case
             of failure
         @rtype tuple of (bool, str)
@@ -141,70 +140,64 @@
             return False, self.tr("Did not receive the 'hello' message.")
         elif ch != "o":
             return False, self.tr("Received data on unexpected channel.")
-        
+
         msg = msg.split("\n")
-        
+
         if not msg[0].startswith("capabilities: "):
             return False, self.tr(
-                "Bad 'hello' message, expected 'capabilities: '"
-                " but got '{0}'.").format(msg[0])
-        self.__capabilities = msg[0][len('capabilities: '):]
+                "Bad 'hello' message, expected 'capabilities: '" " but got '{0}'."
+            ).format(msg[0])
+        self.__capabilities = msg[0][len("capabilities: ") :]
         if not self.__capabilities:
-            return False, self.tr("'capabilities' message did not contain"
-                                  " any capability.")
-        
+            return False, self.tr(
+                "'capabilities' message did not contain" " any capability."
+            )
+
         self.__capabilities = set(self.__capabilities.split())
         if "runcommand" not in self.__capabilities:
             return False, "'capabilities' did not contain 'runcommand'."
-        
+
         if not msg[1].startswith("encoding: "):
             return False, self.tr(
-                "Bad 'hello' message, expected 'encoding: '"
-                " but got '{0}'.").format(msg[1])
-        encoding = msg[1][len('encoding: '):]
+                "Bad 'hello' message, expected 'encoding: '" " but got '{0}'."
+            ).format(msg[1])
+        encoding = msg[1][len("encoding: ") :]
         if not encoding:
-            return False, self.tr("'encoding' message did not contain"
-                                  " any encoding.")
+            return False, self.tr("'encoding' message did not contain" " any encoding.")
         self.__encoding = encoding
-        
+
         return True, ""
-    
+
     def __serverFinished(self, exitCode, exitStatus):
         """
         Private slot connected to the finished signal.
-        
+
         @param exitCode exit code of the process
         @type int
         @param exitStatus exit status of the process
         @type QProcess.ExitStatus
         """
         self.__started = False
-    
+
     def __readChannel(self):
         """
         Private method to read data from the command server.
-        
+
         @return tuple of channel designator and channel data
         @rtype tuple of (str, int or str or bytes)
         """
-        if (
-            self.__server.bytesAvailable() > 0 or
-            self.__server.waitForReadyRead(10000)
-        ):
+        if self.__server.bytesAvailable() > 0 or self.__server.waitForReadyRead(10000):
             data = bytes(self.__server.peek(HgClient.OutputFormatSize))
             if not data or len(data) < HgClient.OutputFormatSize:
                 return "", ""
-            
+
             channel, length = struct.unpack(HgClient.OutputFormat, data)
             channel = channel.decode(self.__encoding)
             if channel in "IL":
                 self.__server.read(HgClient.OutputFormatSize)
                 return channel, length
             else:
-                if (
-                    self.__server.bytesAvailable() <
-                    HgClient.OutputFormatSize + length
-                ):
+                if self.__server.bytesAvailable() < HgClient.OutputFormatSize + length:
                     return "", ""
                 self.__server.read(HgClient.OutputFormatSize)
                 data = self.__server.read(length)
@@ -214,11 +207,11 @@
                     return (channel, str(data, self.__encoding, "replace"))
         else:
             return "", ""
-    
+
     def __writeDataBlock(self, data):
         """
         Private slot to write some data to the command server.
-        
+
         @param data data to be sent
         @type str
         """
@@ -227,11 +220,11 @@
         self.__server.write(struct.pack(HgClient.InputFormat, len(data)))
         self.__server.write(data)
         self.__server.waitForBytesWritten()
-    
+
     def __runcommand(self, args, inputChannels, outputChannels):
         """
         Private method to run a command in the server (low level).
-        
+
         @param args list of arguments for the command
         @type list of str
         @param inputChannels dictionary of input channels. The dictionary must
@@ -250,24 +243,24 @@
         """
         if not self.__started:
             return -1
-        
-        self.__server.write(QByteArray(b'runcommand\n'))
-        self.__writeDataBlock('\0'.join(args))
-        
+
+        self.__server.write(QByteArray(b"runcommand\n"))
+        self.__writeDataBlock("\0".join(args))
+
         while True:
             QCoreApplication.processEvents()
-            
+
             if self.__cancel:
                 return -10
-            
+
             if self.__server is None:
                 return -1
-            
+
             if self.__server.bytesAvailable() == 0:
                 QThread.msleep(50)
                 continue
             channel, data = self.__readChannel()
-            
+
             # input channels
             if channel in inputChannels:
                 if channel == "L":
@@ -278,28 +271,29 @@
                 else:
                     inputData = inputChannels[channel](data)
                 self.__writeDataBlock(inputData)
-            
+
             # output channels
             elif channel in outputChannels:
                 outputChannels[channel](data)
-            
+
             # result channel, command is finished
             elif channel == "r":
                 return struct.unpack(HgClient.ReturnFormat, data)[0]
-            
+
             # unexpected but required channel
             elif channel.isupper():
                 raise RuntimeError(
-                    "Unexpected but required channel '{0}'.".format(channel))
-            
+                    "Unexpected but required channel '{0}'.".format(channel)
+                )
+
             # optional channels or no channel at all
             else:
                 pass
-    
+
     def __prompt(self, size, message):
         """
         Private method to prompt the user for some input.
-        
+
         @param size maximum length of the requested input
         @type int
         @param message message sent by the server
@@ -309,19 +303,19 @@
         @rtype tuple of (str, bool)
         """
         from .HgClientPromptDialog import HgClientPromptDialog
+
         inputData = ""
         isPassword = False
         dlg = HgClientPromptDialog(size, message)
         if dlg.exec() == QDialog.DialogCode.Accepted:
-            inputData = dlg.getInput() + '\n'
+            inputData = dlg.getInput() + "\n"
             isPassword = dlg.isPassword()
         return inputData, isPassword
-    
-    def runcommand(self, args, prompt=None, inputData=None, output=None,
-                   error=None):
+
+    def runcommand(self, args, prompt=None, inputData=None, output=None, error=None):
         """
         Public method to execute a command via the command server.
-        
+
         @param args list of arguments for the command
         @type list of str
         @param prompt function to reply to prompts by the server. It
@@ -350,12 +344,12 @@
             ok, startError = self.startServer()
             if not ok:
                 return "", startError
-        
+
         self.__commandRunning = True
         outputChannels = {}
         outputBuffer = None
         errorBuffer = None
-        
+
         if output is None:
             outputBuffer = io.StringIO()
             outputChannels["o"] = outputBuffer.write
@@ -366,63 +360,70 @@
         else:
             errorBuffer = io.StringIO()
             outputChannels["e"] = errorBuffer.write
-        
+
         inputChannels = {}
         if prompt is not None:
+
             def func(size):
                 msg = "" if outputBuffer is None else outputBuffer.getvalue()
                 reply, isPassword = prompt(size, msg)
                 return reply, isPassword
+
             inputChannels["L"] = func
         else:
+
             def myprompt(size):
-                msg = (self.tr("For message see output dialog.")
-                       if outputBuffer is None else outputBuffer.getvalue())
+                msg = (
+                    self.tr("For message see output dialog.")
+                    if outputBuffer is None
+                    else outputBuffer.getvalue()
+                )
                 reply, isPassword = self.__prompt(size, msg)
                 return reply, isPassword
+
             inputChannels["L"] = myprompt
         if inputData is not None:
             inputChannels["I"] = inputData
-        
+
         self.__cancel = False
         self.__runcommand(args, inputChannels, outputChannels)
-        
+
         out = outputBuffer.getvalue() if outputBuffer else ""
         err = errorBuffer.getvalue() if errorBuffer else ""
-        
+
         self.__commandRunning = False
-        
+
         return out, err
-    
+
     def cancel(self):
         """
         Public method to cancel the running command.
         """
         self.__cancel = True
         self.restartServer()
-    
+
     def wasCanceled(self):
         """
         Public method to check, if the last command was canceled.
-        
+
         @return flag indicating the cancel state
         @rtype bool
         """
         return self.__cancel
-    
+
     def isExecuting(self):
         """
         Public method to check, if the server is executing a command.
-        
+
         @return flag indicating the execution of a command
         @rtype bool
         """
         return self.__commandRunning
-    
+
     def getRepository(self):
         """
         Public method to get the repository path this client is serving.
-        
+
         @return repository path
         @rtype str
         """

eric ide

mercurial