Sat, 01 May 2021 14:27:20 +0200
Merged with default branch to prepare release 21.5.
# -*- coding: utf-8 -*- # Copyright (c) 2013 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing a SSL error handler. """ import contextlib import enum import platform from PyQt5.QtCore import QObject, QByteArray from PyQt5.QtNetwork import ( QSslCertificate, QSslConfiguration, QSslSocket, QSslError, QSsl ) from E5Gui import E5MessageBox import Preferences import Utilities import Globals class E5SslErrorState(enum.Enum): """ Class defining the SSL error handling states. """ NOT_IGNORED = 0 SYSTEM_IGNORED = 1 USER_IGNORED = 2 class E5SslErrorHandler(QObject): """ Class implementing a handler for SSL errors. It also initializes the default SSL configuration with certificates permanently accepted by the user already. """ def __init__(self, parent=None): """ Constructor @param parent reference to the parent object (QObject) """ super().__init__(parent) caList = self.__getSystemCaCertificates() if Preferences.Prefs.settings.contains("Help/CaCertificatesDict"): # port old entries stored under 'Help' certificateDict = Globals.toDict( Preferences.Prefs.settings.value("Help/CaCertificatesDict")) Preferences.Prefs.settings.setValue( "Ssl/CaCertificatesDict", certificateDict) Preferences.Prefs.settings.remove("Help/CaCertificatesDict") else: certificateDict = Globals.toDict( Preferences.Prefs.settings.value("Ssl/CaCertificatesDict")) for server in certificateDict: for cert in QSslCertificate.fromData(certificateDict[server]): if cert not in caList: caList.append(cert) sslCfg = QSslConfiguration.defaultConfiguration() sslCfg.setCaCertificates(caList) try: sslProtocol = QSsl.SslProtocol.TlsV1_1OrLater if Globals.isWindowsPlatform() and platform.win32_ver()[0] == '7': sslProtocol = QSsl.SslProtocol.SecureProtocols except AttributeError: sslProtocol = QSsl.SslProtocol.SecureProtocols sslCfg.setProtocol(sslProtocol) with contextlib.suppress(AttributeError): sslCfg.setSslOption(QSsl.SslOption.SslOptionDisableCompression, True) QSslConfiguration.setDefaultConfiguration(sslCfg) def sslErrorsReplySlot(self, reply, errors): """ Public slot to handle SSL errors for a network reply. @param reply reference to the reply object (QNetworkReply) @param errors list of SSL errors (list of QSslError) """ self.sslErrorsReply(reply, errors) def sslErrorsReply(self, reply, errors): """ Public slot to handle SSL errors for a network reply. @param reply reference to the reply object (QNetworkReply) @param errors list of SSL errors (list of QSslError) @return tuple indicating to ignore the SSL errors (one of NotIgnored, SystemIgnored or UserIgnored) and indicating a change of the default SSL configuration (boolean) """ url = reply.url() ignore, defaultChanged = self.sslErrors(errors, url.host(), url.port()) if ignore: if defaultChanged: reply.setSslConfiguration( QSslConfiguration.defaultConfiguration()) reply.ignoreSslErrors() else: reply.abort() return ignore, defaultChanged def sslErrors(self, errors, server, port=-1): """ Public method to handle SSL errors. @param errors list of SSL errors @type list of QSslError @param server name of the server @type str @param port value of the port @type int @return tuple indicating to ignore the SSL errors and indicating a change of the default SSL configuration @rtype tuple of (E5SslErrorState, bool) """ caMerge = {} certificateDict = Globals.toDict( Preferences.Prefs.settings.value("Ssl/CaCertificatesDict")) for caServer in certificateDict: caMerge[caServer] = QSslCertificate.fromData( certificateDict[caServer]) caNew = [] errorStrings = [] if port != -1: server += ":{0:d}".format(port) if errors: for err in errors: if err.error() == QSslError.SslError.NoError: continue if server in caMerge and err.certificate() in caMerge[server]: continue errorStrings.append(err.errorString()) if not err.certificate().isNull(): cert = err.certificate() if cert not in caNew: caNew.append(cert) if not errorStrings: return E5SslErrorState.SYSTEM_IGNORED, False errorString = '.</li><li>'.join(errorStrings) ret = E5MessageBox.yesNo( None, self.tr("SSL Errors"), self.tr("""<p>SSL Errors for <br /><b>{0}</b>""" """<ul><li>{1}</li></ul></p>""" """<p>Do you want to ignore these errors?</p>""") .format(server, errorString), icon=E5MessageBox.Warning) if ret: caRet = False if len(caNew) > 0: certinfos = [] for cert in caNew: certinfos.append(self.__certToString(cert)) caRet = E5MessageBox.yesNo( None, self.tr("Certificates"), self.tr( """<p>Certificates:<br/>{0}<br/>""" """Do you want to accept all these certificates?""" """</p>""") .format("".join(certinfos))) if caRet: if server not in caMerge: caMerge[server] = [] for cert in caNew: caMerge[server].append(cert) sslCfg = QSslConfiguration.defaultConfiguration() caList = sslCfg.caCertificates() for cert in caNew: caList.append(cert) sslCfg.setCaCertificates(caList) try: sslCfg.setProtocol(QSsl.SslProtocol.TlsV1_1OrLater) except AttributeError: sslCfg.setProtocol(QSsl.SslProtocol.SecureProtocols) with contextlib.suppress(AttributeError): sslCfg.setSslOption( QSsl.SslOption.SslOptionDisableCompression, True) QSslConfiguration.setDefaultConfiguration(sslCfg) certificateDict = {} for server in caMerge: pems = QByteArray() for cert in caMerge[server]: pems.append(cert.toPem() + b'\n') certificateDict[server] = pems Preferences.Prefs.settings.setValue( "Ssl/CaCertificatesDict", certificateDict) return E5SslErrorState.USER_IGNORED, caRet else: return E5SslErrorState.NOT_IGNORED, False def __certToString(self, cert): """ Private method to convert a certificate to a formatted string. @param cert certificate to convert (QSslCertificate) @return formatted string (string) """ result = "<p>" result += self.tr( "Name: {0}" ).format( Utilities.html_encode( Utilities.decodeString( ", ".join(cert.subjectInfo( QSslCertificate.SubjectInfo.CommonName)) ) ) ) result += self.tr( "<br/>Organization: {0}" ).format( Utilities.html_encode( Utilities.decodeString( ", ".join(cert.subjectInfo( QSslCertificate.SubjectInfo.Organization)) ) ) ) result += self.tr( "<br/>Issuer: {0}" ).format( Utilities.html_encode( Utilities.decodeString( ", ".join(cert.issuerInfo( QSslCertificate.SubjectInfo.CommonName)) ) ) ) result += self.tr( "<br/>Not valid before: {0}<br/>Valid Until: {1}" ).format( Utilities.html_encode( cert.effectiveDate().toString("yyyy-MM-dd") ), Utilities.html_encode( cert.expiryDate().toString("yyyy-MM-dd") ) ) result += "</p>" return result def __getSystemCaCertificates(self): """ Private method to get the list of system certificates. @return list of system certificates (list of QSslCertificate) """ caList = QSslCertificate.fromData(Globals.toByteArray( Preferences.Prefs.settings.value("Ssl/SystemCertificates"))) if not caList: caList = QSslSocket.systemCaCertificates() return caList