src/eric7/RemoteServerInterface/EricServerFileSystemInterface.py

Mon, 19 Feb 2024 15:33:33 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 19 Feb 2024 15:33:33 +0100
branch
server
changeset 10589
75b656c80a40
parent 10585
83e5a9a64543
child 10596
ea35c92a3c7c
permissions
-rw-r--r--

Added methods to read and write files with a given encoding to the eric-ide server file system interface and adapted the code accordingly.

# -*- coding: utf-8 -*-

# Copyright (c) 2024 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the file system interface to the eric-ide server.
"""

import base64
import contextlib
import os
import re
import stat

from PyQt6.QtCore import QEventLoop, QObject, pyqtSlot

from eric7 import Utilities
from eric7.RemoteServer.EricRequestCategory import EricRequestCategory
from eric7.SystemUtilities import FileSystemUtilities


class EricServerFileSystemInterface(QObject):
    """
    Class implementing the file system interface to the eric-ide server.
    """

    # Compiled pattern matching any of the glob style wildcard characters
    # '*', '?' or '['; it is used by __hasMagic() to detect path patterns.
    _MagicCheck = re.compile("([*?[])")

    def __init__(self, serverInterface):
        """
        Constructor

        The server interface is used as the Qt parent of this object. Its
        'connectionStateChanged' signal is connected in order to keep the cached
        server side path separator up-to-date.

        @param serverInterface reference to the eric-ide server interface
        @type EricServerInterface
        """
        super().__init__(parent=serverInterface)

        self.__serverInterface = serverInterface
        serverInterface.connectionStateChanged.connect(self.__connectionStateChanged)

        # This yields an empty string, if the server is not connected yet.
        self.__serverPathSep = self.__getPathSep()

    def __hasMagic(self, pathname):
        """
        Private method to test a path for glob style wildcard characters.

        Note: The approach was taken from 'glob.glob'.

        @param pathname path name to be checked
        @type str
        @return flag indicating that the path contains at least one wildcard
            character
        @rtype bool
        """
        return self._MagicCheck.search(pathname) is not None

    @pyqtSlot(bool)
    def __connectionStateChanged(self, connected):
        """
        Private slot reacting on a change of the server connection state.

        The cached path separator is cleared upon disconnect and retrieved from
        the server upon connect, if it is not known already.

        @param connected flag indicating a connected state
        @type bool
        """
        if not connected:
            self.__serverPathSep = ""
        elif not self.__serverPathSep:
            self.__serverPathSep = self.__getPathSep()

    def __getPathSep(self):
        """
        Private method to ask the connected server for its path separator.

        The request is sent and a local event loop is run until the matching
        reply was received.

        @return path separator reported by the server or an empty string, if
            the server is not connected
        @rtype str
        """
        if not self.__serverInterface.isServerConnected():
            return ""

        waitLoop = QEventLoop()
        answer = {"separator": ""}

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            if reply == "GetPathSep":
                answer["separator"] = params["separator"]
                waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="GetPathSep",
            params={},
            callback=handleReply,
        )
        waitLoop.exec()

        return answer["separator"]

    def getcwd(self):
        """
        Public method to retrieve the current working directory of the eric-ide
        server.

        @return current working directory of the eric-ide server or an empty
            string, if the server is not connected
        @rtype str
        """
        if not self.__serverInterface.isServerConnected():
            return ""

        waitLoop = QEventLoop()
        answer = {"directory": ""}

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            if reply == "Getcwd":
                answer["directory"] = params["directory"]
                waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Getcwd",
            params={},
            callback=handleReply,
        )
        waitLoop.exec()

        return answer["directory"]

    def chdir(self, directory):
        """
        Public method to set the current working directory of the eric-ide server.

        @param directory absolute path of the working directory to change to
        @type str
        @return tuple containing an OK flag and an error string in case of an issue
        @rtype tuple of (bool, str)
        """
        if not self.__serverInterface.isServerConnected():
            return False, "Not connected to an 'eric-ide' server."

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "Chdir":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Chdir",
            params={"directory": FileSystemUtilities.plainFileName(directory)},
            callback=handleReply,
        )
        waitLoop.exec()

        return success, message

    def listdir(self, directory=""):
        """
        Public method to retrieve the listing of a directory of the eric-ide server.

        @param directory directory to be listed. An empty directory (or None) means
            to list the eric-ide server current directory. (defaults to "")
        @type str (optional)
        @return tuple containing the listed directory, the path separator and the
            directory listing. Each directory listing entry contains a dictionary
            with the relevant data. Empty values are returned, if the server is
            not connected.
        @rtype tuple of (str, str, list of dict)
        @exception OSError raised in case the server reported an issue
        """
        # sanitize the directory in case it is None
        target = "" if directory is None else directory

        if not self.__serverInterface.isServerConnected():
            return "", "", []

        waitLoop = QEventLoop()
        success = False
        message = ""
        reportedDirectory = ""
        pathSeparator = ""
        entries = []

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message, reportedDirectory, pathSeparator, entries

            if reply != "Listdir":
                return

            success = params["ok"]
            if success:
                reportedDirectory = params["directory"]
                entries = params["listing"]
                pathSeparator = params["separator"]
            else:
                message = params["error"]
            waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Listdir",
            params={"directory": FileSystemUtilities.plainFileName(target)},
            callback=handleReply,
        )
        waitLoop.exec()

        if not success:
            raise OSError(message)

        return reportedDirectory, pathSeparator, entries

    def direntries(
        self,
        directory,
        filesonly=False,
        pattern=None,
        followsymlinks=True,
        ignore=None,
        recursive=True,
        dirsonly=False,
    ):
        """
        Public method to retrieve all files and directories of a given directory
        of the eric-ide server.

        @param directory root of the tree to check
        @type str
        @param filesonly flag indicating that only files are wanted (defaults to False)
        @type bool (optional)
        @param pattern a filename pattern or list of filename patterns to check
            against (defaults to None)
        @type str or list of str (optional)
        @param followsymlinks flag indicating whether symbolic links should be
            followed (defaults to True)
        @type bool (optional)
        @param ignore list of entries to be ignored (defaults to None)
        @type list of str (optional)
        @param recursive flag indicating a recursive search (defaults to True)
        @type bool (optional)
        @param dirsonly flag indicating to return only directories. When True it has
            precedence over the 'filesonly' parameter (defaults to False)
        @type bool (optional)
        @return list of all files and directories in the tree rooted at path.
            The names are expanded to start with the given directory name. An
            empty list is returned, if the server is not connected.
        @rtype list of str
        @exception OSError raised in case the server reported an issue
        """
        if not self.__serverInterface.isServerConnected():
            return []

        waitLoop = QEventLoop()
        success = False
        message = ""
        entries = []

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message, entries

            if reply != "DirEntries":
                return

            success = params["ok"]
            if success:
                entries = params["result"]
            else:
                message = params["error"]
            waitLoop.quit()

        # 'None' for pattern and ignore is sent as an empty list
        requestParams = {
            "directory": FileSystemUtilities.plainFileName(directory),
            "files_only": filesonly,
            "pattern": [] if pattern is None else pattern,
            "follow_symlinks": followsymlinks,
            "ignore": [] if ignore is None else ignore,
            "recursive": recursive,
            "dirs_only": dirsonly,
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="DirEntries",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        if not success:
            raise OSError(message)

        return entries

    def glob(self, pathname, recursive=False, includeHidden=False):
        """
        Public method to get a list of all files matching a given pattern
        like 'glob.glob()'.

        Note: Wildcards are supported in the last path component only. An empty
        list is returned, if the directory part is empty or contains wildcard
        characters or if the server reported an issue.

        @param pathname path name pattern with simple shell-style wildcards
        @type str
        @param recursive flag indicating a recursive list (defaults to False)
        @type bool (optional)
        @param includeHidden flag indicating to include hidden files (defaults to False)
        @type bool (optional)
        @return list of all files matching the pattern
        @rtype list of str
        """
        pathname = FileSystemUtilities.plainFileName(pathname)

        # The pattern denotes a path of the server. It must be split using the
        # server side path separator instead of the one of the local platform.
        dirname, basename = self.split(pathname)
        if not dirname or self.__hasMagic(dirname):
            return []

        try:
            entries = self.direntries(
                dirname, pattern=basename, recursive=recursive, filesonly=True
            )
        except OSError:
            # an issue reported by the server is treated like 'no match'
            return []

        if includeHidden:
            return entries

        # The entries are expanded to start with the directory name. The hidden
        # check must therefore look at the file name part only.
        return [e for e in entries if not self.basename(e).startswith(".")]

    def stat(self, filename, stNames):
        """
        Public method to retrieve status data of a file of the eric-ide server.

        @param filename name of the file
        @type str
        @param stNames list of 'stat_result' members to retrieve
        @type list of str
        @return dictionary containing the requested status data. An empty
            dictionary is returned, if the server is not connected.
        @rtype dict
        @exception OSError raised in case the server reported an issue
        """
        if not self.__serverInterface.isServerConnected():
            return {}

        waitLoop = QEventLoop()
        success = False
        message = ""
        statusData = {}

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message, statusData

            if reply != "Stat":
                return

            success = params["ok"]
            if success:
                statusData = params["result"]
            else:
                message = params["error"]
            waitLoop.quit()

        requestParams = {
            "filename": FileSystemUtilities.plainFileName(filename),
            "st_names": stNames,
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Stat",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        if not success:
            raise OSError(message)

        return statusData

    def isdir(self, name):
        """
        Public method to test, if the given name denotes a directory.

        The test is based on the 'st_mode' status value of the name. A failing
        status request or a missing 'st_mode' value yields False.

        @param name name to be checked
        @type str
        @return flag indicating a directory
        @rtype bool
        """
        try:
            return stat.S_ISDIR(self.stat(name, ["st_mode"])["st_mode"])
        except (KeyError, OSError):
            return False

    def isfile(self, name):
        """
        Public method to test, if the given name denotes a regular file.

        The test is based on the 'st_mode' status value of the name. A failing
        status request or a missing 'st_mode' value yields False.

        @param name name to be checked
        @type str
        @return flag indicating a regular file
        @rtype bool
        """
        try:
            return stat.S_ISREG(self.stat(name, ["st_mode"])["st_mode"])
        except (KeyError, OSError):
            return False

    def exists(self, name):
        """
        Public method to check the existence of a file or directory.

        @param name name of the file or directory
        @type str
        @return flag indicating the existence of the name. False is returned, if
            the server is not connected.
        @rtype bool
        """
        if not self.__serverInterface.isServerConnected():
            return False

        waitLoop = QEventLoop()
        answer = {"exists": False}

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            if reply == "Exists":
                answer["exists"] = params["exists"]
                waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Exists",
            params={"name": FileSystemUtilities.plainFileName(name)},
            callback=handleReply,
        )
        waitLoop.exec()

        return answer["exists"]

    def access(self, name, modes):
        """
        Public method to check the given access rights to a file or directory.

        The modes to check for are 'read', 'write' or 'execute' or any combination.

        @param name name of the file or directory
        @type str
        @param modes list of modes to check for
        @type str or list of str
        @return flag indicating the user has the asked for permissions. False is
            returned, if the server is not connected.
        @rtype bool
        @exception ValueError raised for an illegal modes list
        """
        if not modes:
            raise ValueError(
                "At least one of 'read', 'write' or 'execute' must be specified."
            )

        # a single mode string is sent as a list with one element
        modesList = [modes] if isinstance(modes, str) else modes

        if not self.__serverInterface.isServerConnected():
            return False

        waitLoop = QEventLoop()
        answer = {"ok": False}

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            if reply == "Access":
                answer["ok"] = params["ok"]
                waitLoop.quit()

        requestParams = {
            "name": FileSystemUtilities.plainFileName(name),
            "modes": modesList,
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Access",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        return answer["ok"]

    def mkdir(self, directory):
        """
        Public method to request the creation of a directory on the eric-ide server.

        @param directory absolute path of the new directory
        @type str
        @return tuple containing an OK flag and an error string in case of an issue
        @rtype tuple of (bool, str)
        """
        if not self.__serverInterface.isServerConnected():
            return False, "Not connected to an 'eric-ide' server."

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "Mkdir":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Mkdir",
            params={"directory": FileSystemUtilities.plainFileName(directory)},
            callback=handleReply,
        )
        waitLoop.exec()

        return success, message

    def rmdir(self, directory):
        """
        Public method to request the deletion of a directory on the eric-ide server.

        @param directory absolute path of the directory
        @type str
        @return tuple containing an OK flag and an error string in case of an issue
        @rtype tuple of (bool, str)
        """
        if not self.__serverInterface.isServerConnected():
            return False, "Not connected to an 'eric-ide' server."

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "Rmdir":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Rmdir",
            params={"directory": FileSystemUtilities.plainFileName(directory)},
            callback=handleReply,
        )
        waitLoop.exec()

        return success, message

    def replace(self, oldName, newName):
        """
        Public method to request the renaming of a file or directory on the
        eric-ide server.

        @param oldName current name of the file or directory
        @type str
        @param newName new name for the file or directory
        @type str
        @return tuple containing an OK flag and an error string in case of an issue
        @rtype tuple of (bool, str)
        """
        if not self.__serverInterface.isServerConnected():
            return False, "Not connected to an 'eric-ide' server."

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "Replace":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        requestParams = {
            "old_name": FileSystemUtilities.plainFileName(oldName),
            "new_name": FileSystemUtilities.plainFileName(newName),
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Replace",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        return success, message

    def remove(self, filename):
        """
        Public method to request the deletion of a file on the eric-ide server.

        @param filename absolute path of the file
        @type str
        @return tuple containing an OK flag and an error string in case of an issue
        @rtype tuple of (bool, str)
        """
        if not self.__serverInterface.isServerConnected():
            return False, "Not connected to an 'eric-ide' server."

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "Remove":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="Remove",
            params={"filename": FileSystemUtilities.plainFileName(filename)},
            callback=handleReply,
        )
        waitLoop.exec()

        return success, message

    #######################################################################
    ## Methods for splitting or joining remote path names.
    ##
    ## These are simplified variants of the os.path functions. If the
    ## 'eric-ide' server is not connected, the os.path functions are used.
    #######################################################################

    def separator(self):
        """
        Public method to return the server side path separator string.

        The value is the one cached by this object. It is an empty string while
        no path separator has been retrieved from the server.

        @return path separator
        @rtype str
        """
        return self.__serverPathSep

    def join(self, a, *p):
        """
        Public method to join path name components using the path separator of
        the server side.

        A component starting with the path separator discards everything joined
        so far. If the server is not connected, 'os.path.join()' is used.

        @param a first path component
        @type str
        @param *p list of additional path components
        @type list of str
        @return joined path name
        @rtype str
        """
        if not self.__serverInterface.isServerConnected():
            return os.path.join(a, *p)

        sep = self.__serverPathSep
        joined = a
        for component in p:
            if component.startswith(sep):
                # an absolute component restarts the path
                joined = component
            elif joined and not joined.endswith(sep):
                joined += sep + component
            else:
                joined += component

        return joined

    def split(self, p):
        """
        Public method to split a path name into a head and a tail part.

        The path is normalized to use the path separator of the server before
        it is split. If the server is not connected, 'os.path.split()' is used.

        @param p path name to be split
        @type str
        @return tuple containing head and tail, where tail is everything after the last
            path separator.
        @rtype tuple of (str, str)
        """
        if not self.__serverInterface.isServerConnected():
            return os.path.split(p)

        sep = self.__serverPathSep
        # A backslash separator indicates a Windows system as the remote,
        # everything else is treated as a Posix system.
        normalized = p.replace("/", "\\") if sep == "\\" else p.replace("\\", "/")

        cut = normalized.rfind(sep) + 1
        head = normalized[:cut]
        tail = normalized[cut:]
        # strip trailing separators unless the head consists of separators only
        if head and head != sep * len(head):
            head = head.rstrip(sep)

        return head, tail

    def splitext(self, p):
        """
        Public method to split a path name into a root part and an extension.

        Note: This delegates to 'os.path.splitext()' regardless of the server
        connection state.

        @param p path name to be split
        @type str
        @return tuple containing the root part and the extension
        @rtype tuple of (str, str)
        """
        root, extension = os.path.splitext(p)
        return root, extension

    def splitdrive(self, p):
        """
        Public method to split a path into a drive and a path part.

        The path is normalized to use the path separator of the server. If the
        server is not connected, 'os.path.splitdrive()' is used.

        @param p path name to be split
        @type str
        @return tuple containing the drive letter (incl. colon) and the path
        @rtype tuple of (str, str)
        """
        if not self.__serverInterface.isServerConnected():
            return os.path.splitdrive(p)

        plainPath = FileSystemUtilities.plainFileName(p)

        if self.__serverPathSep != "\\":
            # remote is a Posix system, which does not know about drives
            return "", plainPath.replace("\\", "/")

        # remote is a Windows system
        normalized = plainPath.replace("/", "\\")
        if normalized[1:2] == ":":
            return normalized[:2], normalized[2:]

        return "", normalized

    def dirname(self, p):
        """
        Public method to get the directory component of a path name.

        This is the head part as determined by the 'split()' method.

        @param p path name
        @type str
        @return directory component
        @rtype str
        """
        head, _tail = self.split(p)
        return head

    def basename(self, p):
        """
        Public method to get the final component of a path name.

        This is the tail part as determined by the 'split()' method.

        @param p path name
        @type str
        @return final component
        @rtype str
        """
        _head, tail = self.split(p)
        return tail

    #######################################################################
    ## Methods for reading and writing files
    #######################################################################

    def readFile(self, filename, create=False):
        """
        Public method to read the contents of a file from the eric-ide server.

        The server sends the file data as a Base85 encoded ASCII string, which is
        decoded before it is returned.

        @param filename name of the file to read
        @type str
        @param create flag indicating to create an empty file, if it does not exist
            (defaults to False)
        @type bool (optional)
        @return bytes data read from the eric-ide server
        @rtype bytes
        @exception OSError raised in case the server reported an issue or the
            server is not connected
        """
        if not self.__serverInterface.isServerConnected():
            raise OSError("Not connected to an 'eric-ide' server.")

        waitLoop = QEventLoop()
        success = False
        message = ""
        content = b""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message, content

            if reply != "ReadFile":
                return

            success = params["ok"]
            if success:
                encodedData = bytes(params["filedata"], encoding="ascii")
                content = base64.b85decode(encodedData)
            else:
                message = params["error"]
            waitLoop.quit()

        requestParams = {
            "filename": FileSystemUtilities.plainFileName(filename),
            "create": create,
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="ReadFile",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        if not success:
            raise OSError(message)

        return content

    def writeFile(self, filename, data, withBackup=False):
        """
        Public method to write data to a file on the eric-ide server.

        The data is sent to the server as a Base85 encoded ASCII string.

        @param filename name of the file to write
        @type str
        @param data data to be written
        @type bytes
        @param withBackup flag indicating to create a backup file first
            (defaults to False)
        @type bool (optional)
        @exception OSError raised in case the server reported an issue or the
            server is not connected
        """
        if not self.__serverInterface.isServerConnected():
            raise OSError("Not connected to an 'eric-ide' server.")

        waitLoop = QEventLoop()
        success = False
        message = ""

        def handleReply(reply, params):
            """
            Function to handle the server reply

            @param reply name of the server reply
            @type str
            @param params dictionary containing the reply data
            @type dict
            """
            nonlocal success, message

            if reply != "WriteFile":
                return

            success = params["ok"]
            # the 'error' entry is optional
            if "error" in params:
                message = params["error"]
            waitLoop.quit()

        requestParams = {
            "filename": FileSystemUtilities.plainFileName(filename),
            "filedata": base64.b85encode(data).decode("ascii"),
            "with_backup": withBackup,
        }
        self.__serverInterface.sendJson(
            category=EricRequestCategory.FileSystem,
            request="WriteFile",
            params=requestParams,
            callback=handleReply,
        )
        waitLoop.exec()

        if not success:
            raise OSError(message)

    def readEncodedFile(self, filename, create=False):
        """
        Public method to read a file from the eric-ide server and decode its
        contents into proper text.

        The file is read with 'readFile()' and the data is decoded by
        'Utilities.decode()'.

        @param filename name of the file to read
        @type str
        @param create flag indicating to create an empty file, if it does not exist
            (defaults to False)
        @type bool (optional)
        @return tuple of decoded text and encoding
        @rtype tuple of (str, str)
        @exception OSError raised by 'readFile()' in case of an issue
        """
        return Utilities.decode(self.readFile(filename, create=create))

    def readEncodedFileWithEncoding(self, filename, encoding, create=False):
        """
        Public method to read a file from the eric-ide server and decode its
        contents into proper text using a given encoding.

        The file is read with 'readFile()' and the data is decoded by
        'Utilities.decodeWithEncoding()'.

        @param filename name of the file to read
        @type str
        @param encoding encoding to be used to read the file
        @type str
        @param create flag indicating to create an empty file, if it does not exist
            (defaults to False)
        @type bool (optional)
        @return tuple of decoded text and encoding
        @rtype tuple of (str, str)
        @exception OSError raised by 'readFile()' in case of an issue
        """
        rawData = self.readFile(filename, create=create)
        return Utilities.decodeWithEncoding(rawData, encoding)

    def writeEncodedFile(
        self, filename, text, origEncoding, forcedEncoding="", withBackup=False
    ):
        """
        Public method to write properly encoded text to a file on the eric-ide
        server.

        The text is encoded by 'Utilities.encode()' and written with
        'writeFile()'.

        @param filename name of the file to write
        @type str
        @param text text to be written
        @type str
        @param origEncoding type of the original encoding
        @type str
        @param forcedEncoding encoding to be used for writing, if no coding
            line is present (defaults to "")
        @type str (optional)
        @param withBackup flag indicating to create a backup file first
            (defaults to False)
        @type bool (optional)
        @return encoding used for writing the file
        @rtype str
        @exception OSError raised by 'writeFile()' in case of an issue
        """
        encodedData, usedEncoding = Utilities.encode(
            text, origEncoding, forcedEncoding=forcedEncoding
        )
        self.writeFile(filename, encodedData, withBackup=withBackup)

        return usedEncoding

eric ide

mercurial