src/eric7/RemoteServer/EricServerFileSystemRequestHandler.py

Mon, 05 Feb 2024 13:46:48 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 05 Feb 2024 13:46:48 +0100
branch
server
changeset 10548
d3e21f44887b
parent 10546
300487f5f517
child 10555
08e853c0c77b
permissions
-rw-r--r--

Improved the 'listdir()' remote server method in order to report issues.

# -*- coding: utf-8 -*-

# Copyright (c) 2024 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the file system request handler of the eric-ide server.
"""

import base64
import contextlib
import os
import stat
import time

from .EricRequestCategory import EricRequestCategory

class EricServerFileSystemRequestHandler:
    """
    Class implementing the file system request handler of the eric-ide server.
    """

    def __init__(self, server):
        """
        Constructor

        @param server reference to the eric-ide server object
        @type EricServer
        """
        self.__server = server
        
        self.__requestMethodMapping = {
            "Chdir": self.__chdir,
            "Getcwd": self.__getcwd,
            "Listdir": self.__listdir,
            "Mkdir": self.__mkdir,
            "Rmdir": self.__rmdir,
            "Replace": self.__replace,
            "Remove": self.__remove,
            "Stat": self.__stat,
            "Exists": self.__exists,
            "Access": self.__access,

            "ReadFile": self.__readFile,
            "WriteFile": self.__writeFile,
        }

    def handleRequest(self, request, params, reqestUuid):
        """
        Public method handling the received file system requests.

        @param request request name
        @type str
        @param params dictionary containing the request parameters
        @type dict
        @param reqestUuid UUID of the associated request as sent by the eric IDE
        @type str
        """
        try:
            result = self.__requestMethodMapping[request](params)
            self.__server.sendJson(
                category=EricRequestCategory.FileSystem,
                reply=request,
                params=result,
                reqestUuid=reqestUuid,
            )

        except KeyError:
            self.__server.sendJson(
                category=EricRequestCategory.FileSystem,
                reply=request,
                params={"Error": f"Request type '{request}' is not supported."},
            )

    def __chdir(self, params):
        """
        Private method to change the current working directory.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        directory = params["directory"]
        try:
            os.chdir(directory)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __getcwd(self, params):
        """
        Private method to report the current working directory.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        return {"directory": os.getcwd()}

    def __listdir(self, params):
        """
        Private method to report a directory listing.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        directory = params["directory"]
        if not directory:
            directory = os.getcwd()

        try:
            listing = []
            for dirEntry in os.scandir(directory):
                filestat = dirEntry.stat()
                entry = {
                    "name": dirEntry.name,
                    "path": dirEntry.path,
                    "is_dir": dirEntry.is_dir(),
                    "is_file": dirEntry.is_file(),
                    "is_link": dirEntry.is_symlink(),
                    "mode": filestat.st_mode,
                    "mode_str": stat.filemode(filestat.st_mode),
                    "size": filestat.st_size,
                    "mtime": time.strftime(
                        "%Y-%m-%d %H:%M:%S", time.localtime(filestat.st_mtime)
                    ),
                }
                listing.append(entry)

            return {
                "ok": True,
                "directory": directory,
                "listing": listing,
                "separator": os.sep,
            }
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __stat(self, params):
        """
        Private method to get the status of a file.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        filename = params["filename"]
        statItems = params["st_names"]

        try:
            result = os.stat(filename)
            resultDict = {st: getattr(result, st) for st in statItems}
            return {"ok": True, "result": resultDict}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __exists(self, params):
        """
        Private method to check if a file or directory of the given name exists.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        return {"exists": os.path.exists(params["name"])}

    def __access(self, params):
        """
        Private method to test, if the eric-ide server has the given access rights
        to a file or directory..

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        mode = os.F_OK
        for modeStr in params["modes"]:
            if modeStr == "read":
                mode |= os.R_OK
            elif modeStr == "write":
                mode |= os.W_OK
            elif modeStr == "execute":
                mode |= os.X_OK

        return {"ok": os.access(params["name"], mode)}

    def __mkdir(self, params):
        """
        Private method to create a new directory.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        directory = params["directory"]

        try:
            os.makedirs(directory)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __rmdir(self, params):
        """
        Private method to delete a directory.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        directory = params["directory"]

        try:
            os.rmdir(directory)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __replace(self, params):
        """
        Private method to replace (rename) a file or directory.


        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        oldName = params["old_name"]
        newName = params["new_name"]

        try:
            os.replace(oldName, newName)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __remove(self, params):
        """
        Private method to delete a file.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        filename = params["filename"]

        try:
            os.remove(filename)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __readFile(self, params):
        """
        Private method to read the contents of a file.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        filename = params["filename"]

        if params["create"] and not os.path.exists(filename):
            with open(filename, "wb"):
                pass

        try:
            with open(filename, "rb") as f:
                data = f.read()
            return {
                "ok": True,
                "filedata": str(base64.b85encode(data), encoding="ascii")
            }
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

    def __writeFile(self, params):
        """
        Private method to write data into a file.

        @param params dictionary containing the request data
        @type dict
        @return dictionary containing the reply data
        @rtype dict
        """
        filename = params["filename"]
        data = base64.b85decode(bytes(params["filedata"], encoding="ascii"))

        # 1. create backup file if asked for
        if params["with_backup"]:
            if os.path.islink(filename):
                filename = os.path.realpath(filename)
            backupFilename = "{0}~".format(filename)
            try:
                permissions = os.stat(filename).st_mode
                perms_valid = True
            except OSError:
                # if there was an error, ignore it
                perms_valid = False
            with contextlib.suppress(OSError):
                os.remove(backupFilename)
            with contextlib.suppress(OSError):
                os.rename(filename, backupFilename)

        # 2. write the data to the file and reset the permissions
        try:
            with open(filename, "wb") as f:
                f.write(data)
            if params["with_backup"] and perms_valid:
                os.chmod(filename, permissions)
            return {"ok": True}
        except OSError as err:
            return {
                "ok": False,
                "error": str(err),
            }

eric ide

mercurial