Cooperation/Connection.py

changeset 149
a134031209be
child 155
375e3c884874
equal deleted inserted replaced
148:727a907b8305 149:a134031209be
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2010 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing a class representing a peer connection.
8 """
9
10 from PyQt4.QtCore import pyqtSignal, QTimer, QTime, QByteArray
11 from PyQt4.QtNetwork import QTcpSocket
12
13 MaxBufferSize = 1024 * 1024
14 TransferTimeout = 30 * 1000
15 PongTimeout = 60 * 1000
16 PingInterval = 5 * 1000
17 SeparatorToken = '|||'
18
19
20 class Connection(QTcpSocket):
21 """
22 Class representing a peer connection.
23
24 @signal readyForUse() emitted when the connection is ready for use
25 @signal newMessage(user, message) emitted after a new message has
26 arrived (string, string)
27 @signal getParticipants() emitted after a get participants message has arrived
28 @signal participants(participants) emitted after the list of participants has
29 arrived (list of strings of "host:port")
30 """
31 WaitingForGreeting = 0
32 ReadingGreeting = 1
33 ReadyForUse = 2
34
35 PlainText = 0
36 Ping = 1
37 Pong = 2
38 Greeting = 3
39 GetParticipants = 4
40 Participants = 5
41 Undefined = 99
42
43 ProtocolMessage = "MESSAGE"
44 ProtocolPing = "PING"
45 ProtocolPong = "PONG"
46 ProtocolGreeting = "GREETING"
47 ProtocolGetParticipants = "GET_PARTICIPANTS"
48 ProtocolParticipants = "PARTICIPANTS"
49
50 readyForUse = pyqtSignal()
51 newMessage = pyqtSignal(str, str)
52 getParticipants = pyqtSignal()
53 participants = pyqtSignal(list)
54
55 def __init__(self, parent = None):
56 """
57 Constructor
58
59 @param parent referenec to the parent object (QObject)
60 """
61 QTcpSocket.__init__(self, parent)
62
63 self.__greetingMessage = self.trUtf8("undefined")
64 self.__username = self.trUtf8("unknown")
65 self.__serverPort = 0
66 self.__state = Connection.WaitingForGreeting
67 self.__currentDataType = Connection.Undefined
68 self.__numBytesForCurrentDataType = -1
69 self.__transferTimerId = 0
70 self.__isGreetingMessageSent = False
71 self.__pingTimer = QTimer()
72 self.__pingTimer.setInterval(PingInterval)
73 self.__pongTime = QTime()
74 self.__buffer = QByteArray()
75
76 self.readyRead.connect(self.__processReadyRead)
77 self.disconnected.connect(self.__pingTimer.stop)
78 self.__pingTimer.timeout.connect(self.__sendPing)
79 self.connected.connect(self.__sendGreetingMessage)
80
81 def name(self):
82 """
83 Public method to get the connection name.
84
85 @return connection name (string)
86 """
87 return self.__username
88
89 def serverPort(self):
90 """
91 Public method to get the server port.
92
93 @return server port (integer)
94 """
95 return self.__serverPort
96
97 def setGreetingMessage(self, message, serverPort):
98 """
99 Public method to set the greeting message.
100
101 @param message greeting message (string)
102 """
103 self.__greetingMessage = "{0}:{1}".format(message, serverPort)
104
105 def sendMessage(self, message):
106 """
107 Public method to send a message.
108
109 @param message message to be sent (string)
110 @return flag indicating a successful send (boolean)
111 """
112 if message == "":
113 return False
114
115 msg = QByteArray(message.encode("utf-8"))
116 data = QByteArray("{0}{1}{2}{1}".format(
117 Connection.ProtocolMessage, SeparatorToken, msg.size())) + msg
118 return self.write(data) == data.size()
119
120 def timerEvent(self, evt):
121 """
122 Protected method to handle timer events.
123
124 @param evt reference to the timer event (QTimerEvent)
125 """
126 if evt.timerId() == self.__transferTimerId:
127 self.abort()
128 self.killTimer(self.__transferTimerId)
129 self.__transferTimerId = 0
130
131 def __processReadyRead(self):
132 """
133 Private slot to handle the readyRead signal.
134 """
135 if self.__state == Connection.WaitingForGreeting:
136 if not self.__readProtocolHeader():
137 return
138 if self.__currentDataType != Connection.Greeting:
139 self.abort()
140 return
141 self.__state = Connection.ReadingGreeting
142
143 if self.__state == Connection.ReadingGreeting:
144 if not self.__hasEnoughData():
145 return
146
147 self.__buffer = QByteArray(self.read(self.__numBytesForCurrentDataType))
148 if self.__buffer.size() != self.__numBytesForCurrentDataType:
149 self.abort()
150 return
151
152 user, serverPort = str(self.__buffer, encoding = "utf-8").split(":")
153 self.__serverPort = int(serverPort)
154 self.__username = "{0}@{1}:{2}".format(
155 user,
156 self.peerAddress().toString(),
157 self.peerPort()
158 )
159 self.__currentDataType = Connection.Undefined
160 self.__numBytesForCurrentDataType = 0
161 self.__buffer.clear()
162
163 if not self.isValid():
164 self.abort()
165 return
166
167 if not self.__isGreetingMessageSent:
168 self.__sendGreetingMessage()
169
170 self.__pingTimer.start()
171 self.__pongTime.start()
172 self.__state = Connection.ReadyForUse
173 self.readyForUse.emit()
174
175 while self.bytesAvailable():
176 if self.__currentDataType == Connection.Undefined:
177 if not self.__readProtocolHeader():
178 return
179
180 if not self.__hasEnoughData():
181 return
182
183 self.__processData()
184
185 def __sendPing(self):
186 """
187 Private slot to send a ping message.
188 """
189 if self.__pongTime.elapsed() > PongTimeout:
190 self.abort()
191 return
192
193 self.write("{0}{1}1{1}p".format(Connection.ProtocolPing, SeparatorToken))
194
195 def __sendGreetingMessage(self):
196 """
197 Private slot to send a greeting message.
198 """
199 greeting = QByteArray(self.__greetingMessage.encode("utf-8"))
200 data = QByteArray("{0}{1}{2}{1}".format(
201 Connection.ProtocolGreeting, SeparatorToken, greeting.size())) + greeting
202 if self.write(data) == data.size():
203 self.__isGreetingMessageSent = True
204
205 def __readDataIntoBuffer(self, maxSize = MaxBufferSize):
206 """
207 Private method to read some data into the buffer.
208
209 @param maxSize maximum size of data to read (integer)
210 @return size of data read (integer)
211 """
212 if maxSize > MaxBufferSize:
213 return 0
214
215 numBytesBeforeRead = self.__buffer.size()
216 if numBytesBeforeRead == MaxBufferSize:
217 self.abort()
218 return 0
219
220 while self.bytesAvailable() and self.__buffer.size() < maxSize:
221 self.__buffer.append(self.read(1))
222 if self.__buffer.endsWith(SeparatorToken):
223 break
224
225 return self.__buffer.size() - numBytesBeforeRead
226
227 def __dataLengthForCurrentDataType(self):
228 """
229 Private method to get the data length for the current data type.
230
231 @return data length (integer)
232 """
233 if self.bytesAvailable() <= 0 or \
234 self.__readDataIntoBuffer() <= 0 or \
235 not self.__buffer.endsWith(SeparatorToken):
236 return 0
237
238 self.__buffer.chop(len(SeparatorToken))
239 number = self.__buffer.toInt()[0]
240 self.__buffer.clear()
241 return number
242
243 def __readProtocolHeader(self):
244 """
245 Private method to read the protocol header.
246
247 @return flag indicating a successful read (boolean)
248 """
249 if self.__transferTimerId:
250 self.killTimer(self.__transferTimerId)
251 self.__transferTimerId = 0
252
253 if self.__readDataIntoBuffer() <= 0:
254 self.__transferTimerId = self.startTimer(TransferTimeout)
255 return False
256
257 self.__buffer.chop(len(SeparatorToken))
258 if self.__buffer == Connection.ProtocolPing:
259 self.__currentDataType = Connection.Ping
260 elif self.__buffer == Connection.ProtocolPong:
261 self.__currentDataType = Connection.Pong
262 elif self.__buffer == Connection.ProtocolMessage:
263 self.__currentDataType = Connection.PlainText
264 elif self.__buffer == Connection.ProtocolGreeting:
265 self.__currentDataType = Connection.Greeting
266 elif self.__buffer == Connection.ProtocolGetParticipants:
267 self.__currentDataType = Connection.GetParticipants
268 elif self.__buffer == Connection.ProtocolParticipants:
269 self.__currentDataType = Connection.Participants
270 else:
271 self.__currentDataType = Connection.Undefined
272 self.abort()
273 return False
274
275 self.__buffer.clear()
276 self.__numBytesForCurrentDataType = self.__dataLengthForCurrentDataType()
277 return True
278
279 def __hasEnoughData(self):
280 """
281 Private method to check, if enough data is available.
282
283 @return flag indicating availability of enough data (boolean)
284 """
285 if self.__transferTimerId:
286 self.killTimer(self.__transferTimerId)
287 self.__transferTimerId = 0
288
289 if self.__numBytesForCurrentDataType <= 0:
290 self.__numBytesForCurrentDataType = self.__dataLengthForCurrentDataType()
291
292 if self.bytesAvailable() < self.__numBytesForCurrentDataType or \
293 self.__numBytesForCurrentDataType <= 0:
294 self.__transferTimerId = self.startTimer(TransferTimeout)
295 return False
296
297 return True
298
299 def __processData(self):
300 """
301 Private method to process the received data.
302 """
303 self.__buffer = QByteArray(self.read(self.__numBytesForCurrentDataType))
304 if self.__buffer.size() != self.__numBytesForCurrentDataType:
305 self.abort()
306 return
307
308 if self.__currentDataType == Connection.PlainText:
309 self.newMessage.emit(self.__username, str(self.__buffer, encoding = "utf-8"))
310 elif self.__currentDataType == Connection.Ping:
311 self.write("{0}{1}1{1}p".format(Connection.ProtocolPong, SeparatorToken))
312 elif self.__currentDataType == Connection.Pong:
313 self.__pongTime.restart()
314 elif self.__currentDataType == Connection.GetParticipants:
315 self.getParticipants.emit()
316 elif self.__currentDataType == Connection.Participants:
317 msg = str(self.__buffer, encoding = "utf-8")
318 if msg == "<empty>":
319 participantsList = []
320 else:
321 participantsList = msg.split(SeparatorToken)
322 self.participants.emit(participantsList[:])
323
324 self.__currentDataType = Connection.Undefined
325 self.__numBytesForCurrentDataType = 0
326 self.__buffer.clear()
327
328 def sendGetParticipants(self):
329 """
330 Public method to request a list of participants.
331 """
332 self.write(
333 "{0}{1}1{1}l".format(Connection.ProtocolGetParticipants, SeparatorToken)
334 )
335
336 def sendParticipants(self, participants):
337 """
338 Public method to send the list of participants.
339
340 @param participants list of participants (list of strings of "host:port")
341 """
342 if participants:
343 message = SeparatorToken.join(participants)
344 else:
345 message = "<empty>"
346 msg = QByteArray(message.encode("utf-8"))
347 data = QByteArray("{0}{1}{2}{1}".format(
348 Connection.ProtocolParticipants, SeparatorToken, msg.size())) + msg
349 self.write(data)

eric ide

mercurial