Helpviewer/Network/NetworkAccessManager.py

Fri, 31 Dec 2010 15:50:33 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Fri, 31 Dec 2010 15:50:33 +0100
branch
5_0_x
changeset 792
a13346916170
parent 749
35c614c84e23
permissions
-rw-r--r--

Updated copyright notice.

# -*- coding: utf-8 -*-

# Copyright (c) 2009 - 2011 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a QNetworkAccessManager subclass.
"""

import os

from PyQt4.QtCore import *
from PyQt4.QtGui import QDialog, QMessageBox
from PyQt4.QtNetwork import QNetworkAccessManager, QNetworkRequest
try:
    from PyQt4.QtNetwork import QSslCertificate, QSslConfiguration, QSslSocket
    SSL_AVAILABLE = True
except ImportError:
    SSL_AVAILABLE = False

from E5Network.E5NetworkProxyFactory import E5NetworkProxyFactory

from UI.AuthenticationDialog import AuthenticationDialog

from Helpviewer.HelpLanguagesDialog import HelpLanguagesDialog
import Helpviewer.HelpWindow

from .NetworkProtocolUnknownErrorReply import NetworkProtocolUnknownErrorReply
from .NetworkDiskCache import NetworkDiskCache

from .QtHelpAccessHandler import QtHelpAccessHandler
from .PyrcAccessHandler import PyrcAccessHandler
from .AboutAccessHandler import AboutAccessHandler

from Helpviewer.AdBlock.AdBlockAccessHandler import AdBlockAccessHandler

import Preferences
import Utilities

class NetworkAccessManager(QNetworkAccessManager):
    """
    Class implementing a QNetworkAccessManager subclass.
    
    It dispatches requests for registered URL schemes to dedicated scheme
    handlers, adds the configured Accept-Language header, consults the
    AdBlock network for GET requests and handles authentication requests
    and SSL errors interactively.
    
    @signal requestCreated(QNetworkAccessManager::Operation, const QNetworkRequest&, QNetworkReply*)
        emitted after the request has been created
    """
    def __init__(self, engine, parent = None):
        """
        Constructor
        
        @param engine reference to the help engine (QHelpEngine)
        @param parent reference to the parent object (QObject)
        """
        QNetworkAccessManager.__init__(self, parent)
        
        self.__adblockNetwork = None    # fetched lazily in createRequest()
        
        self.__schemeHandlers = {}  # dictionary of scheme handlers
        
        self.__proxyFactory = E5NetworkProxyFactory()
        self.setProxyFactory(self.__proxyFactory)
        
        self.__setDiskCache()
        self.languagesChanged()
        
        if SSL_AVAILABLE:
            # make the certificates accepted in earlier sessions known again
            self.__installCaCertificates(self.__storedCaCertificates())
            
            self.connect(self, 
                SIGNAL('sslErrors(QNetworkReply *, const QList<QSslError> &)'), 
                self.__sslErrors)
        
        self.connect(self, 
            SIGNAL('proxyAuthenticationRequired(const QNetworkProxy&, QAuthenticator*)'),
            self.__proxyAuthenticationRequired)
        self.connect(self, 
            SIGNAL('authenticationRequired(QNetworkReply *, QAuthenticator *)'), 
            self.__authenticationRequired)
        
        # register scheme handlers
        self.setSchemeHandler("qthelp", QtHelpAccessHandler(engine, self))
        self.setSchemeHandler("pyrc", PyrcAccessHandler(self))
        self.setSchemeHandler("about", AboutAccessHandler(self))
        self.setSchemeHandler("abp", AdBlockAccessHandler(self))
    
    def setSchemeHandler(self, scheme, handler):
        """
        Public method to register a scheme handler.
        
        A handler registered earlier for the same scheme is replaced.
        
        @param scheme access scheme (string)
        @param handler reference to the scheme handler object (SchemeAccessHandler)
        """
        self.__schemeHandlers[scheme] = handler
    
    def createRequest(self, op, request, outgoingData = None):
        """
        Protected method to create a request.
        
        The request is offered to the handler registered for its scheme
        first. If there is no handler or the handler declines (returns None),
        the request is checked against the AdBlock network (GET only) and
        finally passed on to the standard implementation.
        
        @param op the operation to be performed (QNetworkAccessManager.Operation)
        @param request reference to the request object (QNetworkRequest)
        @param outgoingData reference to an IODevice containing data to be sent
            (QIODevice)
        @return reference to the created reply object (QNetworkReply)
        """
        scheme = request.url().scheme()
        if scheme == "https" and (not SSL_AVAILABLE or not QSslSocket.supportsSsl()):
            return NetworkProtocolUnknownErrorReply(scheme, self)
        
        if op == QNetworkAccessManager.PostOperation and outgoingData is not None:
            # peek() does not consume the data, so the upload stays intact
            outgoingDataByteArray = outgoingData.peek(1024 * 1024)
            Helpviewer.HelpWindow.HelpWindow.passwordManager().post(
                request, outgoingDataByteArray)
        
        handler = self.__schemeHandlers.get(scheme)
        if handler is not None:
            reply = handler.createRequest(op, request, outgoingData)
            if reply is not None:
                return reply
        
        if not self.__acceptLanguage.isEmpty():
            # work on a copy in order to leave the caller's request untouched
            req = QNetworkRequest(request)
            req.setRawHeader("Accept-Language", self.__acceptLanguage)
        else:
            req = request
        
        # AdBlock code
        if op == QNetworkAccessManager.GetOperation:
            if self.__adblockNetwork is None:
                self.__adblockNetwork = \
                    Helpviewer.HelpWindow.HelpWindow.adblockManager().network()
            reply = self.__adblockNetwork.block(req)
            if reply is not None:
                reply.setParent(self)
                return reply
        
        reply = QNetworkAccessManager.createRequest(self, op, req, outgoingData)
        self.emit(SIGNAL("requestCreated(QNetworkAccessManager::Operation, const QNetworkRequest&, QNetworkReply*)"), 
                  op, req, reply)
        
        return reply
    
    def __authenticationRequired(self, reply, auth):
        """
        Private slot to handle an authentication request.
        
        @param reply reference to the reply object (QNetworkReply)
        @param auth reference to the authenticator object (QAuthenticator)
        """
        realm = auth.realm()
        # The URL and the realm are supplied by the remote side and are shown
        # in a rich text label; escape them like the proxy dialog does.
        urlRoot = Qt.escape("{0}://{1}"\
            .format(reply.url().scheme(), reply.url().authority()))
        if not realm:
            info = self.trUtf8("<b>Enter username and password for '{0}'</b>")\
                .format(urlRoot)
        else:
            info = self.trUtf8("<b>Enter username and password for '{0}', "
                               "realm '{1}'</b>").format(urlRoot, Qt.escape(realm))
        
        savePasswords = Preferences.getHelp("SavePasswords")
        dlg = AuthenticationDialog(info, auth.user(), savePasswords, savePasswords)
        if savePasswords:
            # preset the dialog with the credentials stored for this URL/realm
            username, password = \
                Helpviewer.HelpWindow.HelpWindow.passwordManager().getLogin(
                    reply.url(), realm)
            if username:
                dlg.setData(username, password)
        if dlg.exec_() == QDialog.Accepted:
            username, password = dlg.getData()
            auth.setUser(username)
            auth.setPassword(password)
            if savePasswords:
                # the unescaped realm is the key used by the password manager
                Helpviewer.HelpWindow.HelpWindow.passwordManager().setLogin(
                    reply.url(), realm, username, password)
    
    def __proxyAuthenticationRequired(self, proxy, auth):
        """
        Private slot to handle a proxy authentication request.
        
        @param proxy reference to the proxy object (QNetworkProxy)
        @param auth reference to the authenticator object (QAuthenticator)
        """
        info = self.trUtf8("<b>Connect to proxy '{0}' using:</b>")\
            .format(Qt.escape(proxy.hostName()))
        
        dlg = AuthenticationDialog(info, proxy.user(), True)
        if dlg.exec_() != QDialog.Accepted:
            return
        
        username, password = dlg.getData()
        auth.setUser(username)
        auth.setPassword(password)
        if dlg.shallSave():
            Preferences.setUI("ProxyUser", username)
            Preferences.setUI("ProxyPassword", password)
            proxy.setUser(username)
            proxy.setPassword(password)
    
    def __storedCaCertificates(self):
        """
        Private method to load the CA certificates saved in the settings.
        
        @return certificates read from the "Help/CaCertificates" settings
            entry (list of QSslCertificate)
        """
        return QSslCertificate.fromData(Preferences.toByteArray(
            Preferences.Prefs.settings.value("Help/CaCertificates")))
    
    def __installCaCertificates(self, certs):
        """
        Private method to add certificates to the default SSL configuration.
        
        @param certs certificates to be added (list of QSslCertificate)
        @return the updated default SSL configuration (QSslConfiguration)
        """
        sslCfg = QSslConfiguration.defaultConfiguration()
        caList = sslCfg.caCertificates()
        caList.extend(certs)
        sslCfg.setCaCertificates(caList)
        QSslConfiguration.setDefaultConfiguration(sslCfg)
        return sslCfg
    
    def __sslErrors(self, reply, errors):
        """
        Private slot to handle SSL errors.
        
        Errors caused by certificates accepted earlier are ignored silently.
        For all others the user is asked whether to ignore the errors and,
        if certificates are involved, whether to accept these permanently.
        The request is aborted, if the user declines either question.
        
        @param reply reference to the reply object (QNetworkReply)
        @param errors list of SSL errors (list of QSslError)
        """
        caMerge = self.__storedCaCertificates()
        caNew = []
        
        errorStrings = []
        for err in errors:
            cert = err.certificate()
            if cert in caMerge:
                continue
            errorStrings.append(Qt.escape(err.errorString()))
            # several errors may refer to the same certificate; record it once
            if not cert.isNull() and cert not in caNew:
                caNew.append(cert)
        if not errorStrings:
            reply.ignoreSslErrors()
            return
        
        errorString = '.</li><li>'.join(errorStrings)
        ret = QMessageBox.warning(None,
            self.trUtf8("SSL Errors"),
            self.trUtf8("""<p>SSL Errors for <br /><b>{0}</b>"""
                        """<ul><li>{1}</li></ul></p>"""
                        """<p>Do you want to ignore these errors?</p>""")\
                .format(Qt.escape(reply.url().toString()), errorString),
            QMessageBox.StandardButtons(
                QMessageBox.No | \
                QMessageBox.Yes),
            QMessageBox.No)
        
        if ret != QMessageBox.Yes:
            reply.abort()
            return
        
        if caNew:
            certinfos = [self.__certToString(cert) for cert in caNew]
            ret = QMessageBox.question(None,
                self.trUtf8("Certificates"),
                self.trUtf8("""<p>Certificates:<br/>{0}<br/>"""
                            """Do you want to accept all these certificates?</p>""")\
                    .format("".join(certinfos)),
                QMessageBox.StandardButtons(\
                    QMessageBox.No | \
                    QMessageBox.Yes),
                QMessageBox.No)
            if ret != QMessageBox.Yes:
                reply.abort()
                return
            
            caMerge.extend(caNew)
            
            # trust the certificates for this reply and all future requests
            sslCfg = self.__installCaCertificates(caNew)
            reply.setSslConfiguration(sslCfg)
            
            # persist old and new certificates as concatenated PEM data
            pems = QByteArray()
            for cert in caMerge:
                pems.append(cert.toPem() + '\n')
            Preferences.Prefs.settings.setValue("Help/CaCertificates", pems)
        
        reply.ignoreSslErrors()
    
    def __certToString(self, cert):
        """
        Private method to convert a certificate to a formatted string.
        
        The certificate data originates from the remote server and is
        therefore escaped before being embedded into the rich text.
        
        @param cert certificate to convert (QSslCertificate)
        @return formatted string (string)
        """
        subject = Qt.escape(cert.subjectInfo(QSslCertificate.CommonName))
        issuer = Qt.escape(cert.issuerInfo(QSslCertificate.CommonName))
        
        result = "<p>"
        
        result += subject
        
        result += self.trUtf8("<br/>Issuer: {0}")\
            .format(issuer)
        
        result += self.trUtf8("<br/>Not valid before: {0}<br/>Valid Until: {1}")\
            .format(cert.effectiveDate().toString("yyyy-MM-dd"), 
                    cert.expiryDate().toString("yyyy-MM-dd"))
        
        result += "</p>"
        
        return result
    
    def preferencesChanged(self):
        """
        Public slot to signal a change of preferences.
        
        It re-applies the disk cache settings.
        """
        self.__setDiskCache()
    
    def languagesChanged(self):
        """
        Public slot to (re-)load the list of accepted languages.
        
        The resulting value is sent as the Accept-Language header by
        createRequest().
        """
        languages = Preferences.toList(
            Preferences.Prefs.settings.value("Help/AcceptLanguages", 
                HelpLanguagesDialog.defaultAcceptLanguages()))
        self.__acceptLanguage = HelpLanguagesDialog.httpString(languages)
    
    def __setDiskCache(self):
        """
        Private method to set the disk cache.
        
        Depending on the "DiskCacheEnabled" preference a cache located in
        the "browser/cache" subdirectory of the configuration directory is
        installed or the cache is removed.
        """
        if NetworkDiskCache is None:
            return
        
        if Preferences.getHelp("DiskCacheEnabled"):
            diskCache = NetworkDiskCache(self)
            location = os.path.join(Utilities.getConfigDir(), "browser", 'cache')
            # the preference is given in MB, the cache expects bytes
            size = Preferences.getHelp("DiskCacheSize") * 1024 * 1024
            diskCache.setCacheDirectory(location)
            diskCache.setMaximumCacheSize(size)
        else:
            diskCache = None
        self.setCache(diskCache)

eric ide

mercurial