eric6/Utilities/BackgroundClient.py

changeset 6942
2602857055c5
parent 6645
ad476851d7e0
child 7249
0bf517e60f54
equal deleted inserted replaced
6941:f99d60d6b59b 6942:2602857055c5
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2013 - 2019 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5 # pylint: disable=C0103
6
7 """
8 Module implementing a Qt-free version of a background client for the various
9 checkers and other Python interpreter dependent functions.
10 """
11
12 from __future__ import unicode_literals
13 try:
14 bytes = unicode
15 import StringIO as io # __IGNORE_EXCEPTION__
16 except NameError:
17 import io # __IGNORE_WARNING__
18
19 import json
20 import socket
21 import struct
22 import sys
23 import time
24 import traceback
25 from zlib import adler32
26
27
class BackgroundClient(object):
    """
    Class implementing the main part of the background client.

    The client connects to the background service, announces the major
    Python version it runs under and then processes framed messages in
    run(). Each frame consists of a header packed as '!II' (payload length
    and adler32 checksum of the payload), a 6 byte message type and the
    JSON encoded payload [fx, fn, data].
    """
    # Seconds to wait in run() before the socket is shut down, so that the
    # server side has time to process the latest response.
    shutdownDelay = 0.5

    def __init__(self, host, port, maxProcs):
        """
        Constructor of the BackgroundClient class.

        @param host ip address the background service is listening
        @type str
        @param port port of the background service
        @type int
        @param maxProcs maximum number of CPUs (processes) to use
            (0 = determined automatically)
        @type int
        """
        self.services = {}
        self.batchServices = {}

        self.connection = socket.create_connection((host, port))
        ver = b'Python2' if sys.version_info[0] == 2 else b'Python3'
        self.connection.sendall(ver)
        self.__maxProcs = maxProcs

    def __initClientService(self, fn, path, module):
        """
        Private method to import the given module and register it as service.

        @param fn service name to register (str)
        @param path contains the path to the module (str)
        @param module name to import (str)
        @return text result of the import action (str)
        """
        sys.path.insert(1, path)
        try:
            importedModule = __import__(module, globals(), locals(), [], 0)
            self.services[fn] = importedModule.initService()
            try:
                self.batchServices["batch_" + fn] = \
                    importedModule.initBatchService()
            except AttributeError:
                # the batch variant of a service is optional
                pass
            return 'ok'
        except ImportError:
            return 'Import Error'

    def __send(self, fx, fn, data):
        """
        Private method to send a job response back to the BackgroundService.

        @param fx remote function name to execute (str)
        @param fn filename for identification (str)
        @param data return value(s) (any basic datatype)
        """
        packedData = json.dumps([fx, fn, data])
        if sys.version_info[0] == 3:
            packedData = packedData.encode('utf-8')
        header = struct.pack(
            b'!II', len(packedData), adler32(packedData) & 0xffffffff)
        self.connection.sendall(header)
        self.connection.sendall(packedData)

    def __receive(self, length):
        """
        Private method to receive the given length of bytes.

        @param length bytes to receive (int)
        @return received bytes or None if connection closed (bytes)
        """
        chunks = []
        remaining = length
        while remaining > 0:
            newData = self.connection.recv(remaining)
            if not newData:
                # the peer closed the connection before everything arrived
                return None
            chunks.append(newData)
            remaining -= len(newData)
        return b''.join(chunks)

    def __peek(self, length):
        """
        Private method to peek the given length of bytes.

        The socket is switched to non-blocking mode for the peek and is
        switched back to blocking mode afterwards.

        @param length bytes to receive (int)
        @return received bytes; empty if nothing is pending (bytes)
        """
        data = b''
        self.connection.setblocking(False)
        try:
            data = self.connection.recv(length, socket.MSG_PEEK)
        except socket.error:
            # nothing to read right now
            pass
        finally:
            self.connection.setblocking(True)

        return data

    def __cancelled(self):
        """
        Private method to check for a job cancellation.

        @return flag indicating a cancellation (boolean)
        """
        size = struct.calcsize(b'!II') + 6
        msg = self.__peek(size)
        # Only a completely available header + message type counts; a
        # shorter peek must not be mistaken for a cancellation request.
        if len(msg) == size and msg[-6:] == b"CANCEL":
            # get rid of the message data
            self.__receive(size)
            return True
        return False

    def run(self):
        """
        Public method implementing the main loop of the client.

        The loop ends when the connection is closed or when an exception
        occurs; exceptions other than socket errors are reported to the
        server as an 'EXCEPTION' response. The connection is always closed
        when this method returns.
        """
        headerSize = struct.calcsize(b'!II')
        # The message type field is 6 bytes wide, so the tag is padded
        # with blanks to that width.
        jobType = b"JOB".ljust(6)
        try:
            while True:
                header = self.__receive(headerSize)
                # Leave main loop if connection was closed.
                if not header:
                    break

                length, datahash = struct.unpack(b'!II', header)
                messageType = self.__receive(6)
                if messageType is None:
                    break
                packedData = self.__receive(length)
                if packedData is None:
                    # connection was closed in the middle of a message
                    break

                if messageType != jobType:
                    continue

                # Explicit check instead of an assert statement, which
                # would be stripped when running with -O.
                if adler32(packedData) & 0xffffffff != datahash:
                    raise AssertionError('Hashes not equal')
                if sys.version_info[0] == 3:
                    packedData = packedData.decode('utf-8')

                fx, fn, data = json.loads(packedData)
                if fx == 'INIT':
                    ret = self.__initClientService(fn, *data)
                elif fx.startswith("batch_"):
                    callback = self.batchServices.get(fx)
                    if callback:
                        # NOTE: a TypeError raised inside the callback
                        # itself also triggers the fallback call below.
                        try:
                            callback(data, self.__send, fx, self.__cancelled,
                                     maxProcesses=self.__maxProcs)
                        except TypeError:
                            # for backward compatibility
                            callback(data, self.__send, fx, self.__cancelled)
                        ret = "__DONE__"
                    else:
                        ret = 'Unknown batch service.'
                else:
                    callback = self.services.get(fx)
                    if callback:
                        ret = callback(fn, *data)
                    else:
                        ret = 'Unknown service.'

                self.__send(fx, fn, ret)
        except socket.error:
            pass
        except Exception:
            exctype, excval, exctb = sys.exc_info()
            tbinfo = ''.join(traceback.format_tb(exctb))
            del exctb
            try:
                self.__send(
                    'EXCEPTION', '?', [str(exctype), str(excval), tbinfo])
            except socket.error:
                # the server is gone; there is nobody left to report to
                pass

        finally:
            # Give time to process latest response on server side
            time.sleep(self.shutdownDelay)
            try:
                self.connection.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # the connection may already have been torn down
                pass
            finally:
                self.connection.close()
200
if __name__ == '__main__':
    # Exactly three command line arguments are expected after the script
    # name: host, port and the maximum number of processes.
    arguments = sys.argv[1:]
    if len(arguments) != 3:
        print('Host, port and max. processes parameters are missing. Abort.')
        sys.exit(1)

    host, port, maxProcs = arguments
    # Port and process count arrive as strings and are converted here.
    backgroundClient = BackgroundClient(host, int(port), int(maxProcs))
    # Enter the main loop; it returns once the connection has ended.
    backgroundClient.run()
210
211 #
212 # eflag: noqa = M801

eric ide

mercurial