|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2013 - 2019 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 # pylint: disable=C0103 |
|
6 |
|
7 """ |
|
8 Module implementing a Qt free version of a background client for the various |
|
9 checkers and other python interpreter dependent functions. |
|
10 """ |
|
11 |
|
12 from __future__ import unicode_literals |
|
13 try: |
|
14 bytes = unicode |
|
15 import StringIO as io # __IGNORE_EXCEPTION__ |
|
16 except NameError: |
|
17 import io # __IGNORE_WARNING__ |
|
18 |
|
19 import json |
|
20 import socket |
|
21 import struct |
|
22 import sys |
|
23 import time |
|
24 import traceback |
|
25 from zlib import adler32 |
|
26 |
|
27 |
|
class BackgroundClient(object):
    """
    Class implementing the main part of the background client.

    The client connects to the background service, announces the major
    Python version it is running on and then serves requests until the
    connection is closed.

    Message layout as used by this class: each message starts with a header
    of two unsigned 32 bit integers in network byte order (length of the
    payload and Adler-32 checksum of the payload). Incoming messages carry
    a six byte message type between header and payload; responses consist
    of header and payload only. The payload is a JSON encoded list of the
    form [function name, file name, data].
    """
    # struct format of the message header (payload length, Adler-32 hash)
    _headerFormat = b'!II'
    # number of bytes of the message type following an incoming header
    _messageTypeLength = 6
    # Message types; the job type is padded to the message type length
    # because it is compared against exactly that many received bytes.
    _jobMessage = b"JOB".ljust(_messageTypeLength)
    _cancelMessage = b"CANCEL"

    # Seconds to wait before closing the connection in order to give the
    # server side time to process the latest response.
    shutdownDelay = 0.5

    def __init__(self, host, port, maxProcs):
        """
        Constructor of the BackgroundClient class.

        @param host ip address the background service is listening
        @type str
        @param port port of the background service
        @type int
        @param maxProcs maximum number of CPUs (processes) to use
            (0 = determined automatically)
        @type int
        """
        self.services = {}
        self.batchServices = {}

        self.connection = socket.create_connection((host, port))
        # The first thing sent is the identification of the interpreter.
        ver = b'Python2' if sys.version_info[0] == 2 else b'Python3'
        self.connection.sendall(ver)
        self.__maxProcs = maxProcs

    def __initClientService(self, fn, path, module):
        """
        Private method to import the given module and register it as service.

        The module has to provide an initService() function; an
        initBatchService() function is optional and is registered under
        the service name prefixed with "batch_".

        @param fn service name to register (str)
        @param path contains the path to the module (str)
        @param module name to import (str)
        @return text result of the import action (str)
        """
        sys.path.insert(1, path)
        try:
            importedModule = __import__(module, globals(), locals(), [], 0)
        except ImportError:
            return 'Import Error'

        try:
            self.services[fn] = importedModule.initService()
            try:
                self.batchServices["batch_" + fn] = \
                    importedModule.initBatchService()
            except AttributeError:
                # the module doesn't offer a batch variant of the service
                pass
        except ImportError:
            # an import failing inside initService() is reported as well
            return 'Import Error'
        return 'ok'

    def __send(self, fx, fn, data):
        """
        Private method to send a job response back to the BackgroundService.

        @param fx remote function name to execute (str)
        @param fn filename for identification (str)
        @param data return value(s) (any basic datatype)
        """
        packedData = json.dumps([fx, fn, data])
        if sys.version_info[0] == 3:
            packedData = packedData.encode('utf-8')
        header = struct.pack(
            self._headerFormat,
            len(packedData), adler32(packedData) & 0xffffffff)
        self.connection.sendall(header)
        self.connection.sendall(packedData)

    def __receive(self, length):
        """
        Private method to receive the given length of bytes.

        @param length bytes to receive (int)
        @return received bytes or None if connection closed (bytes)
        """
        chunks = []
        missing = length
        while missing > 0:
            newData = self.connection.recv(missing)
            if not newData:
                # peer closed the connection before everything arrived
                return None
            chunks.append(newData)
            missing -= len(newData)
        return b''.join(chunks)

    def __peek(self, length):
        """
        Private method to peek the given length of bytes.

        The data is read without removing it from the receive queue and
        without blocking; if nothing can be read, an empty byte string is
        returned.

        @param length bytes to receive (int)
        @return received bytes (bytes)
        """
        data = b''
        self.connection.setblocking(False)
        try:
            data = self.connection.recv(length, socket.MSG_PEEK)
        except socket.error:
            # nothing available right now (or the socket is unusable)
            pass
        finally:
            self.connection.setblocking(True)

        return data

    def __cancelled(self):
        """
        Private method to check for a job cancellation.

        A pending cancellation message is removed from the receive queue.

        @return flag indicating a cancellation (boolean)
        """
        messageLength = \
            struct.calcsize(self._headerFormat) + self._messageTypeLength
        msg = self.__peek(messageLength)
        # Only a complete message (header plus type) is accepted, so that
        # the blocking read below never waits for or consumes foreign data.
        if len(msg) == messageLength and msg.endswith(self._cancelMessage):
            # get rid of the message data
            self.__receive(messageLength)
            return True

        return False

    def run(self):
        """
        Public method implementing the main loop of the client.

        The loop ends when the connection is closed or an error occurs.
        Errors other than socket errors are reported to the service as an
        'EXCEPTION' response. The connection is closed in any case.
        """
        headerLength = struct.calcsize(self._headerFormat)
        try:
            while True:
                header = self.__receive(headerLength)
                # Leave main loop if connection was closed.
                if not header:
                    break

                length, datahash = struct.unpack(self._headerFormat, header)
                messageType = self.__receive(self._messageTypeLength)
                if messageType is None:
                    break
                packedData = self.__receive(length)
                if packedData is None:
                    # connection closed in the middle of a message
                    break

                if messageType != self._jobMessage:
                    continue

                # Raised explicitly (instead of using an assert statement)
                # to keep the check active when running with -O.
                if adler32(packedData) & 0xffffffff != datahash:
                    raise AssertionError('Hashes not equal')
                if sys.version_info[0] == 3:
                    packedData = packedData.decode('utf-8')

                fx, fn, data = json.loads(packedData)
                if fx == 'INIT':
                    ret = self.__initClientService(fn, *data)
                elif fx.startswith("batch_"):
                    callback = self.batchServices.get(fx)
                    if callback:
                        try:
                            callback(data, self.__send, fx, self.__cancelled,
                                     maxProcesses=self.__maxProcs)
                        except TypeError:
                            # for backward compatibility
                            # NOTE(review): a TypeError raised inside the
                            # callback triggers this second call as well.
                            callback(data, self.__send, fx, self.__cancelled)
                        ret = "__DONE__"
                    else:
                        ret = 'Unknown batch service.'
                else:
                    callback = self.services.get(fx)
                    if callback:
                        ret = callback(fn, *data)
                    else:
                        ret = 'Unknown service.'

                self.__send(fx, fn, ret)
        except socket.error:
            pass
        except Exception:
            exctype, excval, exctb = sys.exc_info()
            tbinfofile = io.StringIO()
            traceback.print_tb(exctb, None, tbinfofile)
            tbinfo = tbinfofile.getvalue()
            del exctb
            try:
                self.__send(
                    'EXCEPTION', '?', [str(exctype), str(excval), tbinfo])
            except socket.error:
                # The report is best effort only; the connection may be
                # the very thing that is broken.
                pass

        finally:
            # Give time to process latest response on server side
            time.sleep(self.shutdownDelay)
            try:
                self.connection.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # e.g. the peer has dropped the connection already
                pass
            finally:
                self.connection.close()
|
200 |
|
if __name__ == '__main__':
    # The script expects exactly three arguments after its own name:
    # host, port and the maximum number of processes.
    if len(sys.argv) != 4:
        print('Host, port and max. processes parameters are missing. Abort.')
        sys.exit(1)

    host, port, maxProcs = sys.argv[1:]
    # Connect to the background service and serve requests until the
    # connection is closed.
    BackgroundClient(host, int(port), int(maxProcs)).run()
|
210 |
|
211 # |
|
212 # eflag: noqa = M801 |