src/eric7/Utilities/BackgroundClient.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9348
f61d71d95cb1
equal deleted inserted replaced
9220:e9e7eca7efee 9221:bf71ee032bb4
22 22
23 class BackgroundClient: 23 class BackgroundClient:
24 """ 24 """
25 Class implementing the main part of the background client. 25 Class implementing the main part of the background client.
26 """ 26 """
27
27 def __init__(self, host, port, maxProcs): 28 def __init__(self, host, port, maxProcs):
28 """ 29 """
29 Constructor 30 Constructor
30 31
31 @param host ip address the background service is listening 32 @param host ip address the background service is listening
32 @type str 33 @type str
33 @param port port of the background service 34 @param port port of the background service
34 @type int 35 @type int
35 @param maxProcs maximum number of CPUs (processes) to use 36 @param maxProcs maximum number of CPUs (processes) to use
36 (0 = determined automatically) 37 (0 = determined automatically)
37 @type int 38 @type int
38 """ 39 """
39 self.services = {} 40 self.services = {}
40 self.batchServices = {} 41 self.batchServices = {}
41 42
42 self.connection = socket.create_connection((host, port)) 43 self.connection = socket.create_connection((host, port))
43 ver = b'Python3' 44 ver = b"Python3"
44 self.connection.sendall(ver) 45 self.connection.sendall(ver)
45 self.__maxProcs = maxProcs 46 self.__maxProcs = maxProcs
46 47
47 def __initClientService(self, fn, path, module): 48 def __initClientService(self, fn, path, module):
48 """ 49 """
49 Private method to import the given module and register it as service. 50 Private method to import the given module and register it as service.
50 51
51 @param fn service name to register 52 @param fn service name to register
52 @type str 53 @type str
53 @param path contains the path to the module 54 @param path contains the path to the module
54 @type str 55 @type str
55 @param module name to import 56 @param module name to import
60 sys.path.insert(1, path) 61 sys.path.insert(1, path)
61 try: 62 try:
62 importedModule = __import__(module, globals(), locals(), [], 0) 63 importedModule = __import__(module, globals(), locals(), [], 0)
63 self.services[fn] = importedModule.initService() 64 self.services[fn] = importedModule.initService()
64 with contextlib.suppress(AttributeError): 65 with contextlib.suppress(AttributeError):
65 self.batchServices["batch_" + fn] = ( 66 self.batchServices["batch_" + fn] = importedModule.initBatchService()
66 importedModule.initBatchService() 67 return "ok"
67 )
68 return 'ok'
69 except ImportError as err: 68 except ImportError as err:
70 return 'Import Error: ' + str(err) 69 return "Import Error: " + str(err)
71 except Exception as err: 70 except Exception as err:
72 return str(err) 71 return str(err)
73 72
74 def __send(self, fx, fn, data): 73 def __send(self, fx, fn, data):
75 """ 74 """
76 Private method to send a job response back to the BackgroundService 75 Private method to send a job response back to the BackgroundService
77 server. 76 server.
78 77
79 @param fx remote function name to execute 78 @param fx remote function name to execute
80 @type str 79 @type str
81 @param fn filename for identification 80 @param fn filename for identification
82 @type str 81 @type str
83 @param data return value(s) 82 @param data return value(s)
84 @type any basic datatype 83 @type any basic datatype
85 """ 84 """
86 if not isinstance(data, ( 85 if not isinstance(
87 dict, list, tuple, str, int, float, bool, type(None), 86 data,
88 )): 87 (
88 dict,
89 list,
90 tuple,
91 str,
92 int,
93 float,
94 bool,
95 type(None),
96 ),
97 ):
89 # handle sending of objects of unsupported types 98 # handle sending of objects of unsupported types
90 data = str(data) 99 data = str(data)
91 100
92 packedData = json.dumps([fx, fn, data]) 101 packedData = json.dumps([fx, fn, data])
93 packedData = bytes(packedData, 'utf-8') 102 packedData = bytes(packedData, "utf-8")
94 header = struct.pack( 103 header = struct.pack(b"!II", len(packedData), adler32(packedData) & 0xFFFFFFFF)
95 b'!II', len(packedData), adler32(packedData) & 0xffffffff)
96 self.connection.sendall(header) 104 self.connection.sendall(header)
97 self.connection.sendall(packedData) 105 self.connection.sendall(packedData)
98 106
99 def __receive(self, length): 107 def __receive(self, length):
100 """ 108 """
101 Private method to receive the given length of bytes. 109 Private method to receive the given length of bytes.
102 110
103 @param length bytes to receive 111 @param length bytes to receive
104 @type int 112 @type int
105 @return received bytes or None if connection closed 113 @return received bytes or None if connection closed
106 @rtype bytes 114 @rtype bytes
107 """ 115 """
108 data = b'' 116 data = b""
109 while len(data) < length: 117 while len(data) < length:
110 newData = self.connection.recv(length - len(data)) 118 newData = self.connection.recv(length - len(data))
111 if not newData: 119 if not newData:
112 return None 120 return None
113 data += newData 121 data += newData
114 return data 122 return data
115 123
116 def __peek(self, length): 124 def __peek(self, length):
117 """ 125 """
118 Private method to peek the given length of bytes. 126 Private method to peek the given length of bytes.
119 127
120 @param length bytes to receive 128 @param length bytes to receive
121 @type int 129 @type int
122 @return received bytes 130 @return received bytes
123 @rtype bytes 131 @rtype bytes
124 """ 132 """
125 data = b'' 133 data = b""
126 self.connection.setblocking(False) 134 self.connection.setblocking(False)
127 try: 135 try:
128 with contextlib.suppress(OSError): 136 with contextlib.suppress(OSError):
129 data = self.connection.recv(length, socket.MSG_PEEK) 137 data = self.connection.recv(length, socket.MSG_PEEK)
130 finally: 138 finally:
131 self.connection.setblocking(True) 139 self.connection.setblocking(True)
132 140
133 return data 141 return data
134 142
135 def __cancelled(self): 143 def __cancelled(self):
136 """ 144 """
137 Private method to check for a job cancellation. 145 Private method to check for a job cancellation.
138 146
139 @return flag indicating a cancellation 147 @return flag indicating a cancellation
140 @rtype bool 148 @rtype bool
141 """ 149 """
142 msg = self.__peek(struct.calcsize(b'!II') + 6) 150 msg = self.__peek(struct.calcsize(b"!II") + 6)
143 if msg[-6:] == b"CANCEL": 151 if msg[-6:] == b"CANCEL":
144 # get rid of the message data 152 # get rid of the message data
145 self.__receive(struct.calcsize(b'!II') + 6) 153 self.__receive(struct.calcsize(b"!II") + 6)
146 return True 154 return True
147 else: 155 else:
148 return False 156 return False
149 157
150 def run(self): 158 def run(self):
151 """ 159 """
152 Public method implementing the main loop of the client. 160 Public method implementing the main loop of the client.
153 161
154 @exception RuntimeError raised if hashes don't match 162 @exception RuntimeError raised if hashes don't match
155 """ 163 """
156 try: 164 try:
157 while True: 165 while True:
158 header = self.__receive(struct.calcsize(b'!II')) 166 header = self.__receive(struct.calcsize(b"!II"))
159 # Leave main loop if connection was closed. 167 # Leave main loop if connection was closed.
160 if not header: 168 if not header:
161 break 169 break
162 170
163 length, datahash = struct.unpack(b'!II', header) 171 length, datahash = struct.unpack(b"!II", header)
164 messageType = self.__receive(6) 172 messageType = self.__receive(6)
165 packedData = self.__receive(length) 173 packedData = self.__receive(length)
166 174
167 if messageType != b"JOB ": 175 if messageType != b"JOB ":
168 continue 176 continue
169 177
170 if adler32(packedData) & 0xffffffff != datahash: 178 if adler32(packedData) & 0xFFFFFFFF != datahash:
171 raise RuntimeError('Hashes not equal') 179 raise RuntimeError("Hashes not equal")
172 180
173 packedData = packedData.decode('utf-8') 181 packedData = packedData.decode("utf-8")
174 182
175 fx, fn, data = json.loads(packedData) 183 fx, fn, data = json.loads(packedData)
176 if fx == 'INIT': 184 if fx == "INIT":
177 ret = self.__initClientService(fn, *data) 185 ret = self.__initClientService(fn, *data)
178 elif fx.startswith("batch_"): 186 elif fx.startswith("batch_"):
179 callback = self.batchServices.get(fx) 187 callback = self.batchServices.get(fx)
180 if callback: 188 if callback:
181 callback(data, self.__send, fx, self.__cancelled, 189 callback(
182 maxProcesses=self.__maxProcs) 190 data,
191 self.__send,
192 fx,
193 self.__cancelled,
194 maxProcesses=self.__maxProcs,
195 )
183 ret = "__DONE__" 196 ret = "__DONE__"
184 else: 197 else:
185 ret = 'Unknown batch service.' 198 ret = "Unknown batch service."
186 else: 199 else:
187 callback = self.services.get(fx) 200 callback = self.services.get(fx)
188 if callback: 201 if callback:
189 ret = callback(fn, *data) 202 ret = callback(fn, *data)
190 else: 203 else:
191 ret = 'Unknown service.' 204 ret = "Unknown service."
192 205
193 if isinstance(ret, Exception): 206 if isinstance(ret, Exception):
194 ret = str(ret) 207 ret = str(ret)
195 208
196 self.__send(fx, fn, ret) 209 self.__send(fx, fn, ret)
197 except OSError: 210 except OSError:
198 pass 211 pass
199 except Exception: 212 except Exception:
200 exctype, excval, exctb = sys.exc_info() 213 exctype, excval, exctb = sys.exc_info()
201 tbinfofile = io.StringIO() 214 tbinfofile = io.StringIO()
202 traceback.print_tb(exctb, None, tbinfofile) 215 traceback.print_tb(exctb, None, tbinfofile)
203 tbinfofile.seek(0) 216 tbinfofile.seek(0)
204 tbinfo = tbinfofile.read() 217 tbinfo = tbinfofile.read()
205 del exctb 218 del exctb
206 self.__send( 219 self.__send("EXCEPTION", "?", [str(exctype), str(excval), tbinfo])
207 'EXCEPTION', '?', [str(exctype), str(excval), tbinfo]) 220
208
209 finally: 221 finally:
210 # Give time to process latest response on server side 222 # Give time to process latest response on server side
211 time.sleep(0.5) 223 time.sleep(0.5)
212 with contextlib.suppress(OSError): 224 with contextlib.suppress(OSError):
213 self.connection.shutdown(socket.SHUT_RDWR) 225 self.connection.shutdown(socket.SHUT_RDWR)
214 self.connection.close() 226 self.connection.close()
215 227
216 if __name__ == '__main__': 228
229 if __name__ == "__main__":
217 if len(sys.argv) != 5: 230 if len(sys.argv) != 5:
218 print('Host, port, max. processes and Python library path parameters' 231 print(
219 ' are missing. Aborting...') 232 "Host, port, max. processes and Python library path parameters"
233 " are missing. Aborting..."
234 )
220 sys.exit(1) 235 sys.exit(1)
221 236
222 host, port, maxProcs, pyLibraryPath = sys.argv[1:] 237 host, port, maxProcs, pyLibraryPath = sys.argv[1:]
223 238
224 # insert pyLibraryPath into the search path because external stuff might 239 # insert pyLibraryPath into the search path because external stuff might
225 # be installed in the eric (virtual) environment 240 # be installed in the eric (virtual) environment
226 sys.path.insert(1, pyLibraryPath) 241 sys.path.insert(1, pyLibraryPath)
227 242
228 backgroundClient = BackgroundClient(host, int(port), int(maxProcs)) 243 backgroundClient = BackgroundClient(host, int(port), int(maxProcs))
229 # Start the main loop 244 # Start the main loop
230 backgroundClient.run() 245 backgroundClient.run()
231 246
232 # 247 #

eric ide

mercurial