Utilities/BackgroundClient.py

changeset 4221
c9fdc07753a7
parent 4218
f542ad1f76c5
child 4563
881340f4bd0c
child 4632
ca310db386ed
equal deleted inserted replaced
4220:4df8f9fc7ea9 4221:c9fdc07753a7
84 def __receive(self, length): 84 def __receive(self, length):
85 """ 85 """
86 Private methode to receive the given length of bytes. 86 Private methode to receive the given length of bytes.
87 87
88 @param length bytes to receive (int) 88 @param length bytes to receive (int)
89 @return received bytes or None if connection closed (str) 89 @return received bytes or None if connection closed (bytes)
90 """ 90 """
91 data = b'' 91 data = b''
92 while len(data) < length: 92 while len(data) < length:
93 newData = self.connection.recv(length - len(data)) 93 newData = self.connection.recv(length - len(data))
94 if not newData: 94 if not newData:
95 return None 95 return None
96 data += newData 96 data += newData
97 return data 97 return data
98 98
99 def __peek(self, length):
100 """
101 Private methode to peek the given length of bytes.
102
103 @param length bytes to receive (int)
104 @return received bytes (bytes)
105 """
106 data = b''
107 self.connection.setblocking(False)
108 try:
109 data = self.connection.recv(length, socket.MSG_PEEK)
110 except socket.error:
111 pass
112 self.connection.setblocking(True)
113 return data
114
115 def __cancelled(self):
116 """
117 Private method to check for a job cancellation.
118
119 @return flag indicating a cancellation (boolean)
120 """
121 msg = self.__peek(struct.calcsize(b'!II') + 6)
122 if msg[-6:] == b"CANCEL":
123 # get rid of the message data
124 self.__peek(struct.calcsize(b'!II') + 6)
125 return True
126 else:
127 return False
128
99 def run(self): 129 def run(self):
100 """ 130 """
101 Public method implementing the main loop of the client. 131 Public method implementing the main loop of the client.
102 """ 132 """
103 try: 133 try:
104 while True: 134 while True:
105 header = self.__receive(8) 135 header = self.__receive(struct.calcsize(b'!II'))
106 # Leave main loop if connection was closed. 136 # Leave main loop if connection was closed.
107 if not header: 137 if not header:
108 break 138 break
109 139
110 length, datahash = struct.unpack(b'!II', header) 140 length, datahash = struct.unpack(b'!II', header)
141 messageType = self.__receive(6)
111 packedData = self.__receive(length) 142 packedData = self.__receive(length)
143
144 if messageType != b"JOB ":
145 continue
112 146
113 assert adler32(packedData) & 0xffffffff == datahash, \ 147 assert adler32(packedData) & 0xffffffff == datahash, \
114 'Hashes not equal' 148 'Hashes not equal'
115 if sys.version_info[0] == 3: 149 if sys.version_info[0] == 3:
116 packedData = packedData.decode('utf-8') 150 packedData = packedData.decode('utf-8')
119 if fx == 'INIT': 153 if fx == 'INIT':
120 ret = self.__initClientService(fn, *data) 154 ret = self.__initClientService(fn, *data)
121 elif fx.startswith("batch_"): 155 elif fx.startswith("batch_"):
122 callback = self.batchServices.get(fx) 156 callback = self.batchServices.get(fx)
123 if callback: 157 if callback:
124 callback(data, self.__send, fx) 158 callback(data, self.__send, fx, self.__cancelled)
125 ret = "__DONE__" 159 ret = "__DONE__"
126 else: 160 else:
127 ret = 'Unknown batch service.' 161 ret = 'Unknown batch service.'
128 else: 162 else:
129 callback = self.services.get(fx) 163 callback = self.services.get(fx)

eric ide

mercurial