eric6/Utilities/BackgroundClient.py

Sat, 05 Dec 2020 18:02:17 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 05 Dec 2020 18:02:17 +0100
branch
multi_processing
changeset 7853
35dcac32984a
parent 7802
eefe954f01e8
parent 7836
2f0d208b8137
child 7923
91e843545d9a
permissions
-rw-r--r--

Merged with the default branch.

# -*- coding: utf-8 -*-

# Copyright (c) 2013 - 2020 Detlev Offenbach <detlev@die-offenbachs.de>
#
# pylint: disable=C0103

"""
Module implementing a Qt free version of a background client for the various
checkers and other python interpreter dependent functions.
"""

import io
import json
import socket
import struct
import sys
import time
import traceback
from zlib import adler32


class BackgroundClient(object):
    """
    Class implementing the main part of the background client.
    """
    def __init__(self, host, port, maxProcs):
        """
        Constructor of the BackgroundClient class.
        
        @param host ip address the background service is listening
        @type str
        @param port port of the background service
        @type int
        @param maxProcs maximum number of CPUs (processes) to use
            (0 = determined automatically)
        @type int
        """
        self.services = {}
        self.batchServices = {}
        
        self.connection = socket.create_connection((host, port))
        ver = b'Python3'
        self.connection.sendall(ver)
        self.__maxProcs = maxProcs

    def __initClientService(self, fn, path, module):
        """
        Private method to import the given module and register it as service.
        
        @param fn service name to register (str)
        @param path contains the path to the module (str)
        @param module name to import (str)
        @return text result of the import action (str)
        """
        sys.path.insert(1, path)
        try:
            importedModule = __import__(module, globals(), locals(), [], 0)
            self.services[fn] = importedModule.initService()
            try:
                self.batchServices["batch_" + fn] = (
                    importedModule.initBatchService()
                )
            except AttributeError:
                pass
            return 'ok'
        except ImportError:
            return 'Import Error'

    def __send(self, fx, fn, data):
        """
        Private method to send a job response back to the BackgroundService.
        
        @param fx remote function name to execute (str)
        @param fn filename for identification (str)
        @param data return value(s) (any basic datatype)
        """
        if not isinstance(data, (
            dict, list, tuple, str, int, float, bool, type(None),
        )):
            # handle sending of objects of unsupported types
            data = str(data)
        
        packedData = json.dumps([fx, fn, data])
        packedData = bytes(packedData, 'utf-8')
        header = struct.pack(
            b'!II', len(packedData), adler32(packedData) & 0xffffffff)
        self.connection.sendall(header)
        self.connection.sendall(packedData)

    def __receive(self, length):
        """
        Private methode to receive the given length of bytes.
        
        @param length bytes to receive (int)
        @return received bytes or None if connection closed (bytes)
        """
        data = b''
        while len(data) < length:
            newData = self.connection.recv(length - len(data))
            if not newData:
                return None
            data += newData
        return data
    
    def __peek(self, length):
        """
        Private methode to peek the given length of bytes.
        
        @param length bytes to receive (int)
        @return received bytes (bytes)
        """
        data = b''
        self.connection.setblocking(False)
        try:
            data = self.connection.recv(length, socket.MSG_PEEK)
        except OSError:
            pass
        finally:
            self.connection.setblocking(True)
        
        return data
    
    def __cancelled(self):
        """
        Private method to check for a job cancellation.
        
        @return flag indicating a cancellation (boolean)
        """
        msg = self.__peek(struct.calcsize(b'!II') + 6)
        if msg[-6:] == b"CANCEL":
            # get rid of the message data
            self.__receive(struct.calcsize(b'!II') + 6)
            return True
        else:
            return False
    
    def run(self):
        """
        Public method implementing the main loop of the client.
        
        @exception RuntimeError raised if hashes don't match
        """
        try:
            while True:
                header = self.__receive(struct.calcsize(b'!II'))
                # Leave main loop if connection was closed.
                if not header:
                    break
                
                length, datahash = struct.unpack(b'!II', header)
                messageType = self.__receive(6)
                packedData = self.__receive(length)
                
                if messageType != b"JOB   ":
                    continue
                
                if adler32(packedData) & 0xffffffff != datahash:
                    raise RuntimeError('Hashes not equal')
                
                packedData = packedData.decode('utf-8')
                
                fx, fn, data = json.loads(packedData)
                if fx == 'INIT':
                    ret = self.__initClientService(fn, *data)
                elif fx.startswith("batch_"):
                    callback = self.batchServices.get(fx)
                    if callback:
                        try:
                            callback(data, self.__send, fx, self.__cancelled,
                                     maxProcesses=self.__maxProcs)
                        except TypeError:
                            # for backward compatibility
                            callback(data, self.__send, fx, self.__cancelled)
                        ret = "__DONE__"
                    else:
                        ret = 'Unknown batch service.'
                else:
                    callback = self.services.get(fx)
                    if callback:
                        ret = callback(fn, *data)
                    else:
                        ret = 'Unknown service.'
                
                if isinstance(ret, Exception):
                    ret = str(ret)
                
                self.__send(fx, fn, ret)
        except OSError:
            pass
        except Exception:
            exctype, excval, exctb = sys.exc_info()
            tbinfofile = io.StringIO()
            traceback.print_tb(exctb, None, tbinfofile)
            tbinfofile.seek(0)
            tbinfo = tbinfofile.read()
            del exctb
            self.__send(
                'EXCEPTION', '?', [str(exctype), str(excval), tbinfo])
        
        finally:
            # Give time to process latest response on server side
            time.sleep(0.5)
            self.connection.shutdown(socket.SHUT_RDWR)
            self.connection.close()

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('Host, port and max. processes parameters are missing. Abort.')
        sys.exit(1)
    
    host, port, maxProcs = sys.argv[1:]
    backgroundClient = BackgroundClient(host, int(port), int(maxProcs))
    # Start the main loop
    backgroundClient.run()

#
# eflag: noqa = M801

eric ide

mercurial