src/eric7/RemoteServerInterface/EricServerFileSystemInterface.py

branch
server
changeset 10631
00f5aae565a3
parent 10610
bb0149571d94
child 10633
dda7e43934dc
--- a/src/eric7/RemoteServerInterface/EricServerFileSystemInterface.py	Fri Mar 08 15:30:23 2024 +0100
+++ b/src/eric7/RemoteServerInterface/EricServerFileSystemInterface.py	Fri Mar 08 15:30:53 2024 +0100
@@ -13,13 +13,17 @@
 import re
 import stat
 
-from PyQt6.QtCore import QEventLoop, QObject, pyqtSlot
+from PyQt6.QtCore import QByteArray, QEventLoop, QObject, pyqtSlot
 
 from eric7 import Utilities
 from eric7.RemoteServer.EricRequestCategory import EricRequestCategory
 from eric7.SystemUtilities import FileSystemUtilities
 
 
+_RemoteFsCache = {}
+# dictionary containing cached remote file system data keyed by remote path
+
+
 class EricServerNotConnectedError(OSError):
     """
     Class defining a special OSError indicating a missing server connection.
@@ -29,7 +33,7 @@
         """
         Constructor
         """
-        super().__init("Not connected to an 'eric-ide' server.")
+        super().__init__("Not connected to an 'eric-ide' server.")
 
 
 class EricServerFileSystemInterface(QObject):
@@ -82,8 +86,6 @@
         if connected:
             if not bool(self.__serverPathSep):
                 self.__serverPathSep = self.__getPathSep()
-        else:
-            self.__serverPathSep = ""
 
     def __getPathSep(self):
         """
@@ -203,13 +205,15 @@
         else:
             return False, EricServerFileSystemInterface.NotConnectedMessage
 
-    def listdir(self, directory=""):
+    def listdir(self, directory="", recursive=False):
         """
         Public method to get a directory listing.
 
         @param directory directory to be listed. An empty directory means to list
             the eric-ide server current directory. (defaults to "")
         @type str (optional)
+        @param recursive flag indicating a recursive listing (defaults to False)
+        @type bool (optional)
         @return tuple containing the listed directory, the path separator and the
             directory listing. Each directory listing entry contains a dictionary
             with the relevant data.
@@ -252,7 +256,10 @@
             self.__serverInterface.sendJson(
                 category=EricRequestCategory.FileSystem,
                 request="Listdir",
-                params={"directory": FileSystemUtilities.plainFileName(directory)},
+                params={
+                    "directory": FileSystemUtilities.plainFileName(directory),
+                    "recursive": recursive,
+                },
                 callback=callback,
             )
 
@@ -293,7 +300,7 @@
         @param recursive flag indicating a recursive search (defaults to True)
         @type bool (optional)
         @param dirsonly flag indicating to return only directories. When True it has
-            precedence over the 'filesonly' parameter ((defaults to False)
+            precedence over the 'filesonly' parameter (defaults to False)
         @type bool
         @return list of all files and directories in the tree rooted at path.
             The names are expanded to start with the given directory name.
@@ -443,9 +450,12 @@
         @return flag indicating a directory
         @rtype bool
         """
-        with contextlib.suppress(KeyError, OSError):
-            result = self.stat(name, ["st_mode"])
-            return stat.S_ISDIR(result["st_mode"])
+        try:
+            return stat.S_ISDIR(_RemoteFsCache[name]["mode"])
+        except KeyError:
+            with contextlib.suppress(KeyError, OSError):
+                result = self.stat(name, ["st_mode"])
+                return stat.S_ISDIR(result["st_mode"])
 
         return False
 
@@ -458,9 +468,12 @@
         @return flag indicating a regular file
         @rtype bool
         """
-        with contextlib.suppress(KeyError, OSError):
-            result = self.stat(name, ["st_mode"])
-            return stat.S_ISREG(result["st_mode"])
+        try:
+            return stat.S_ISREG(_RemoteFsCache[name]["mode"])
+        except KeyError:
+            with contextlib.suppress(KeyError, OSError):
+                result = self.stat(name, ["st_mode"])
+                return stat.S_ISREG(result["st_mode"])
 
         return False
 
@@ -491,6 +504,9 @@
                 nameExists = params["exists"]
                 loop.quit()
 
+        if name in _RemoteFsCache:
+            return True
+
         if self.__serverInterface.isServerConnected():
             self.__serverInterface.sendJson(
                 category=EricRequestCategory.FileSystem,
@@ -598,6 +614,8 @@
             )
 
             loop.exec()
+            if ok:
+                self.populateFsCache(directory)
             return ok, error
 
         else:
@@ -649,6 +667,8 @@
             )
 
             loop.exec()
+            if ok:
+                self.populateFsCache(directory)
             return ok, error
 
         else:
@@ -693,6 +713,8 @@
             )
 
             loop.exec()
+            if ok:
+                self.removeFromFsCache(directory)
             return ok, error
 
         else:
@@ -742,6 +764,12 @@
             )
 
             loop.exec()
+            if ok:
+                with contextlib.suppress(KeyError):
+                    entry = _RemoteFsCache.pop(oldName)
+                    entry["path"] = newName
+                    entry["name"] = self.basename(newName)
+                    _RemoteFsCache[newName] = entry
             return ok, error
 
         else:
@@ -786,6 +814,9 @@
             )
 
             loop.exec()
+            if ok:
+                with contextlib.suppress(KeyError):
+                    del _RemoteFsCache[filename]
             return ok, error
 
         else:
@@ -859,14 +890,11 @@
         @return flag indicating an absolute path
         @rtype bool
         """
-        if self.__serverInterface.isServerConnected():
-            if self.__serverPathSep == "\\":
-                s = FileSystemUtilities.plainFileName(p)[:3].replace("/", "\\")
-                return s.startswith("\\)") or s.startswith(":\\", 1)
-            else:
-                return FileSystemUtilities.plainFileName(p).startswith("/")
+        if self.__serverPathSep == "\\":
+            s = FileSystemUtilities.plainFileName(p)[:3].replace("/", "\\")
+            return s.startswith("\\)") or s.startswith(":\\", 1)
         else:
-            return os.path.isabs(p)
+            return FileSystemUtilities.plainFileName(p).startswith("/")
 
     def abspath(self, p):
         """
@@ -877,13 +905,10 @@
         @return absolute path
         @rtype str
         """
-        if self.__serverInterface.isServerConnected():
-            p = FileSystemUtilities.plainFileName(p)
-            if not self.isabs(p):
-                p = self.join(self.getcwd(), p)
-            return FileSystemUtilities.remoteFileName(p)
-        else:
-            return os.path.abspath(p)
+        p = FileSystemUtilities.plainFileName(p)
+        if not self.isabs(p):
+            p = self.join(self.getcwd(), p)
+        return FileSystemUtilities.remoteFileName(p)
 
     def join(self, a, *p):
         """
@@ -897,19 +922,15 @@
         @return joined path name
         @rtype str
         """
-        if self.__serverInterface.isServerConnected():
-            path = a
-            for b in p:
-                if b.startswith(self.__serverPathSep):
-                    path = b
-                elif not path or path.endswith(self.__serverPathSep):
-                    path += b
-                else:
-                    path += self.__serverPathSep + b
-            return path
-
-        else:
-            return os.path.join(a, *p)
+        path = a
+        for b in p:
+            if b.startswith(self.__serverPathSep):
+                path = b
+            elif not path or path.endswith(self.__serverPathSep):
+                path += b
+            else:
+                path += self.__serverPathSep + b
+        return path
 
     def split(self, p):
         """
@@ -921,22 +942,18 @@
             path separator.
         @rtype tuple of (str, str)
         """
-        if self.__serverInterface.isServerConnected():
-            if self.__serverPathSep == "\\":
-                # remote is a Windows system
-                normp = p.replace("/", "\\")
-            else:
-                # remote is a Posix system
-                normp = p.replace("\\", "/")
+        if self.__serverPathSep == "\\":
+            # remote is a Windows system
+            normp = p.replace("/", "\\")
+        else:
+            # remote is a Posix system
+            normp = p.replace("\\", "/")
 
-            i = normp.rfind(self.__serverPathSep) + 1
-            head, tail = normp[:i], normp[i:]
-            if head and head != self.__serverPathSep * len(head):
-                head = head.rstrip(self.__serverPathSep)
-            return head, tail
-
-        else:
-            return os.path.split(p)
+        i = normp.rfind(self.__serverPathSep) + 1
+        head, tail = normp[:i], normp[i:]
+        if head and head != self.__serverPathSep * len(head):
+            head = head.rstrip(self.__serverPathSep)
+        return head, tail
 
     def splitext(self, p):
         """
@@ -958,23 +975,19 @@
         @return tuple containing the drive letter (incl. colon) and the path
         @rtype tuple of (str, str)
         """
-        if self.__serverInterface.isServerConnected():
-            plainp = FileSystemUtilities.plainFileName(p)
+        plainp = FileSystemUtilities.plainFileName(p)
 
-            if self.__serverPathSep == "\\":
-                # remote is a Windows system
-                normp = plainp.replace("/", "\\")
-                if normp[1:2] == ":":
-                    return normp[:2], normp[2:]
-                else:
-                    return "", normp
+        if self.__serverPathSep == "\\":
+            # remote is a Windows system
+            normp = plainp.replace("/", "\\")
+            if normp[1:2] == ":":
+                return normp[:2], normp[2:]
             else:
-                # remote is a Posix system
-                normp = plainp.replace("\\", "/")
                 return "", normp
-
         else:
-            return os.path.splitdrive(p)
+            # remote is a Posix system
+            normp = plainp.replace("\\", "/")
+            return "", normp
 
     def dirname(self, p):
         """
@@ -1028,7 +1041,7 @@
     ## Methods for reading and writing files
     #######################################################################
 
-    def readFile(self, filename, create=False):
+    def readFile(self, filename, create=False, newline=None):
         """
         Public method to read a file from the eric-ide server.
 
@@ -1037,6 +1050,9 @@
         @param create flag indicating to create an empty file, if it does not exist
             (defaults to False)
         @type bool (optional)
+        @param newline determines how to parse newline characters from the stream
+            (defaults to None)
+        @type str (optional)
         @return bytes data read from the eric-ide server
         @rtype bytes
         @exception EricServerNotConnectedError raised to indicate a missing server
@@ -1079,6 +1095,7 @@
                 params={
                     "filename": FileSystemUtilities.plainFileName(filename),
                     "create": create,
+                    "newline": "<<none>>" if newline is None else newline,
                 },
                 callback=callback,
             )
@@ -1089,17 +1106,20 @@
 
             return bText
 
-    def writeFile(self, filename, data, withBackup=False):
+    def writeFile(self, filename, data, withBackup=False, newline=None):
         """
         Public method to write the data to a file on the eric-ide server.
 
         @param filename name of the file to write
         @type str
         @param data data to be written
-        @type bytes
+        @type bytes or QByteArray
         @param withBackup flag indicating to create a backup file first
             (defaults to False)
         @type bool (optional)
+        @param newline determines how to parse newline characters from the stream
+            (defaults to None)
+        @type str (optional)
         @exception EricServerNotConnectedError raised to indicate a missing server
             connection
         @exception OSError raised in case the server reported an issue
@@ -1129,6 +1149,8 @@
             raise EricServerNotConnectedError
 
         else:
+            if isinstance(data, QByteArray):
+                data = bytes(data)
             self.__serverInterface.sendJson(
                 category=EricRequestCategory.FileSystem,
                 request="WriteFile",
@@ -1136,6 +1158,7 @@
                     "filename": FileSystemUtilities.plainFileName(filename),
                     "filedata": str(base64.b85encode(data), encoding="ascii"),
                     "with_backup": withBackup,
+                    "newline": "<<none>>" if newline is None else newline,
                 },
                 callback=callback,
             )
@@ -1315,6 +1338,9 @@
             )
 
             loop.exec()
+            if ok:
+                self.removeFromFsCache(pathname)
+
             if not ok:
                 raise OSError(error)
 
@@ -1364,3 +1390,41 @@
                 return cpath
             tail = tail[1:]
         return ""
+
+    #######################################################################
+    ## Remote file system cache methods.
+    #######################################################################
+
+    # TODO: change cache when file renamed/moved/deleted/...
+    def populateFsCache(self, directory):
+        """
+        Public method to populate the remote file system cache for a given directory.
+
+        @param directory remote directory to be cached
+        @type str
+        @exception ValueError raised to indicate an empty directory
+        """
+        if not directory:
+            raise ValueError("The directory to be cached must not be empty.")
+
+        try:
+            listing = self.listdir(directory=directory, recursive=True)[2]
+            for entry in listing:
+                _RemoteFsCache[
+                    FileSystemUtilities.remoteFileName(entry["path"])
+                ] = entry
+            print(f"Remote Cache Size: {len(_RemoteFsCache)} entries")
+        except OSError as err:
+            print("error in 'populateFsCache()':", str(err))
+
+    def removeFromFsCache(self, directory):
+        """
+        Public method to remove a given directory from the remote file system cache.
+
+        @param directory remote directory to be removed
+        @type str
+        """
+        for entryPath in list(_RemoteFsCache.keys()):
+            if entryPath.startswith(directory):
+                del _RemoteFsCache[entryPath]
+        print(f"Remote Cache Size: {len(_RemoteFsCache)} entries")

eric ide

mercurial