src/eric7/MicroPython/MicroPythonWebreplSocket.py

Sun, 16 Mar 2025 12:53:12 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 16 Mar 2025 12:53:12 +0100
branch
eric7
changeset 11170
6d6199d668fb
parent 11090
f5f5f5803935
permissions
-rw-r--r--

Added the Adafruit Feather nRF52840 to the list of known NRF52 boards and changed the list of known CircuitPython boards to be more explicit with respect to Adafruit boards (i.e. VID 0x239A).

# -*- coding: utf-8 -*-

# Copyright (c) 2023 - 2025 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a websocket class to be connect to the MicroPython webrepl
interface.
"""

from PyQt6.QtCore import (
    QCoreApplication,
    QEventLoop,
    QMutex,
    QTime,
    QTimer,
    QUrl,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtNetwork import QAbstractSocket
from PyQt6.QtWebSockets import QWebSocket

from eric7.EricUtilities.EricMutexLocker import EricMutexLocker


class MicroPythonWebreplSocket(QWebSocket):
    """
    Class implementing a websocket client to be connected to the MicroPython webrepl
    interface.

    @signal readyRead() emitted to signal the availability of data
    """

    readyRead = pyqtSignal()

    def __init__(self, timeout=10000, parent=None):
        """
        Constructor

        @param timeout timout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent=parent)

        self.__connected = False
        self.__timeout = timeout  # 10s default timeout
        self.__timedOut = False

        self.__mutex = QMutex()
        self.__buffer = b""
        self.textMessageReceived.connect(self.__textDataReceived)

    @pyqtSlot(str)
    def __textDataReceived(self, strMessage):
        """
        Private slot handling a received text message.

        @param strMessage received text message
        @type str
        """
        with EricMutexLocker(self.__mutex):
            self.__buffer += strMessage.encode("utf-8")

        self.readyRead.emit()

    def setTimeout(self, timeout):
        """
        Public method to set the socket timeout value.

        @param timeout timout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout

    def waitForConnected(self):
        """
        Public method to wait for the websocket being connected.

        @return flag indicating the connect result
        @rtype bool
        """
        loop = QEventLoop()
        self.connected.connect(loop.quit)
        self.errorOccurred.connect(loop.quit)

        def timeout():
            loop.quit()
            self.__timedOut = True

        self.__timedOut = False
        timer = QTimer()
        timer.setSingleShot(True)
        timer.timeout.connect(timeout)
        timer.start(self.__timeout)

        loop.exec()
        timer.stop()
        if self.state() == QAbstractSocket.SocketState.ConnectedState:
            self.__connected = True
            return True
        else:
            self.__connected = False
            return False

    def connectToDevice(self, host, port):
        """
        Public method to connect to the given host and port.

        @param host host name or IP address
        @type str
        @param port port number
        @type int
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        if self.__connected:
            self.disconnectFromDevice()

        url = QUrl(f"ws://{host}:{port}")
        self.open(url)
        ok = self.waitForConnected()
        if not ok:
            return False, self.tr("Connection to device webrepl failed.")

        self.__connected = True
        return True, ""

    def disconnect(self):
        """
        Public method to disconnect the websocket.
        """
        if self.__connected:
            self.close()
            self.__connected = False

    def isConnected(self):
        """
        Public method to check the connected state of the websocket.

        @return flag indicating the connected state
        @rtype bool
        """
        return self.__connected

    def hasTimedOut(self):
        """
        Public method to check, if the last 'readUntil()' has timed out.

        @return flag indicating a timeout
        @rtype bool
        """
        return self.__timedOut

    def login(self, password):
        """
        Public method to login to the webrepl console of the device.

        @param password password
        @type str
        @return flag indicating a successful login and an error indication
        @rtype tuple of (bool, str)
        """
        self.readUntil(expected=b": ")
        self.writeTextMessage(password.encode("utf-8") + b"\r")
        data = self.readUntil([b">>> ", b"denied\r\n"])
        error = (
            self.tr("WebRepl login failed (access denied).")
            if data.endswith(b"denied\r\n")
            else ""
        )

        return error == "", error

    def writeTextMessage(self, data):
        """
        Public method write some text data to the webrepl server of the connected
        device.

        @param data text data to be sent
        @type bytes
        """
        self.sendTextMessage(data.decode("utf-8"))
        self.flush()

    def readAll(self, timeout=0):
        """
        Public method to read all available data.

        @param timeout timeout in milliseconds (0 for no timeout)
            (defaults to 0)
        @type int (optional)
        @return received data
        @rtype bytes
        """
        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        if timeout > 0:
            # receive data for 'timeout' milliseconds
            loop = QEventLoop()
            QTimer.singleShot(timeout, loop.quit)
            loop.exec()

        # return all buffered data
        with EricMutexLocker(self.__mutex):
            data = self.__buffer
            self.__buffer = b""

        return data

    def readUntil(self, expected=b"\n", size=None, timeout=0):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.

        @param expected expected bytes sequence
        @type bytes
        @param size maximum data to be read (defaults to None)
        @type int (optional)
        @param timeout timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        data = b""
        self.__timedOut = False

        if timeout == 0:
            timeout = self.__timeout

        if not isinstance(expected, list):
            expected = [expected]

        t = QTime.currentTime()
        while True:
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents, 500
            )
            with EricMutexLocker(self.__mutex):
                if any(e in self.__buffer for e in expected):
                    for e in expected:
                        index = self.__buffer.find(e)
                        if index >= 0:
                            endIndex = index + len(e)
                            data = self.__buffer[:endIndex]
                            self.__buffer = self.__buffer[endIndex:]
                            break
                    break
                if size is not None and len(self.__buffer) >= size:
                    data = self.__buffer[:size]
                    self.__buffer = self.__buffer[size:]
                    break
                if t.msecsTo(QTime.currentTime()) > timeout:
                    self.__timedOut = True
                    data = self.__buffer
                    self.__buffer = b""
                    break

        return data

eric ide

mercurial