|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2013 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 # pylint: disable=C0103 |
|
6 |
|
7 """ |
|
8 Module implementing a Qt free version of a background client for the various |
|
9 checkers and other python interpreter dependent functions. |
|
10 """ |
|
11 |
|
12 import io |
|
13 import json |
|
14 import socket |
|
15 import struct |
|
16 import sys |
|
17 import time |
|
18 import traceback |
|
19 import contextlib |
|
20 from zlib import adler32 |
|
21 |
|
22 |
|
class BackgroundClient:
    """
    Class implementing the main part of the background client.

    The client connects to the background service, announces itself and
    then serves requests until the connection is closed. Incoming messages
    consist of a header (payload length and Adler-32 checksum as unsigned
    32 bit integers in network byte order), a six byte message type and
    the JSON encoded payload. Responses consist of the same kind of header
    followed directly by the JSON encoded payload.
    """
    # layout of the message header: payload length, Adler-32 checksum
    __Header = struct.Struct(b'!II')
    # the message type field has a fixed width; shorter type names are
    # padded with blanks to that width
    __MessageTypeLength = 6
    __JobMessage = b"JOB".ljust(__MessageTypeLength)
    __CancelMessage = b"CANCEL"

    # time in seconds given to the server to process the latest response
    # before the connection is shut down at the end of run()
    shutdownDelay = 0.5

    def __init__(self, host, port, maxProcs):
        """
        Constructor

        Connects to the background service and sends the interpreter
        identification.

        @param host ip address the background service is listening
        @type str
        @param port port of the background service
        @type int
        @param maxProcs maximum number of CPUs (processes) to use
            (0 = determined automatically)
        @type int
        """
        self.services = {}
        self.batchServices = {}

        self.connection = socket.create_connection((host, port))
        ver = b'Python3'
        self.connection.sendall(ver)
        self.__maxProcs = maxProcs

    def __initClientService(self, fn, path, module):
        """
        Private method to import the given module and register it as service.

        The module must provide an initService() function; an optional
        initBatchService() function is registered as "batch_<fn>".

        @param fn service name to register
        @type str
        @param path contains the path to the module
        @type str
        @param module name to import
        @type str
        @return text result of the import action ('ok' on success, the
            error text otherwise)
        @rtype str
        """
        sys.path.insert(1, path)
        try:
            importedModule = __import__(module, globals(), locals(), [], 0)
            self.services[fn] = importedModule.initService()
            # batch support is optional; a module without
            # initBatchService() is registered as a plain service only
            with contextlib.suppress(AttributeError):
                self.batchServices["batch_" + fn] = (
                    importedModule.initBatchService()
                )
            return 'ok'
        except ImportError as err:
            return 'Import Error: ' + str(err)
        except Exception as err:
            return str(err)

    def __send(self, fx, fn, data):
        """
        Private method to send a job response back to the BackgroundService
        server.

        @param fx remote function name to execute
        @type str
        @param fn filename for identification
        @type str
        @param data return value(s)
        @type any basic datatype
        """
        if not isinstance(data, (
            dict, list, tuple, str, int, float, bool, type(None),
        )):
            # handle sending of objects of unsupported types
            data = str(data)

        packedData = json.dumps([fx, fn, data]).encode('utf-8')
        header = self.__Header.pack(
            len(packedData), adler32(packedData) & 0xffffffff)
        # header and payload are written with a single call
        self.connection.sendall(header + packedData)

    def __receive(self, length):
        """
        Private method to receive the given length of bytes.

        @param length bytes to receive
        @type int
        @return received bytes or None if connection closed
        @rtype bytes
        """
        # collect the chunks and join them once instead of growing a bytes
        # object with every recv() call
        chunks = []
        remaining = length
        while remaining > 0:
            newData = self.connection.recv(remaining)
            if not newData:
                return None
            chunks.append(newData)
            remaining -= len(newData)
        return b''.join(chunks)

    def __peek(self, length):
        """
        Private method to peek the given length of bytes.

        The data is left in the receive buffer. The socket is switched to
        non-blocking mode for the attempt and back to blocking afterwards.

        @param length bytes to receive
        @type int
        @return received bytes (empty if nothing could be read)
        @rtype bytes
        """
        data = b''
        self.connection.setblocking(False)
        try:
            # no pending data is reported as an OSError in non-blocking mode
            with contextlib.suppress(OSError):
                data = self.connection.recv(length, socket.MSG_PEEK)
        finally:
            self.connection.setblocking(True)

        return data

    def __cancelled(self):
        """
        Private method to check for a job cancellation.

        @return flag indicating a cancellation
        @rtype bool
        """
        size = self.__Header.size + self.__MessageTypeLength
        msg = self.__peek(size)
        # only a completely received header plus message type is evaluated
        if len(msg) == size and msg.endswith(self.__CancelMessage):
            # get rid of the message data
            self.__receive(size)
            return True

        return False

    def __dispatch(self, fx, fn, data):
        """
        Private method to execute a received job.

        @param fx name of the function to execute ('INIT', a batch service
            name starting with 'batch_' or a service name)
        @type str
        @param fn filename for identification (service name for 'INIT')
        @type str
        @param data arguments of the job
        @type any basic datatype
        @return result to be sent back to the server
        @rtype any
        """
        if fx == 'INIT':
            return self.__initClientService(fn, *data)

        if fx.startswith("batch_"):
            callback = self.batchServices.get(fx)
            if not callback:
                return 'Unknown batch service.'

            try:
                callback(data, self.__send, fx, self.__cancelled,
                         maxProcesses=self.__maxProcs)
            except TypeError:
                # for backward compatibility
                callback(data, self.__send, fx, self.__cancelled)
            return "__DONE__"

        callback = self.services.get(fx)
        if not callback:
            return 'Unknown service.'

        return callback(fn, *data)

    def run(self):
        """
        Public method implementing the main loop of the client.

        The loop ends when the connection is closed or an error occurs.
        Errors other than OSError (e.g. a checksum mismatch) are reported
        to the server as an 'EXCEPTION' response. The connection is always
        shut down and closed before the method returns.
        """
        try:
            while True:
                header = self.__receive(self.__Header.size)
                # Leave main loop if connection was closed.
                if not header:
                    break

                length, datahash = self.__Header.unpack(header)
                messageType = self.__receive(self.__MessageTypeLength)
                if messageType is None:
                    # connection was closed in the middle of a message
                    break
                packedData = self.__receive(length)
                if packedData is None:
                    # an empty payload is b'', None means connection closed
                    break

                if messageType != self.__JobMessage:
                    # anything but a job (e.g. a stale cancel request) is
                    # dropped after its payload has been consumed
                    continue

                if adler32(packedData) & 0xffffffff != datahash:
                    raise RuntimeError('Hashes not equal')

                fx, fn, data = json.loads(packedData.decode('utf-8'))
                ret = self.__dispatch(fx, fn, data)

                if isinstance(ret, Exception):
                    ret = str(ret)

                self.__send(fx, fn, ret)
        except OSError:
            pass
        except Exception as exc:
            tbinfofile = io.StringIO()
            traceback.print_tb(exc.__traceback__, None, tbinfofile)
            # the connection may be the reason for the failure, so a
            # failing report must not hide the clean-up below
            with contextlib.suppress(OSError):
                self.__send(
                    'EXCEPTION', '?',
                    [str(type(exc)), str(exc), tbinfofile.getvalue()])

        finally:
            # Give time to process latest response on server side
            time.sleep(self.shutdownDelay)
            # shutdown() fails if the peer is gone already; the socket
            # must be closed nevertheless
            with contextlib.suppress(OSError):
                self.connection.shutdown(socket.SHUT_RDWR)
            self.connection.close()
|
218 |
|
if __name__ == '__main__':
    # Exactly four parameters are expected after the script name.
    arguments = sys.argv[1:]
    if len(arguments) != 4:
        print('Host, port, max. processes and Python library path parameters'
              ' are missing. Aborting...')
        raise SystemExit(1)

    host, port, maxProcs, pyLibraryPath = arguments

    # External packages might be installed in the eric (virtual)
    # environment, so its library path is added to the search path.
    sys.path.insert(1, pyLibraryPath)

    # Connect to the background service and enter the main loop.
    backgroundClient = BackgroundClient(host, int(port), int(maxProcs))
    backgroundClient.run()
|
234 |
|
235 # |
|
236 # eflag: noqa = M801 |