Cooperation/Connection.py

changeset 149
a134031209be
child 155
375e3c884874
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Cooperation/Connection.py	Sun Mar 21 19:34:15 2010 +0000
@@ -0,0 +1,349 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2010 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a class representing a peer connection.
+"""
+
+from PyQt4.QtCore import pyqtSignal, QTimer, QTime, QByteArray
+from PyQt4.QtNetwork import QTcpSocket
+
+MaxBufferSize   = 1024 * 1024
+TransferTimeout =   30 * 1000
+PongTimeout     =   60 * 1000
+PingInterval    =    5 * 1000
+SeparatorToken  = '|||'
+
+
+class Connection(QTcpSocket):
+    """
+    Class representing a peer connection.
+    
+    @signal readyForUse() emitted when the connection is ready for use
+    @signal newMessage(user, message) emitted after a new message has 
+            arrived (string, string)
+    @signal getParticipants() emitted after a get participants message has arrived
+    @signal participants(participants) emitted after the list of participants has
+            arrived (list of strings of "host:port")
+    """
+    WaitingForGreeting  = 0
+    ReadingGreeting     = 1
+    ReadyForUse         = 2
+    
+    PlainText       = 0
+    Ping            = 1
+    Pong            = 2
+    Greeting        = 3
+    GetParticipants = 4
+    Participants    = 5
+    Undefined       = 99
+    
+    ProtocolMessage         = "MESSAGE"
+    ProtocolPing            = "PING"
+    ProtocolPong            = "PONG"
+    ProtocolGreeting        = "GREETING"
+    ProtocolGetParticipants = "GET_PARTICIPANTS"
+    ProtocolParticipants    = "PARTICIPANTS"
+    
+    readyForUse     = pyqtSignal()
+    newMessage      = pyqtSignal(str, str)
+    getParticipants = pyqtSignal()
+    participants    = pyqtSignal(list)
+    
+    def __init__(self, parent = None):
+        """
+        Constructor
+        
+        @param parent referenec to the parent object (QObject)
+        """
+        QTcpSocket.__init__(self, parent)
+        
+        self.__greetingMessage = self.trUtf8("undefined")
+        self.__username = self.trUtf8("unknown")
+        self.__serverPort = 0
+        self.__state = Connection.WaitingForGreeting
+        self.__currentDataType = Connection.Undefined
+        self.__numBytesForCurrentDataType = -1
+        self.__transferTimerId = 0
+        self.__isGreetingMessageSent = False
+        self.__pingTimer = QTimer()
+        self.__pingTimer.setInterval(PingInterval)
+        self.__pongTime = QTime()
+        self.__buffer = QByteArray()
+        
+        self.readyRead.connect(self.__processReadyRead)
+        self.disconnected.connect(self.__pingTimer.stop)
+        self.__pingTimer.timeout.connect(self.__sendPing)
+        self.connected.connect(self.__sendGreetingMessage)
+    
+    def name(self):
+        """
+        Public method to get the connection name.
+        
+        @return connection name (string)
+        """
+        return self.__username
+    
+    def serverPort(self):
+        """
+        Public method to get the server port.
+        
+        @return server port (integer)
+        """
+        return self.__serverPort
+    
+    def setGreetingMessage(self, message, serverPort):
+        """
+        Public method to set the greeting message.
+        
+        @param message greeting message (string)
+        """
+        self.__greetingMessage = "{0}:{1}".format(message, serverPort)
+    
+    def sendMessage(self, message):
+        """
+        Public method to send a message.
+        
+        @param message message to be sent (string)
+        @return flag indicating a successful send (boolean)
+        """
+        if message == "":
+            return False
+        
+        msg = QByteArray(message.encode("utf-8"))
+        data = QByteArray("{0}{1}{2}{1}".format(
+            Connection.ProtocolMessage, SeparatorToken, msg.size())) + msg
+        return self.write(data) == data.size()
+    
+    def timerEvent(self, evt):
+        """
+        Protected method to handle timer events.
+        
+        @param evt reference to the timer event (QTimerEvent)
+        """
+        if evt.timerId() == self.__transferTimerId:
+            self.abort()
+            self.killTimer(self.__transferTimerId)
+            self.__transferTimerId = 0
+    
+    def __processReadyRead(self):
+        """
+        Private slot to handle the readyRead signal.
+        """
+        if self.__state == Connection.WaitingForGreeting:
+            if not self.__readProtocolHeader():
+                return
+            if self.__currentDataType != Connection.Greeting:
+                self.abort()
+                return
+            self.__state = Connection.ReadingGreeting
+        
+        if self.__state == Connection.ReadingGreeting:
+            if not self.__hasEnoughData():
+                return
+            
+            self.__buffer = QByteArray(self.read(self.__numBytesForCurrentDataType))
+            if self.__buffer.size() != self.__numBytesForCurrentDataType:
+                self.abort()
+                return
+            
+            user, serverPort = str(self.__buffer, encoding = "utf-8").split(":")
+            self.__serverPort = int(serverPort)
+            self.__username = "{0}@{1}:{2}".format(
+                user, 
+                self.peerAddress().toString(), 
+                self.peerPort()
+            )
+            self.__currentDataType = Connection.Undefined
+            self.__numBytesForCurrentDataType = 0
+            self.__buffer.clear()
+            
+            if not self.isValid():
+                self.abort()
+                return
+            
+            if not self.__isGreetingMessageSent:
+                self.__sendGreetingMessage()
+            
+            self.__pingTimer.start()
+            self.__pongTime.start()
+            self.__state = Connection.ReadyForUse
+            self.readyForUse.emit()
+        
+        while self.bytesAvailable():
+            if self.__currentDataType == Connection.Undefined:
+                if not self.__readProtocolHeader():
+                    return
+            
+            if not self.__hasEnoughData():
+                return
+            
+            self.__processData()
+    
+    def __sendPing(self):
+        """
+        Private slot to send a ping message.
+        """
+        if self.__pongTime.elapsed() > PongTimeout:
+            self.abort()
+            return
+        
+        self.write("{0}{1}1{1}p".format(Connection.ProtocolPing, SeparatorToken))
+    
+    def __sendGreetingMessage(self):
+        """
+        Private slot to send a greeting message.
+        """
+        greeting = QByteArray(self.__greetingMessage.encode("utf-8"))
+        data = QByteArray("{0}{1}{2}{1}".format(
+            Connection.ProtocolGreeting, SeparatorToken, greeting.size())) + greeting
+        if self.write(data) == data.size():
+            self.__isGreetingMessageSent = True
+    
+    def __readDataIntoBuffer(self, maxSize = MaxBufferSize):
+        """
+        Private method to read some data into the buffer.
+        
+        @param maxSize maximum size of data to read (integer)
+        @return size of data read (integer)
+        """
+        if maxSize > MaxBufferSize:
+            return 0
+        
+        numBytesBeforeRead = self.__buffer.size()
+        if numBytesBeforeRead == MaxBufferSize:
+            self.abort()
+            return 0
+        
+        while self.bytesAvailable() and self.__buffer.size() < maxSize:
+            self.__buffer.append(self.read(1))
+            if self.__buffer.endsWith(SeparatorToken):
+                break
+        
+        return self.__buffer.size() - numBytesBeforeRead
+    
+    def __dataLengthForCurrentDataType(self):
+        """
+        Private method to get the data length for the current data type.
+        
+        @return data length (integer)
+        """
+        if self.bytesAvailable() <= 0 or \
+           self.__readDataIntoBuffer() <= 0 or \
+           not self.__buffer.endsWith(SeparatorToken):
+            return 0
+        
+        self.__buffer.chop(len(SeparatorToken))
+        number = self.__buffer.toInt()[0]
+        self.__buffer.clear()
+        return number
+    
+    def __readProtocolHeader(self):
+        """
+        Private method to read the protocol header.
+        
+        @return flag indicating a successful read (boolean)
+        """
+        if self.__transferTimerId:
+            self.killTimer(self.__transferTimerId)
+            self.__transferTimerId = 0
+        
+        if self.__readDataIntoBuffer() <= 0:
+            self.__transferTimerId = self.startTimer(TransferTimeout)
+            return False
+        
+        self.__buffer.chop(len(SeparatorToken))
+        if self.__buffer == Connection.ProtocolPing:
+            self.__currentDataType = Connection.Ping
+        elif self.__buffer == Connection.ProtocolPong:
+            self.__currentDataType = Connection.Pong
+        elif self.__buffer == Connection.ProtocolMessage:
+            self.__currentDataType = Connection.PlainText
+        elif self.__buffer == Connection.ProtocolGreeting:
+            self.__currentDataType = Connection.Greeting
+        elif self.__buffer == Connection.ProtocolGetParticipants:
+            self.__currentDataType = Connection.GetParticipants
+        elif self.__buffer == Connection.ProtocolParticipants:
+            self.__currentDataType = Connection.Participants
+        else:
+            self.__currentDataType = Connection.Undefined
+            self.abort()
+            return False
+        
+        self.__buffer.clear()
+        self.__numBytesForCurrentDataType = self.__dataLengthForCurrentDataType()
+        return True
+    
+    def __hasEnoughData(self):
+        """
+        Private method to check, if enough data is available.
+        
+        @return flag indicating availability of enough data (boolean)
+        """
+        if self.__transferTimerId:
+            self.killTimer(self.__transferTimerId)
+            self.__transferTimerId = 0
+        
+        if self.__numBytesForCurrentDataType <= 0:
+            self.__numBytesForCurrentDataType = self.__dataLengthForCurrentDataType()
+        
+        if self.bytesAvailable() < self.__numBytesForCurrentDataType or \
+           self.__numBytesForCurrentDataType <= 0:
+            self.__transferTimerId = self.startTimer(TransferTimeout)
+            return False
+        
+        return True
+    
+    def __processData(self):
+        """
+        Private method to process the received data.
+        """
+        self.__buffer = QByteArray(self.read(self.__numBytesForCurrentDataType))
+        if self.__buffer.size() != self.__numBytesForCurrentDataType:
+            self.abort()
+            return
+        
+        if self.__currentDataType == Connection.PlainText:
+            self.newMessage.emit(self.__username, str(self.__buffer, encoding = "utf-8"))
+        elif self.__currentDataType == Connection.Ping:
+            self.write("{0}{1}1{1}p".format(Connection.ProtocolPong, SeparatorToken))
+        elif self.__currentDataType == Connection.Pong:
+            self.__pongTime.restart()
+        elif self.__currentDataType == Connection.GetParticipants:
+            self.getParticipants.emit()
+        elif self.__currentDataType == Connection.Participants:
+            msg = str(self.__buffer, encoding = "utf-8")
+            if msg == "<empty>":
+                participantsList = []
+            else:
+                participantsList = msg.split(SeparatorToken)
+            self.participants.emit(participantsList[:])
+        
+        self.__currentDataType = Connection.Undefined
+        self.__numBytesForCurrentDataType = 0
+        self.__buffer.clear()
+    
+    def sendGetParticipants(self):
+        """
+        Public method to request a list of participants.
+        """
+        self.write(
+            "{0}{1}1{1}l".format(Connection.ProtocolGetParticipants, SeparatorToken)
+        )
+    
+    def sendParticipants(self, participants):
+        """
+        Public method to send the list of participants.
+        
+        @param participants list of participants (list of strings of "host:port")
+        """
+        if participants:
+            message = SeparatorToken.join(participants)
+        else:
+            message = "<empty>"
+        msg = QByteArray(message.encode("utf-8"))
+        data = QByteArray("{0}{1}{2}{1}".format(
+            Connection.ProtocolParticipants, SeparatorToken, msg.size())) + msg
+        self.write(data)

eric ide

mercurial