Mon, 05 Feb 2024 11:15:47 +0100
eric-ide Server
- Integrated the eric-ide server into the ViewManager and Editor file handling logic.
# -*- coding: utf-8 -*- # Copyright (c) 2024 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the file system request handler of the eric-ide server. """ import base64 import contextlib import os import stat import time from .EricRequestCategory import EricRequestCategory class EricServerFileSystemRequestHandler: """ Class implementing the file system request handler of the eric-ide server. """ def __init__(self, server): """ Constructor @param server reference to the eric-ide server object @type EricServer """ self.__server = server self.__requestMethodMapping = { "Chdir": self.__chdir, "Getcwd": self.__getcwd, "Listdir": self.__listdir, "Mkdir": self.__mkdir, "Rmdir": self.__rmdir, "Replace": self.__replace, "Remove": self.__remove, "Stat": self.__stat, "Exists": self.__exists, "Access": self.__access, "ReadFile": self.__readFile, "WriteFile": self.__writeFile, } def handleRequest(self, request, params, reqestUuid): """ Public method handling the received file system requests. @param request request name @type str @param params dictionary containing the request parameters @type dict @param reqestUuid UUID of the associated request as sent by the eric IDE @type str """ try: result = self.__requestMethodMapping[request](params) self.__server.sendJson( category=EricRequestCategory.FileSystem, reply=request, params=result, reqestUuid=reqestUuid, ) except KeyError: self.__server.sendJson( category=EricRequestCategory.FileSystem, reply=request, params={"Error": f"Request type '{request}' is not supported."}, ) def __chdir(self, params): """ Private method to change the current working directory. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ directory = params["directory"] try: os.chdir(directory) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), } def __getcwd(self, params): """ Private method to report the current working directory. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ return {"directory": os.getcwd()} def __listdir(self, params): """ Private method to report a directory listing. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ directory = params["directory"] if not directory: directory = os.getcwd() listing = [] for dirEntry in os.scandir(directory): filestat = dirEntry.stat() entry = { "name": dirEntry.name, "path": dirEntry.path, "is_dir": dirEntry.is_dir(), "is_file": dirEntry.is_file(), "is_link": dirEntry.is_symlink(), "mode": filestat.st_mode, "mode_str": stat.filemode(filestat.st_mode), "size": filestat.st_size, "mtime": time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(filestat.st_mtime) ), } listing.append(entry) return { "directory": directory, "listing": listing, "separator": os.sep, } def __stat(self, params): """ Private method to get the status of a file. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ filename = params["filename"] statItems = params["st_names"] try: result = os.stat(filename) resultDict = {st: getattr(result, st) for st in statItems} return {"ok": True, "result": resultDict} except OSError as err: return { "ok": False, "error": str(err), } def __exists(self, params): """ Private method to check if a file or directory of the given name exists. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ return {"exists": os.path.exists(params["name"])} def __access(self, params): """ Private method to test, if the eric-ide server has the given access rights to a file or directory.. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ mode = os.F_OK for modeStr in params["modes"]: if modeStr == "read": mode |= os.R_OK elif modeStr == "write": mode |= os.W_OK elif modeStr == "execute": mode |= os.X_OK return {"ok": os.access(params["name"], mode)} def __mkdir(self, params): """ Private method to create a new directory. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ directory = params["directory"] try: os.makedirs(directory) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), } def __rmdir(self, params): """ Private method to delete a directory. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ directory = params["directory"] try: os.rmdir(directory) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), } def __replace(self, params): """ Private method to replace (rename) a file or directory. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ oldName = params["old_name"] newName = params["new_name"] try: os.replace(oldName, newName) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), } def __remove(self, params): """ Private method to delete a file. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ filename = params["filename"] try: os.remove(filename) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), } def __readFile(self, params): """ Private method to read the contents of a file. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ filename = params["filename"] if params["create"] and not os.path.exists(filename): with open(filename, "wb"): pass try: with open(filename, "rb") as f: data = f.read() return { "ok": True, "filedata": str(base64.b85encode(data), encoding="ascii") } except OSError as err: return { "ok": False, "error": str(err), } def __writeFile(self, params): """ Private method to write data into a file. @param params dictionary containing the request data @type dict @return dictionary containing the reply data @rtype dict """ filename = params["filename"] data = base64.b85decode(bytes(params["filedata"], encoding="ascii")) # 1. create backup file if asked for if params["with_backup"]: if os.path.islink(filename): filename = os.path.realpath(filename) backupFilename = "{0}~".format(filename) try: permissions = os.stat(filename).st_mode perms_valid = True except OSError: # if there was an error, ignore it perms_valid = False with contextlib.suppress(OSError): os.remove(backupFilename) with contextlib.suppress(OSError): os.rename(filename, backupFilename) # 2. write the data to the file and reset the permissions try: with open(filename, "wb") as f: f.write(data) if params["with_backup"] and perms_valid: os.chmod(filename, permissions) return {"ok": True} except OSError as err: return { "ok": False, "error": str(err), }