diff -r 799196d0b05d -r 12ebd3934fef eric7/E5Network/E5XmlRpcClient.py --- a/eric7/E5Network/E5XmlRpcClient.py Sat May 22 12:54:57 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,139 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2015 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> -# - -""" -Module implementing a xmlrpc client for Qt. -""" - -import xmlrpc.client as xmlrpc - -from PyQt6.QtCore import Qt, QObject, QUrl, QByteArray, QEventLoop -from PyQt6.QtGui import QGuiApplication, QCursor -from PyQt6.QtNetwork import ( - QNetworkAccessManager, QNetworkRequest, QNetworkReply -) - -from E5Network.E5NetworkProxyFactory import proxyAuthenticationRequired -try: - from E5Network.E5SslErrorHandler import E5SslErrorHandler, E5SslErrorState - SSL_AVAILABLE = True -except ImportError: - SSL_AVAILABLE = False - - -class E5XmlRpcClient(QObject): - """ - Class implementing a xmlrpc client for Qt. - """ - def __init__(self, url, parent=None): - """ - Constructor - - @param url xmlrpc handler URL (string or QUrl) - @param parent parent object (QObject) - """ - super().__init__(parent) - - # attributes for the network objects - self.__networkManager = QNetworkAccessManager(self) - self.__networkManager.proxyAuthenticationRequired.connect( - proxyAuthenticationRequired) - self.__networkManager.finished.connect(self.__replyFinished) - if SSL_AVAILABLE: - self.__sslErrorHandler = E5SslErrorHandler(self) - self.__networkManager.sslErrors.connect(self.__sslErrors) - - self.__callmap = {} - - self.__request = QNetworkRequest(QUrl(url)) - self.__request.setRawHeader(b"User-Agent", b"E5XmlRpcClient/1.0") - self.__request.setHeader( - QNetworkRequest.KnownHeaders.ContentTypeHeader, "text/xml") - - def setUrl(self, url): - """ - Public slot to set the xmlrpc handler URL. - - @param url xmlrpc handler URL (string or QUrl) - """ - url = QUrl(url) - if url.isValid(): - self.__request.setUrl(url) - - def call(self, method, args, responseCallback, errorCallback): - """ - Public method to call the remote server. - - @param method name of the remote method to be called (string) - @param args tuple of method arguments (tuple) - @param responseCallback method to be called with the returned - result as a tuple (function) - @param errorCallback method to be called in case of an error - with error code and error string (function) - @exception TypeError raised to indicate an illegal 'args' parameter - type - """ - if not isinstance(args, tuple): - raise TypeError("argument 'args' must be tuple") - - QGuiApplication.setOverrideCursor(QCursor(Qt.CursorShape.WaitCursor)) - QGuiApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) - - data = xmlrpc.dumps(args, method).encode("utf-8") - reply = self.__networkManager.post( - self.__request, QByteArray(data)) - self.__callmap[reply] = (responseCallback, errorCallback) - - def abort(self): - """ - Public method to abort all calls. - """ - for reply in list(self.__callmap): - if reply.isRunning(): - reply.abort() - - def __sslErrors(self, reply, errors): - """ - Private slot to handle SSL errors. - - @param reply reference to the reply object (QNetworkReply) - @param errors list of SSL errors (list of QSslError) - """ - QGuiApplication.restoreOverrideCursor() - QGuiApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) - - ignored = self.__sslErrorHandler.sslErrorsReply(reply, errors)[0] - if ignored == E5SslErrorState.NOT_IGNORED and reply in self.__callmap: - self.__callmap[reply][1](xmlrpc.TRANSPORT_ERROR, self.tr( - "SSL Error")) - - def __replyFinished(self, reply): - """ - Private slot handling the receipt of a reply. - - @param reply reference to the finished reply (QNetworkReply) - """ - if reply not in self.__callmap: - return - - QGuiApplication.restoreOverrideCursor() - QGuiApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) - - if reply.error() != QNetworkReply.NetworkError.NoError: - self.__callmap[reply][1](xmlrpc.TRANSPORT_ERROR, - reply.errorString()) - else: - data = bytes(reply.readAll()).decode("utf-8") - try: - data = xmlrpc.loads(data)[0] - self.__callmap[reply][0](data) - except xmlrpc.Fault as fault: - self.__callmap[reply][1](fault.faultCode, fault.faultString) - - reply.deleteLater() - del self.__callmap[reply]