60 sys.path.insert(1, path) |
61 sys.path.insert(1, path) |
61 try: |
62 try: |
62 importedModule = __import__(module, globals(), locals(), [], 0) |
63 importedModule = __import__(module, globals(), locals(), [], 0) |
63 self.services[fn] = importedModule.initService() |
64 self.services[fn] = importedModule.initService() |
64 with contextlib.suppress(AttributeError): |
65 with contextlib.suppress(AttributeError): |
65 self.batchServices["batch_" + fn] = ( |
66 self.batchServices["batch_" + fn] = importedModule.initBatchService() |
66 importedModule.initBatchService() |
67 return "ok" |
67 ) |
|
68 return 'ok' |
|
69 except ImportError as err: |
68 except ImportError as err: |
70 return 'Import Error: ' + str(err) |
69 return "Import Error: " + str(err) |
71 except Exception as err: |
70 except Exception as err: |
72 return str(err) |
71 return str(err) |
73 |
72 |
74 def __send(self, fx, fn, data): |
73 def __send(self, fx, fn, data): |
75 """ |
74 """ |
76 Private method to send a job response back to the BackgroundService |
75 Private method to send a job response back to the BackgroundService |
77 server. |
76 server. |
78 |
77 |
79 @param fx remote function name to execute |
78 @param fx remote function name to execute |
80 @type str |
79 @type str |
81 @param fn filename for identification |
80 @param fn filename for identification |
82 @type str |
81 @type str |
83 @param data return value(s) |
82 @param data return value(s) |
84 @type any basic datatype |
83 @type any basic datatype |
85 """ |
84 """ |
86 if not isinstance(data, ( |
85 if not isinstance( |
87 dict, list, tuple, str, int, float, bool, type(None), |
86 data, |
88 )): |
87 ( |
|
88 dict, |
|
89 list, |
|
90 tuple, |
|
91 str, |
|
92 int, |
|
93 float, |
|
94 bool, |
|
95 type(None), |
|
96 ), |
|
97 ): |
89 # handle sending of objects of unsupported types |
98 # handle sending of objects of unsupported types |
90 data = str(data) |
99 data = str(data) |
91 |
100 |
92 packedData = json.dumps([fx, fn, data]) |
101 packedData = json.dumps([fx, fn, data]) |
93 packedData = bytes(packedData, 'utf-8') |
102 packedData = bytes(packedData, "utf-8") |
94 header = struct.pack( |
103 header = struct.pack(b"!II", len(packedData), adler32(packedData) & 0xFFFFFFFF) |
95 b'!II', len(packedData), adler32(packedData) & 0xffffffff) |
|
96 self.connection.sendall(header) |
104 self.connection.sendall(header) |
97 self.connection.sendall(packedData) |
105 self.connection.sendall(packedData) |
98 |
106 |
99 def __receive(self, length): |
107 def __receive(self, length): |
100 """ |
108 """ |
101 Private method to receive the given length of bytes. |
109 Private method to receive the given length of bytes. |
102 |
110 |
103 @param length bytes to receive |
111 @param length bytes to receive |
104 @type int |
112 @type int |
105 @return received bytes or None if connection closed |
113 @return received bytes or None if connection closed |
106 @rtype bytes |
114 @rtype bytes |
107 """ |
115 """ |
108 data = b'' |
116 data = b"" |
109 while len(data) < length: |
117 while len(data) < length: |
110 newData = self.connection.recv(length - len(data)) |
118 newData = self.connection.recv(length - len(data)) |
111 if not newData: |
119 if not newData: |
112 return None |
120 return None |
113 data += newData |
121 data += newData |
114 return data |
122 return data |
115 |
123 |
116 def __peek(self, length): |
124 def __peek(self, length): |
117 """ |
125 """ |
118 Private method to peek the given length of bytes. |
126 Private method to peek the given length of bytes. |
119 |
127 |
120 @param length bytes to receive |
128 @param length bytes to receive |
121 @type int |
129 @type int |
122 @return received bytes |
130 @return received bytes |
123 @rtype bytes |
131 @rtype bytes |
124 """ |
132 """ |
125 data = b'' |
133 data = b"" |
126 self.connection.setblocking(False) |
134 self.connection.setblocking(False) |
127 try: |
135 try: |
128 with contextlib.suppress(OSError): |
136 with contextlib.suppress(OSError): |
129 data = self.connection.recv(length, socket.MSG_PEEK) |
137 data = self.connection.recv(length, socket.MSG_PEEK) |
130 finally: |
138 finally: |
131 self.connection.setblocking(True) |
139 self.connection.setblocking(True) |
132 |
140 |
133 return data |
141 return data |
134 |
142 |
135 def __cancelled(self): |
143 def __cancelled(self): |
136 """ |
144 """ |
137 Private method to check for a job cancellation. |
145 Private method to check for a job cancellation. |
138 |
146 |
139 @return flag indicating a cancellation |
147 @return flag indicating a cancellation |
140 @rtype bool |
148 @rtype bool |
141 """ |
149 """ |
142 msg = self.__peek(struct.calcsize(b'!II') + 6) |
150 msg = self.__peek(struct.calcsize(b"!II") + 6) |
143 if msg[-6:] == b"CANCEL": |
151 if msg[-6:] == b"CANCEL": |
144 # get rid of the message data |
152 # get rid of the message data |
145 self.__receive(struct.calcsize(b'!II') + 6) |
153 self.__receive(struct.calcsize(b"!II") + 6) |
146 return True |
154 return True |
147 else: |
155 else: |
148 return False |
156 return False |
149 |
157 |
150 def run(self): |
158 def run(self): |
151 """ |
159 """ |
152 Public method implementing the main loop of the client. |
160 Public method implementing the main loop of the client. |
153 |
161 |
154 @exception RuntimeError raised if hashes don't match |
162 @exception RuntimeError raised if hashes don't match |
155 """ |
163 """ |
156 try: |
164 try: |
157 while True: |
165 while True: |
158 header = self.__receive(struct.calcsize(b'!II')) |
166 header = self.__receive(struct.calcsize(b"!II")) |
159 # Leave main loop if connection was closed. |
167 # Leave main loop if connection was closed. |
160 if not header: |
168 if not header: |
161 break |
169 break |
162 |
170 |
163 length, datahash = struct.unpack(b'!II', header) |
171 length, datahash = struct.unpack(b"!II", header) |
164 messageType = self.__receive(6) |
172 messageType = self.__receive(6) |
165 packedData = self.__receive(length) |
173 packedData = self.__receive(length) |
166 |
174 |
167 if messageType != b"JOB ": |
175 if messageType != b"JOB ": |
168 continue |
176 continue |
169 |
177 |
170 if adler32(packedData) & 0xffffffff != datahash: |
178 if adler32(packedData) & 0xFFFFFFFF != datahash: |
171 raise RuntimeError('Hashes not equal') |
179 raise RuntimeError("Hashes not equal") |
172 |
180 |
173 packedData = packedData.decode('utf-8') |
181 packedData = packedData.decode("utf-8") |
174 |
182 |
175 fx, fn, data = json.loads(packedData) |
183 fx, fn, data = json.loads(packedData) |
176 if fx == 'INIT': |
184 if fx == "INIT": |
177 ret = self.__initClientService(fn, *data) |
185 ret = self.__initClientService(fn, *data) |
178 elif fx.startswith("batch_"): |
186 elif fx.startswith("batch_"): |
179 callback = self.batchServices.get(fx) |
187 callback = self.batchServices.get(fx) |
180 if callback: |
188 if callback: |
181 callback(data, self.__send, fx, self.__cancelled, |
189 callback( |
182 maxProcesses=self.__maxProcs) |
190 data, |
|
191 self.__send, |
|
192 fx, |
|
193 self.__cancelled, |
|
194 maxProcesses=self.__maxProcs, |
|
195 ) |
183 ret = "__DONE__" |
196 ret = "__DONE__" |
184 else: |
197 else: |
185 ret = 'Unknown batch service.' |
198 ret = "Unknown batch service." |
186 else: |
199 else: |
187 callback = self.services.get(fx) |
200 callback = self.services.get(fx) |
188 if callback: |
201 if callback: |
189 ret = callback(fn, *data) |
202 ret = callback(fn, *data) |
190 else: |
203 else: |
191 ret = 'Unknown service.' |
204 ret = "Unknown service." |
192 |
205 |
193 if isinstance(ret, Exception): |
206 if isinstance(ret, Exception): |
194 ret = str(ret) |
207 ret = str(ret) |
195 |
208 |
196 self.__send(fx, fn, ret) |
209 self.__send(fx, fn, ret) |
197 except OSError: |
210 except OSError: |
198 pass |
211 pass |
199 except Exception: |
212 except Exception: |
200 exctype, excval, exctb = sys.exc_info() |
213 exctype, excval, exctb = sys.exc_info() |
201 tbinfofile = io.StringIO() |
214 tbinfofile = io.StringIO() |
202 traceback.print_tb(exctb, None, tbinfofile) |
215 traceback.print_tb(exctb, None, tbinfofile) |
203 tbinfofile.seek(0) |
216 tbinfofile.seek(0) |
204 tbinfo = tbinfofile.read() |
217 tbinfo = tbinfofile.read() |
205 del exctb |
218 del exctb |
206 self.__send( |
219 self.__send("EXCEPTION", "?", [str(exctype), str(excval), tbinfo]) |
207 'EXCEPTION', '?', [str(exctype), str(excval), tbinfo]) |
220 |
208 |
|
209 finally: |
221 finally: |
210 # Give time to process latest response on server side |
222 # Give time to process latest response on server side |
211 time.sleep(0.5) |
223 time.sleep(0.5) |
212 with contextlib.suppress(OSError): |
224 with contextlib.suppress(OSError): |
213 self.connection.shutdown(socket.SHUT_RDWR) |
225 self.connection.shutdown(socket.SHUT_RDWR) |
214 self.connection.close() |
226 self.connection.close() |
215 |
227 |
216 if __name__ == '__main__': |
228 |
|
229 if __name__ == "__main__": |
217 if len(sys.argv) != 5: |
230 if len(sys.argv) != 5: |
218 print('Host, port, max. processes and Python library path parameters' |
231 print( |
219 ' are missing. Aborting...') |
232 "Host, port, max. processes and Python library path parameters" |
|
233 " are missing. Aborting..." |
|
234 ) |
220 sys.exit(1) |
235 sys.exit(1) |
221 |
236 |
222 host, port, maxProcs, pyLibraryPath = sys.argv[1:] |
237 host, port, maxProcs, pyLibraryPath = sys.argv[1:] |
223 |
238 |
224 # insert pyLibraryPath into the search path because external stuff might |
239 # insert pyLibraryPath into the search path because external stuff might |
225 # be installed in the eric (virtual) environment |
240 # be installed in the eric (virtual) environment |
226 sys.path.insert(1, pyLibraryPath) |
241 sys.path.insert(1, pyLibraryPath) |
227 |
242 |
228 backgroundClient = BackgroundClient(host, int(port), int(maxProcs)) |
243 backgroundClient = BackgroundClient(host, int(port), int(maxProcs)) |
229 # Start the main loop |
244 # Start the main loop |
230 backgroundClient.run() |
245 backgroundClient.run() |
231 |
246 |
232 # |
247 # |