def __receive(self, length):
    """
    Private method to receive exactly the given number of bytes.

    self.connection.recv() is called repeatedly, each time asking only for
    the bytes still missing, until the requested amount has been collected.

    @param length number of bytes to receive (int)
    @return received bytes or None if connection closed (bytes)
    """
    chunks = []
    remaining = length
    while remaining > 0:
        newData = self.connection.recv(remaining)
        if not newData:
            # recv() delivered nothing: reported to the caller as a
            # closed connection; bytes collected so far are discarded
            return None
        chunks.append(newData)
        remaining -= len(newData)
    # join once instead of growing a bytes object chunk by chunk
    return b''.join(chunks)
|
def __peek(self, length):
    """
    Private method to peek at up to the given number of bytes.

    The connection is switched to non-blocking mode for a single
    recv() call with socket.MSG_PEEK and is switched back to blocking
    mode afterwards, even if recv() raises.

    @param length maximum number of bytes to peek at (int)
    @return peeked bytes; empty if recv() raised socket.error (bytes)
    """
    data = b''
    self.connection.setblocking(False)
    try:
        data = self.connection.recv(length, socket.MSG_PEEK)
    except socket.error:
        # best effort: a failing non-blocking recv() yields no data
        pass
    finally:
        # always restore blocking mode, whatever recv() did
        self.connection.setblocking(True)
    return data
|
def __cancelled(self):
    """
    Private method to check for a job cancellation.

    Peeks at the next message header (struct format '!II') plus the
    6 byte message type. If the type is CANCEL, these bytes are read
    from the connection so they are not seen again.

    @return flag indicating a cancellation (boolean)
    """
    size = struct.calcsize(b'!II') + 6
    msg = self.__peek(size)
    # require the complete header + type so a short peek cannot match
    if len(msg) == size and msg[-6:] == b"CANCEL":
        # get rid of the message data; this has to be a real read, a
        # peek would leave the bytes in place
        # NOTE(review): assumes a CANCEL message carries no payload after
        # the message type - confirm against the sending side
        self.__receive(size)
        return True
    return False

99 def run(self): |
129 def run(self): |
100 """ |
130 """ |
101 Public method implementing the main loop of the client. |
131 Public method implementing the main loop of the client. |
102 """ |
132 """ |
103 try: |
133 try: |
104 while True: |
134 while True: |
105 header = self.__receive(8) |
135 header = self.__receive(struct.calcsize(b'!II')) |
106 # Leave main loop if connection was closed. |
136 # Leave main loop if connection was closed. |
107 if not header: |
137 if not header: |
108 break |
138 break |
109 |
139 |
110 length, datahash = struct.unpack(b'!II', header) |
140 length, datahash = struct.unpack(b'!II', header) |
|
141 messageType = self.__receive(6) |
111 packedData = self.__receive(length) |
142 packedData = self.__receive(length) |
|
143 |
|
144 if messageType != b"JOB ": |
|
145 continue |
112 |
146 |
113 assert adler32(packedData) & 0xffffffff == datahash, \ |
147 assert adler32(packedData) & 0xffffffff == datahash, \ |
114 'Hashes not equal' |
148 'Hashes not equal' |
115 if sys.version_info[0] == 3: |
149 if sys.version_info[0] == 3: |
116 packedData = packedData.decode('utf-8') |
150 packedData = packedData.decode('utf-8') |
119 if fx == 'INIT': |
153 if fx == 'INIT': |
120 ret = self.__initClientService(fn, *data) |
154 ret = self.__initClientService(fn, *data) |
121 elif fx.startswith("batch_"): |
155 elif fx.startswith("batch_"): |
122 callback = self.batchServices.get(fx) |
156 callback = self.batchServices.get(fx) |
123 if callback: |
157 if callback: |
124 callback(data, self.__send, fx) |
158 callback(data, self.__send, fx, self.__cancelled) |
125 ret = "__DONE__" |
159 ret = "__DONE__" |
126 else: |
160 else: |
127 ret = 'Unknown batch service.' |
161 ret = 'Unknown batch service.' |
128 else: |
162 else: |
129 callback = self.services.get(fx) |
163 callback = self.services.get(fx) |