eric7/WebBrowser/AdBlock/AdBlockManager.py

branch
eric7
changeset 8312
800c432b34c8
parent 8243
cc717c2ae956
child 8318
962bce857696
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric7/WebBrowser/AdBlock/AdBlockManager.py	Sat May 15 18:45:04 2021 +0200
@@ -0,0 +1,675 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the AdBlock manager.
+"""
+
+import os
+import contextlib
+
+from PyQt5.QtCore import (
+    pyqtSignal, QObject, QUrl, QUrlQuery, QFile, QByteArray, QMutex
+)
+from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInfo
+
+from E5Gui import E5MessageBox
+
+from E5Utilities.E5MutexLocker import E5MutexLocker
+
+from .AdBlockSubscription import AdBlockSubscription
+from .AdBlockUrlInterceptor import AdBlockUrlInterceptor
+from .AdBlockMatcher import AdBlockMatcher
+
+from Utilities.AutoSaver import AutoSaver
+import Utilities
+import Preferences
+
+
+class AdBlockManager(QObject):
+    """
+    Class implementing the AdBlock manager.
+    
+    @signal rulesChanged() emitted after some rule has changed
+    @signal requiredSubscriptionLoaded(subscription) emitted to indicate
+        loading of a required subscription is finished (AdBlockSubscription)
+    @signal enabledChanged(enabled) emitted to indicate a change of the
+        enabled state
+    """
+    rulesChanged = pyqtSignal()
+    requiredSubscriptionLoaded = pyqtSignal(AdBlockSubscription)
+    enabledChanged = pyqtSignal(bool)
+    
+    def __init__(self, parent=None):
+        """
+        Constructor
+        
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super().__init__(parent)
+        
+        self.__loaded = False
+        self.__subscriptionsLoaded = False
+        self.__enabled = False
+        self.__adBlockDialog = None
+        self.__adBlockExceptionsDialog = None
+        self.__adBlockNetwork = None
+        self.__adBlockPage = None
+        self.__subscriptions = []
+        self.__exceptedHosts = Preferences.getWebBrowser("AdBlockExceptions")
+        self.__saveTimer = AutoSaver(self, self.save)
+        self.__limitedEasyList = Preferences.getWebBrowser(
+            "AdBlockUseLimitedEasyList")
+        
+        self.__defaultSubscriptionUrlString = (
+            "abp:subscribe?location="
+            "https://easylist-downloads.adblockplus.org/easylist.txt&"
+            "title=EasyList"
+        )
+        self.__additionalDefaultSubscriptionUrlStrings = (
+            "abp:subscribe?location=https://raw.githubusercontent.com/"
+            "hoshsadiq/adblock-nocoin-list/master/nocoin.txt&"
+            "title=NoCoin",
+        )
+        self.__customSubscriptionUrlString = (
+            bytes(self.__customSubscriptionUrl().toEncoded()).decode()
+        )
+        
+        self.__mutex = QMutex()
+        self.__matcher = AdBlockMatcher(self)
+        
+        self.rulesChanged.connect(self.__saveTimer.changeOccurred)
+        self.rulesChanged.connect(self.__rulesChanged)
+        
+        self.__interceptor = AdBlockUrlInterceptor(self)
+        
+        from WebBrowser.WebBrowserWindow import WebBrowserWindow
+        WebBrowserWindow.networkManager().installUrlInterceptor(
+            self.__interceptor)
+    
+    def __rulesChanged(self):
+        """
+        Private slot handling a change of the AdBlock rules.
+        """
+        from WebBrowser.WebBrowserWindow import WebBrowserWindow
+        WebBrowserWindow.mainWindow().reloadUserStyleSheet()
+        self.__updateMatcher()
+    
+    def close(self):
+        """
+        Public method to close the open search engines manager.
+        """
+        self.__adBlockDialog and self.__adBlockDialog.close()
+        (self.__adBlockExceptionsDialog and
+         self.__adBlockExceptionsDialog.close())
+        
+        self.__saveTimer.saveIfNeccessary()
+    
+    def isEnabled(self):
+        """
+        Public method to check, if blocking ads is enabled.
+        
+        @return flag indicating the enabled state
+        @rtype bool
+        """
+        if not self.__loaded:
+            self.load()
+        
+        return self.__enabled
+    
+    def setEnabled(self, enabled):
+        """
+        Public slot to set the enabled state.
+        
+        @param enabled flag indicating the enabled state
+        @type bool
+        """
+        if self.isEnabled() == enabled:
+            return
+        
+        from WebBrowser.WebBrowserWindow import WebBrowserWindow
+        self.__enabled = enabled
+        for mainWindow in WebBrowserWindow.mainWindows():
+            mainWindow.adBlockIcon().setEnabled(enabled)
+        if enabled:
+            self.__loadSubscriptions()
+        
+        self.rulesChanged.emit()
+        self.enabledChanged.emit(enabled)
+    
+    def block(self, info):
+        """
+        Public method to check, if a request should be blocked.
+        
+        @param info request info object
+        @type QWebEngineUrlRequestInfo
+        @return flag indicating to block the request
+        @rtype bool
+        """
+        with E5MutexLocker(self.__mutex):
+            if not self.isEnabled():
+                return False
+            
+            urlString = bytes(info.requestUrl().toEncoded()).decode().lower()
+            urlDomain = info.requestUrl().host().lower()
+            urlScheme = info.requestUrl().scheme().lower()
+            
+            if (
+                not self.canRunOnScheme(urlScheme) or
+                not self.__canBeBlocked(info.firstPartyUrl())
+            ):
+                return False
+            
+            res = False
+            blockedRule = self.__matcher.match(info, urlDomain, urlString)
+            
+            if blockedRule:
+                res = True
+                if (
+                    info.resourceType() ==
+                        QWebEngineUrlRequestInfo.ResourceType
+                        .ResourceTypeMainFrame
+                ):
+                    url = QUrl("eric:adblock")
+                    query = QUrlQuery()
+                    query.addQueryItem("rule", blockedRule.filter())
+                    query.addQueryItem(
+                        "subscription", blockedRule.subscription().title())
+                    url.setQuery(query)
+                    info.redirect(url)
+                else:
+                    info.block(True)
+            
+            return res
+    
+    def canRunOnScheme(self, scheme):
+        """
+        Public method to check, if AdBlock can be performed on the scheme.
+        
+        @param scheme scheme to check
+        @type str
+        @return flag indicating, that AdBlock can be performed
+        @rtype bool
+        """
+        return scheme not in ["data", "eric", "qthelp", "qrc", "file", "abp"]
+    
+    def page(self):
+        """
+        Public method to get a reference to the page block object.
+        
+        @return reference to the page block object
+        @rtype AdBlockPage
+        """
+        if self.__adBlockPage is None:
+            from .AdBlockPage import AdBlockPage
+            self.__adBlockPage = AdBlockPage(self)
+        return self.__adBlockPage
+    
+    def __customSubscriptionLocation(self):
+        """
+        Private method to generate the path for custom subscriptions.
+        
+        @return URL for custom subscriptions
+        @rtype QUrl
+        """
+        dataDir = os.path.join(Utilities.getConfigDir(), "web_browser",
+                               "subscriptions")
+        if not os.path.exists(dataDir):
+            os.makedirs(dataDir)
+        fileName = os.path.join(dataDir, "adblock_subscription_custom")
+        return QUrl.fromLocalFile(fileName)
+    
+    def __customSubscriptionUrl(self):
+        """
+        Private method to generate the URL for custom subscriptions.
+        
+        @return URL for custom subscriptions
+        @rtype QUrl
+        """
+        location = self.__customSubscriptionLocation()
+        encodedUrl = bytes(location.toEncoded()).decode()
+        url = QUrl("abp:subscribe?location={0}&title={1}".format(
+            encodedUrl, self.tr("Custom Rules")))
+        return url
+    
+    def customRules(self):
+        """
+        Public method to get a subscription for custom rules.
+        
+        @return subscription object for custom rules
+        @rtype AdBlockSubscription
+        """
+        location = self.__customSubscriptionLocation()
+        for subscription in self.__subscriptions:
+            if subscription.location() == location:
+                return subscription
+        
+        url = self.__customSubscriptionUrl()
+        customAdBlockSubscription = AdBlockSubscription(url, True, self)
+        self.addSubscription(customAdBlockSubscription)
+        return customAdBlockSubscription
+    
+    def subscriptions(self):
+        """
+        Public method to get all subscriptions.
+        
+        @return list of subscriptions
+        @rtype list of AdBlockSubscription
+        """
+        if not self.__loaded:
+            self.load()
+        
+        return self.__subscriptions[:]
+    
+    def subscription(self, location):
+        """
+        Public method to get a subscription based on its location.
+        
+        @param location location of the subscription to search for
+        @type str
+        @return subscription or None
+        @rtype AdBlockSubscription
+        """
+        if location != "":
+            for subscription in self.__subscriptions:
+                if subscription.location().toString() == location:
+                    return subscription
+        
+        return None
+    
+    def updateAllSubscriptions(self):
+        """
+        Public method to update all subscriptions.
+        """
+        for subscription in self.__subscriptions:
+            subscription.updateNow()
+    
+    def removeSubscription(self, subscription, emitSignal=True):
+        """
+        Public method to remove an AdBlock subscription.
+        
+        @param subscription AdBlock subscription to be removed
+        @type AdBlockSubscription
+        @param emitSignal flag indicating to send a signal
+        @type bool
+        """
+        if subscription is None:
+            return
+        
+        if subscription.url().toString().startswith(
+            (self.__defaultSubscriptionUrlString,
+             self.__customSubscriptionUrlString)):
+            return
+        
+        with contextlib.suppress(ValueError):
+            self.__subscriptions.remove(subscription)
+            rulesFileName = subscription.rulesFileName()
+            QFile.remove(rulesFileName)
+            requiresSubscriptions = self.getRequiresSubscriptions(subscription)
+            for requiresSubscription in requiresSubscriptions:
+                self.removeSubscription(requiresSubscription, False)
+            if emitSignal:
+                self.rulesChanged.emit()
+    
+    def addSubscriptionFromUrl(self, url):
+        """
+        Public method to ad an AdBlock subscription given the abp URL.
+        
+        @param url URL to subscribe an AdBlock subscription
+        @type QUrl
+        @return flag indicating success
+        @rtype bool
+        """
+        if url.path() != "subscribe":
+            return False
+        
+        title = QUrl.fromPercentEncoding(
+            QByteArray(QUrlQuery(url).queryItemValue("title").encode()))
+        if not title:
+            return False
+        
+        res = E5MessageBox.yesNo(
+            None,
+            self.tr("Subscribe?"),
+            self.tr(
+                """<p>Subscribe to this AdBlock subscription?</p>"""
+                """<p>{0}</p>""").format(title))
+        if res:
+            from .AdBlockSubscription import AdBlockSubscription
+            from WebBrowser.WebBrowserWindow import WebBrowserWindow
+            
+            dlg = WebBrowserWindow.adBlockManager().showDialog()
+            subscription = AdBlockSubscription(
+                url, False,
+                WebBrowserWindow.adBlockManager())
+            WebBrowserWindow.adBlockManager().addSubscription(subscription)
+            dlg.addSubscription(subscription, False)
+            dlg.setFocus()
+            dlg.raise_()
+        
+        return res
+    
+    def addSubscription(self, subscription):
+        """
+        Public method to add an AdBlock subscription.
+        
+        @param subscription AdBlock subscription to be added
+        @type AdBlockSubscription
+        """
+        if subscription is None:
+            return
+        
+        self.__subscriptions.insert(-1, subscription)
+        
+        subscription.rulesChanged.connect(self.rulesChanged)
+        subscription.changed.connect(self.rulesChanged)
+        subscription.enabledChanged.connect(self.rulesChanged)
+        
+        self.rulesChanged.emit()
+    
+    def save(self):
+        """
+        Public method to save the AdBlock subscriptions.
+        """
+        if not self.__loaded:
+            return
+        
+        Preferences.setWebBrowser("AdBlockEnabled", self.__enabled)
+        if self.__subscriptionsLoaded:
+            subscriptions = []
+            requiresSubscriptions = []
+            # intermediate store for subscription requiring others
+            for subscription in self.__subscriptions:
+                if subscription is None:
+                    continue
+                urlString = bytes(subscription.url().toEncoded()).decode()
+                if "requiresLocation" in urlString:
+                    requiresSubscriptions.append(urlString)
+                else:
+                    subscriptions.append(urlString)
+                subscription.saveRules()
+            for subscription in requiresSubscriptions:
+                subscriptions.insert(-1, subscription)  # custom should be last
+            Preferences.setWebBrowser("AdBlockSubscriptions", subscriptions)
+    
+    def load(self):
+        """
+        Public method to load the AdBlock subscriptions.
+        """
+        if self.__loaded:
+            return
+        
+        self.__loaded = True
+        
+        self.__enabled = Preferences.getWebBrowser("AdBlockEnabled")
+        if self.__enabled:
+            self.__loadSubscriptions()
+    
+    def __loadSubscriptions(self):
+        """
+        Private method to load the set of subscriptions.
+        """
+        if self.__subscriptionsLoaded:
+            return
+        
+        subscriptions = Preferences.getWebBrowser("AdBlockSubscriptions")
+        if subscriptions:
+            for subscription in subscriptions:
+                if subscription.startswith(self.__customSubscriptionUrlString):
+                    break
+            else:
+                subscriptions.append(self.__customSubscriptionUrlString)
+        else:
+            subscriptions = (
+                [self.__defaultSubscriptionUrlString] +
+                self.__additionalDefaultSubscriptionUrlStrings +
+                [self.__customSubscriptionUrlString]
+            )
+        for subscription in subscriptions:
+            url = QUrl.fromEncoded(subscription.encode("utf-8"))
+            adBlockSubscription = AdBlockSubscription(
+                url,
+                subscription.startswith(self.__customSubscriptionUrlString),
+                self,
+                subscription.startswith(self.__defaultSubscriptionUrlString))
+            adBlockSubscription.rulesChanged.connect(self.rulesChanged)
+            adBlockSubscription.changed.connect(self.rulesChanged)
+            adBlockSubscription.enabledChanged.connect(self.rulesChanged)
+            adBlockSubscription.rulesEnabledChanged.connect(
+                self.__updateMatcher)
+            adBlockSubscription.rulesEnabledChanged.connect(
+                self.__saveTimer.changeOccurred)
+            self.__subscriptions.append(adBlockSubscription)
+        
+        self.__subscriptionsLoaded = True
+        
+        self.__updateMatcher()
+    
+    def loadRequiredSubscription(self, location, title):
+        """
+        Public method to load a subscription required by another one.
+        
+        @param location location of the required subscription
+        @type str
+        @param title title of the required subscription
+        @type str
+        """
+        # Step 1: check, if the subscription is in the list of subscriptions
+        urlString = "abp:subscribe?location={0}&title={1}".format(
+            location, title)
+        for subscription in self.__subscriptions:
+            if subscription.url().toString().startswith(urlString):
+                # We found it!
+                return
+        
+        # Step 2: if it is not, get it
+        url = QUrl.fromEncoded(urlString.encode("utf-8"))
+        adBlockSubscription = AdBlockSubscription(url, False, self)
+        self.addSubscription(adBlockSubscription)
+        self.requiredSubscriptionLoaded.emit(adBlockSubscription)
+    
+    def getRequiresSubscriptions(self, subscription):
+        """
+        Public method to get a list of subscriptions, that require the given
+        one.
+        
+        @param subscription subscription to check for
+        @type AdBlockSubscription
+        @return list of subscription requiring the given one
+        @rtype list of AdBlockSubscription
+        """
+        subscriptions = []
+        location = subscription.location().toString()
+        for subscription in self.__subscriptions:
+            if subscription.requiresLocation() == location:
+                subscriptions.append(subscription)
+        
+        return subscriptions
+    
+    def showDialog(self):
+        """
+        Public slot to show the AdBlock subscription management dialog.
+        
+        @return reference to the dialog
+        @rtype AdBlockDialog
+        """
+        if self.__adBlockDialog is None:
+            from .AdBlockDialog import AdBlockDialog
+            self.__adBlockDialog = AdBlockDialog(self)
+        
+        self.__adBlockDialog.show()
+        return self.__adBlockDialog
+    
+    def elementHidingRules(self, url):
+        """
+        Public method to get the element hiding rules.
+        
+        
+        @param url URL to get hiding rules for
+        @type QUrl
+        @return element hiding rules
+        @rtype str
+        """
+        if (
+            not self.isEnabled() or
+            not self.canRunOnScheme(url.scheme()) or
+            not self.__canBeBlocked(url)
+        ):
+            return ""
+        
+        return self.__matcher.elementHidingRules()
+    
+    def elementHidingRulesForDomain(self, url):
+        """
+        Public method to get the element hiding rules for a domain.
+        
+        @param url URL to get hiding rules for
+        @type QUrl
+        @return element hiding rules
+        @rtype str
+        """
+        if (
+            not self.isEnabled() or
+            not self.canRunOnScheme(url.scheme()) or
+            not self.__canBeBlocked(url)
+        ):
+            return ""
+        
+        return self.__matcher.elementHidingRulesForDomain(url.host())
+    
+    def exceptions(self):
+        """
+        Public method to get a list of excepted hosts.
+        
+        @return list of excepted hosts
+        @rtype list of str
+        """
+        return self.__exceptedHosts
+    
+    def setExceptions(self, hosts):
+        """
+        Public method to set the list of excepted hosts.
+        
+        @param hosts list of excepted hosts
+        @type list of str
+        """
+        self.__exceptedHosts = [host.lower() for host in hosts]
+        Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts)
+    
+    def addException(self, host):
+        """
+        Public method to add an exception.
+        
+        @param host to be excepted
+        @type str
+        """
+        host = host.lower()
+        if host and host not in self.__exceptedHosts:
+            self.__exceptedHosts.append(host)
+            Preferences.setWebBrowser(
+                "AdBlockExceptions", self.__exceptedHosts)
+    
+    def removeException(self, host):
+        """
+        Public method to remove an exception.
+        
+        @param host to be removed from the list of exceptions
+        @type str
+        """
+        host = host.lower()
+        if host in self.__exceptedHosts:
+            self.__exceptedHosts.remove(host)
+            Preferences.setWebBrowser(
+                "AdBlockExceptions", self.__exceptedHosts)
+    
+    def isHostExcepted(self, host):
+        """
+        Public slot to check, if a host is excepted.
+        
+        @param host host to check
+        @type str
+        @return flag indicating an exception
+        @rtype bool
+        """
+        host = host.lower()
+        return host in self.__exceptedHosts
+    
+    def showExceptionsDialog(self):
+        """
+        Public method to show the AdBlock Exceptions dialog.
+        
+        @return reference to the exceptions dialog
+        @rtype AdBlockExceptionsDialog
+        """
+        if self.__adBlockExceptionsDialog is None:
+            from .AdBlockExceptionsDialog import AdBlockExceptionsDialog
+            self.__adBlockExceptionsDialog = AdBlockExceptionsDialog()
+        
+        self.__adBlockExceptionsDialog.load(self.__exceptedHosts)
+        self.__adBlockExceptionsDialog.show()
+        return self.__adBlockExceptionsDialog
+    
+    def useLimitedEasyList(self):
+        """
+        Public method to test, if limited EasyList rules shall be used.
+        
+        @return flag indicating limited EasyList rules
+        @rtype bool
+        """
+        return self.__limitedEasyList
+    
+    def setUseLimitedEasyList(self, limited):
+        """
+        Public method to set the limited EasyList flag.
+        
+        @param limited flag indicating to use limited EasyList
+        @type bool
+        """
+        self.__limitedEasyList = limited
+        
+        for subscription in self.__subscriptions:
+            if subscription.url().toString().startswith(
+                    self.__defaultSubscriptionUrlString):
+                subscription.updateNow()
+        
+        Preferences.setWebBrowser("AdBlockUseLimitedEasyList", limited)
+    
+    def getDefaultSubscriptionUrl(self):
+        """
+        Public method to get the default subscription URL.
+        
+        @return default subscription URL
+        @rtype str
+        """
+        return self.__defaultSubscriptionUrlString
+    
+    def __updateMatcher(self):
+        """
+        Private slot to update the adblock matcher.
+        """
+        from WebBrowser.WebBrowserWindow import WebBrowserWindow
+        WebBrowserWindow.networkManager().removeUrlInterceptor(
+            self.__interceptor)
+        
+        if self.__enabled:
+            self.__matcher.update()
+        else:
+            self.__matcher.clear()
+        
+        WebBrowserWindow.networkManager().installUrlInterceptor(
+            self.__interceptor)
+    
+    def __canBeBlocked(self, url):
+        """
+        Private method to check, if the given URL could be blocked (i.e. is
+        not whitelisted).
+        
+        @param url URL to be checked
+        @type QUrl
+        @return flag indicating that the given URL can be blocked
+        @rtype bool
+        """
+        return not self.__matcher.adBlockDisabledForUrl(url)

eric ide

mercurial