eric6/WebBrowser/AdBlock/AdBlockManager.py

Wed, 30 Dec 2020 11:00:44 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Wed, 30 Dec 2020 11:00:44 +0100
branch
maintenance
changeset 7924
8a96736d465e
parent 7781
607a6098cb44
child 8143
2c730d5fd177
permissions
-rw-r--r--

Updated copyright for 2021.

# -*- coding: utf-8 -*-

# Copyright (c) 2009 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the AdBlock manager.
"""

import os

from PyQt5.QtCore import (
    pyqtSignal, QObject, QUrl, QUrlQuery, QFile, QByteArray, QMutex
)
from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInfo

from E5Gui import E5MessageBox

from E5Utilities.E5MutexLocker import E5MutexLocker

from .AdBlockSubscription import AdBlockSubscription
from .AdBlockUrlInterceptor import AdBlockUrlInterceptor
from .AdBlockMatcher import AdBlockMatcher

from Utilities.AutoSaver import AutoSaver
import Utilities
import Preferences


class AdBlockManager(QObject):
    """
    Class implementing the AdBlock manager.
    
    @signal rulesChanged() emitted after some rule has changed
    @signal requiredSubscriptionLoaded(subscription) emitted to indicate
        loading of a required subscription is finished (AdBlockSubscription)
    @signal enabledChanged(enabled) emitted to indicate a change of the
        enabled state
    """
    rulesChanged = pyqtSignal()
    requiredSubscriptionLoaded = pyqtSignal(AdBlockSubscription)
    enabledChanged = pyqtSignal(bool)
    
    def __init__(self, parent=None):
        """
        Constructor
        
        @param parent reference to the parent object
        @type QObject
        """
        super(AdBlockManager, self).__init__(parent)
        
        self.__loaded = False
        self.__subscriptionsLoaded = False
        self.__enabled = False
        self.__adBlockDialog = None
        self.__adBlockExceptionsDialog = None
        self.__adBlockNetwork = None
        self.__adBlockPage = None
        self.__subscriptions = []
        self.__exceptedHosts = Preferences.getWebBrowser("AdBlockExceptions")
        self.__saveTimer = AutoSaver(self, self.save)
        self.__limitedEasyList = Preferences.getWebBrowser(
            "AdBlockUseLimitedEasyList")
        
        self.__defaultSubscriptionUrlString = (
            "abp:subscribe?location="
            "https://easylist-downloads.adblockplus.org/easylist.txt&"
            "title=EasyList"
        )
        self.__additionalDefaultSubscriptionUrlStrings = (
            "abp:subscribe?location=https://raw.githubusercontent.com/"
            "hoshsadiq/adblock-nocoin-list/master/nocoin.txt&"
            "title=NoCoin",
        )
        self.__customSubscriptionUrlString = (
            bytes(self.__customSubscriptionUrl().toEncoded()).decode()
        )
        
        self.__mutex = QMutex()
        self.__matcher = AdBlockMatcher(self)
        
        self.rulesChanged.connect(self.__saveTimer.changeOccurred)
        self.rulesChanged.connect(self.__rulesChanged)
        
        self.__interceptor = AdBlockUrlInterceptor(self)
        
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        WebBrowserWindow.networkManager().installUrlInterceptor(
            self.__interceptor)
    
    def __rulesChanged(self):
        """
        Private slot handling a change of the AdBlock rules.
        """
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        WebBrowserWindow.mainWindow().reloadUserStyleSheet()
        self.__updateMatcher()
    
    def close(self):
        """
        Public method to close the open search engines manager.
        """
        self.__adBlockDialog and self.__adBlockDialog.close()
        (self.__adBlockExceptionsDialog and
         self.__adBlockExceptionsDialog.close())
        
        self.__saveTimer.saveIfNeccessary()
    
    def isEnabled(self):
        """
        Public method to check, if blocking ads is enabled.
        
        @return flag indicating the enabled state
        @rtype bool
        """
        if not self.__loaded:
            self.load()
        
        return self.__enabled
    
    def setEnabled(self, enabled):
        """
        Public slot to set the enabled state.
        
        @param enabled flag indicating the enabled state
        @type bool
        """
        if self.isEnabled() == enabled:
            return
        
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        self.__enabled = enabled
        for mainWindow in WebBrowserWindow.mainWindows():
            mainWindow.adBlockIcon().setEnabled(enabled)
        if enabled:
            self.__loadSubscriptions()
        
        self.rulesChanged.emit()
        self.enabledChanged.emit(enabled)
    
    def block(self, info):
        """
        Public method to check, if a request should be blocked.
        
        @param info request info object
        @type QWebEngineUrlRequestInfo
        @return flag indicating to block the request
        @rtype bool
        """
        with E5MutexLocker(self.__mutex):
            if not self.isEnabled():
                return False
            
            urlString = bytes(info.requestUrl().toEncoded()).decode().lower()
            urlDomain = info.requestUrl().host().lower()
            urlScheme = info.requestUrl().scheme().lower()
            
            if (
                not self.canRunOnScheme(urlScheme) or
                not self.__canBeBlocked(info.firstPartyUrl())
            ):
                return False
            
            res = False
            blockedRule = self.__matcher.match(info, urlDomain, urlString)
            
            if blockedRule:
                res = True
                if (
                    info.resourceType() ==
                        QWebEngineUrlRequestInfo.ResourceTypeMainFrame
                ):
                    url = QUrl("eric:adblock")
                    query = QUrlQuery()
                    query.addQueryItem("rule", blockedRule.filter())
                    query.addQueryItem(
                        "subscription", blockedRule.subscription().title())
                    url.setQuery(query)
                    info.redirect(url)
                else:
                    info.block(True)
            
            return res
    
    def canRunOnScheme(self, scheme):
        """
        Public method to check, if AdBlock can be performed on the scheme.
        
        @param scheme scheme to check
        @type str
        @return flag indicating, that AdBlock can be performed
        @rtype bool
        """
        return scheme not in ["data", "eric", "qthelp", "qrc", "file", "abp"]
    
    def page(self):
        """
        Public method to get a reference to the page block object.
        
        @return reference to the page block object
        @rtype AdBlockPage
        """
        if self.__adBlockPage is None:
            from .AdBlockPage import AdBlockPage
            self.__adBlockPage = AdBlockPage(self)
        return self.__adBlockPage
    
    def __customSubscriptionLocation(self):
        """
        Private method to generate the path for custom subscriptions.
        
        @return URL for custom subscriptions
        @rtype QUrl
        """
        dataDir = os.path.join(Utilities.getConfigDir(), "web_browser",
                               "subscriptions")
        if not os.path.exists(dataDir):
            os.makedirs(dataDir)
        fileName = os.path.join(dataDir, "adblock_subscription_custom")
        return QUrl.fromLocalFile(fileName)
    
    def __customSubscriptionUrl(self):
        """
        Private method to generate the URL for custom subscriptions.
        
        @return URL for custom subscriptions
        @rtype QUrl
        """
        location = self.__customSubscriptionLocation()
        encodedUrl = bytes(location.toEncoded()).decode()
        url = QUrl("abp:subscribe?location={0}&title={1}".format(
            encodedUrl, self.tr("Custom Rules")))
        return url
    
    def customRules(self):
        """
        Public method to get a subscription for custom rules.
        
        @return subscription object for custom rules
        @rtype AdBlockSubscription
        """
        location = self.__customSubscriptionLocation()
        for subscription in self.__subscriptions:
            if subscription.location() == location:
                return subscription
        
        url = self.__customSubscriptionUrl()
        customAdBlockSubscription = AdBlockSubscription(url, True, self)
        self.addSubscription(customAdBlockSubscription)
        return customAdBlockSubscription
    
    def subscriptions(self):
        """
        Public method to get all subscriptions.
        
        @return list of subscriptions
        @rtype list of AdBlockSubscription
        """
        if not self.__loaded:
            self.load()
        
        return self.__subscriptions[:]
    
    def subscription(self, location):
        """
        Public method to get a subscription based on its location.
        
        @param location location of the subscription to search for
        @type str
        @return subscription or None
        @rtype AdBlockSubscription
        """
        if location != "":
            for subscription in self.__subscriptions:
                if subscription.location().toString() == location:
                    return subscription
        
        return None
    
    def updateAllSubscriptions(self):
        """
        Public method to update all subscriptions.
        """
        for subscription in self.__subscriptions:
            subscription.updateNow()
    
    def removeSubscription(self, subscription, emitSignal=True):
        """
        Public method to remove an AdBlock subscription.
        
        @param subscription AdBlock subscription to be removed
        @type AdBlockSubscription
        @param emitSignal flag indicating to send a signal
        @type bool
        """
        if subscription is None:
            return
        
        if subscription.url().toString().startswith(
            (self.__defaultSubscriptionUrlString,
             self.__customSubscriptionUrlString)):
            return
        
        try:
            self.__subscriptions.remove(subscription)
            rulesFileName = subscription.rulesFileName()
            QFile.remove(rulesFileName)
            requiresSubscriptions = self.getRequiresSubscriptions(subscription)
            for requiresSubscription in requiresSubscriptions:
                self.removeSubscription(requiresSubscription, False)
            if emitSignal:
                self.rulesChanged.emit()
        except ValueError:
            pass
    
    def addSubscriptionFromUrl(self, url):
        """
        Public method to ad an AdBlock subscription given the abp URL.
        
        @param url URL to subscribe an AdBlock subscription
        @type QUrl
        @return flag indicating success
        @rtype bool
        """
        if url.path() != "subscribe":
            return False
        
        title = QUrl.fromPercentEncoding(
            QByteArray(QUrlQuery(url).queryItemValue("title").encode()))
        if not title:
            return False
        
        res = E5MessageBox.yesNo(
            None,
            self.tr("Subscribe?"),
            self.tr(
                """<p>Subscribe to this AdBlock subscription?</p>"""
                """<p>{0}</p>""").format(title))
        if res:
            from .AdBlockSubscription import AdBlockSubscription
            from WebBrowser.WebBrowserWindow import WebBrowserWindow
            
            dlg = WebBrowserWindow.adBlockManager().showDialog()
            subscription = AdBlockSubscription(
                url, False,
                WebBrowserWindow.adBlockManager())
            WebBrowserWindow.adBlockManager().addSubscription(subscription)
            dlg.addSubscription(subscription, False)
            dlg.setFocus()
            dlg.raise_()
        
        return res
    
    def addSubscription(self, subscription):
        """
        Public method to add an AdBlock subscription.
        
        @param subscription AdBlock subscription to be added
        @type AdBlockSubscription
        """
        if subscription is None:
            return
        
        self.__subscriptions.insert(-1, subscription)
        
        subscription.rulesChanged.connect(self.rulesChanged)
        subscription.changed.connect(self.rulesChanged)
        subscription.enabledChanged.connect(self.rulesChanged)
        
        self.rulesChanged.emit()
    
    def save(self):
        """
        Public method to save the AdBlock subscriptions.
        """
        if not self.__loaded:
            return
        
        Preferences.setWebBrowser("AdBlockEnabled", self.__enabled)
        if self.__subscriptionsLoaded:
            subscriptions = []
            requiresSubscriptions = []
            # intermediate store for subscription requiring others
            for subscription in self.__subscriptions:
                if subscription is None:
                    continue
                urlString = bytes(subscription.url().toEncoded()).decode()
                if "requiresLocation" in urlString:
                    requiresSubscriptions.append(urlString)
                else:
                    subscriptions.append(urlString)
                subscription.saveRules()
            for subscription in requiresSubscriptions:
                subscriptions.insert(-1, subscription)  # custom should be last
            Preferences.setWebBrowser("AdBlockSubscriptions", subscriptions)
    
    def load(self):
        """
        Public method to load the AdBlock subscriptions.
        """
        if self.__loaded:
            return
        
        self.__loaded = True
        
        self.__enabled = Preferences.getWebBrowser("AdBlockEnabled")
        if self.__enabled:
            self.__loadSubscriptions()
    
    def __loadSubscriptions(self):
        """
        Private method to load the set of subscriptions.
        """
        if self.__subscriptionsLoaded:
            return
        
        subscriptions = Preferences.getWebBrowser("AdBlockSubscriptions")
        if subscriptions:
            for subscription in subscriptions:
                if subscription.startswith(self.__customSubscriptionUrlString):
                    break
            else:
                subscriptions.append(self.__customSubscriptionUrlString)
        else:
            subscriptions = (
                [self.__defaultSubscriptionUrlString] +
                self.__additionalDefaultSubscriptionUrlStrings +
                [self.__customSubscriptionUrlString]
            )
        for subscription in subscriptions:
            url = QUrl.fromEncoded(subscription.encode("utf-8"))
            adBlockSubscription = AdBlockSubscription(
                url,
                subscription.startswith(self.__customSubscriptionUrlString),
                self,
                subscription.startswith(self.__defaultSubscriptionUrlString))
            adBlockSubscription.rulesChanged.connect(self.rulesChanged)
            adBlockSubscription.changed.connect(self.rulesChanged)
            adBlockSubscription.enabledChanged.connect(self.rulesChanged)
            adBlockSubscription.rulesEnabledChanged.connect(
                self.__updateMatcher)
            adBlockSubscription.rulesEnabledChanged.connect(
                self.__saveTimer.changeOccurred)
            self.__subscriptions.append(adBlockSubscription)
        
        self.__subscriptionsLoaded = True
        
        self.__updateMatcher()
    
    def loadRequiredSubscription(self, location, title):
        """
        Public method to load a subscription required by another one.
        
        @param location location of the required subscription
        @type str
        @param title title of the required subscription
        @type str
        """
        # Step 1: check, if the subscription is in the list of subscriptions
        urlString = "abp:subscribe?location={0}&title={1}".format(
            location, title)
        for subscription in self.__subscriptions:
            if subscription.url().toString().startswith(urlString):
                # We found it!
                return
        
        # Step 2: if it is not, get it
        url = QUrl.fromEncoded(urlString.encode("utf-8"))
        adBlockSubscription = AdBlockSubscription(url, False, self)
        self.addSubscription(adBlockSubscription)
        self.requiredSubscriptionLoaded.emit(adBlockSubscription)
    
    def getRequiresSubscriptions(self, subscription):
        """
        Public method to get a list of subscriptions, that require the given
        one.
        
        @param subscription subscription to check for
        @type AdBlockSubscription
        @return list of subscription requiring the given one
        @rtype list of AdBlockSubscription
        """
        subscriptions = []
        location = subscription.location().toString()
        for subscription in self.__subscriptions:
            if subscription.requiresLocation() == location:
                subscriptions.append(subscription)
        
        return subscriptions
    
    def showDialog(self):
        """
        Public slot to show the AdBlock subscription management dialog.
        
        @return reference to the dialog
        @rtype AdBlockDialog
        """
        if self.__adBlockDialog is None:
            from .AdBlockDialog import AdBlockDialog
            self.__adBlockDialog = AdBlockDialog(self)
        
        self.__adBlockDialog.show()
        return self.__adBlockDialog
    
    def elementHidingRules(self, url):
        """
        Public method to get the element hiding rules.
        
        
        @param url URL to get hiding rules for
        @type QUrl
        @return element hiding rules
        @rtype str
        """
        if (
            not self.isEnabled() or
            not self.canRunOnScheme(url.scheme()) or
            not self.__canBeBlocked(url)
        ):
            return ""
        
        return self.__matcher.elementHidingRules()
    
    def elementHidingRulesForDomain(self, url):
        """
        Public method to get the element hiding rules for a domain.
        
        @param url URL to get hiding rules for
        @type QUrl
        @return element hiding rules
        @rtype str
        """
        if (
            not self.isEnabled() or
            not self.canRunOnScheme(url.scheme()) or
            not self.__canBeBlocked(url)
        ):
            return ""
        
        return self.__matcher.elementHidingRulesForDomain(url.host())
    
    def exceptions(self):
        """
        Public method to get a list of excepted hosts.
        
        @return list of excepted hosts
        @rtype list of str
        """
        return self.__exceptedHosts
    
    def setExceptions(self, hosts):
        """
        Public method to set the list of excepted hosts.
        
        @param hosts list of excepted hosts
        @type list of str
        """
        self.__exceptedHosts = [host.lower() for host in hosts]
        Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts)
    
    def addException(self, host):
        """
        Public method to add an exception.
        
        @param host to be excepted
        @type str
        """
        host = host.lower()
        if host and host not in self.__exceptedHosts:
            self.__exceptedHosts.append(host)
            Preferences.setWebBrowser(
                "AdBlockExceptions", self.__exceptedHosts)
    
    def removeException(self, host):
        """
        Public method to remove an exception.
        
        @param host to be removed from the list of exceptions
        @type str
        """
        host = host.lower()
        if host in self.__exceptedHosts:
            self.__exceptedHosts.remove(host)
            Preferences.setWebBrowser(
                "AdBlockExceptions", self.__exceptedHosts)
    
    def isHostExcepted(self, host):
        """
        Public slot to check, if a host is excepted.
        
        @param host host to check
        @type str
        @return flag indicating an exception
        @rtype bool
        """
        host = host.lower()
        return host in self.__exceptedHosts
    
    def showExceptionsDialog(self):
        """
        Public method to show the AdBlock Exceptions dialog.
        
        @return reference to the exceptions dialog
        @rtype AdBlockExceptionsDialog
        """
        if self.__adBlockExceptionsDialog is None:
            from .AdBlockExceptionsDialog import AdBlockExceptionsDialog
            self.__adBlockExceptionsDialog = AdBlockExceptionsDialog()
        
        self.__adBlockExceptionsDialog.load(self.__exceptedHosts)
        self.__adBlockExceptionsDialog.show()
        return self.__adBlockExceptionsDialog
    
    def useLimitedEasyList(self):
        """
        Public method to test, if limited EasyList rules shall be used.
        
        @return flag indicating limited EasyList rules
        @rtype bool
        """
        return self.__limitedEasyList
    
    def setUseLimitedEasyList(self, limited):
        """
        Public method to set the limited EasyList flag.
        
        @param limited flag indicating to use limited EasyList
        @type bool
        """
        self.__limitedEasyList = limited
        
        for subscription in self.__subscriptions:
            if subscription.url().toString().startswith(
                    self.__defaultSubscriptionUrlString):
                subscription.updateNow()
        
        Preferences.setWebBrowser("AdBlockUseLimitedEasyList", limited)
    
    def getDefaultSubscriptionUrl(self):
        """
        Public method to get the default subscription URL.
        
        @return default subscription URL
        @rtype str
        """
        return self.__defaultSubscriptionUrlString
    
    def __updateMatcher(self):
        """
        Private slot to update the adblock matcher.
        """
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        WebBrowserWindow.networkManager().removeUrlInterceptor(
            self.__interceptor)
        
        if self.__enabled:
            self.__matcher.update()
        else:
            self.__matcher.clear()
        
        WebBrowserWindow.networkManager().installUrlInterceptor(
            self.__interceptor)
    
    def __canBeBlocked(self, url):
        """
        Private method to check, if the given URL could be blocked (i.e. is
        not whitelisted).
        
        @param url URL to be checked
        @type QUrl
        @return flag indicating that the given URL can be blocked
        @rtype bool
        """
        return not self.__matcher.adBlockDisabledForUrl(url)

eric ide

mercurial