src/eric7/WebBrowser/AdBlock/AdBlockManager.py

Wed, 13 Jul 2022 14:55:47 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Wed, 13 Jul 2022 14:55:47 +0200
branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9413
80c06d472826
permissions
-rw-r--r--

Reformatted the source code using the 'Black' utility.

# -*- coding: utf-8 -*-

# Copyright (c) 2009 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the AdBlock manager.
"""

import os
import contextlib

from PyQt6.QtCore import pyqtSignal, QObject, QUrl, QUrlQuery, QByteArray, QMutex
from PyQt6.QtWebEngineCore import QWebEngineUrlRequestInfo

from EricWidgets import EricMessageBox

from EricUtilities.EricMutexLocker import EricMutexLocker

from .AdBlockSubscription import AdBlockSubscription
from .AdBlockUrlInterceptor import AdBlockUrlInterceptor
from .AdBlockMatcher import AdBlockMatcher

from Utilities.AutoSaver import AutoSaver
import Utilities
import Preferences


class AdBlockManager(QObject):
    """
    Class implementing the AdBlock manager.

    @signal rulesChanged() emitted after some rule has changed
    @signal requiredSubscriptionLoaded(subscription) emitted to indicate
        loading of a required subscription is finished (AdBlockSubscription)
    @signal enabledChanged(enabled) emitted to indicate a change of the
        enabled state
    """

    rulesChanged = pyqtSignal()
    requiredSubscriptionLoaded = pyqtSignal(AdBlockSubscription)
    enabledChanged = pyqtSignal(bool)

    def __init__(self, parent=None):
        """
        Constructor

        Sets up the manager state, derives the subscription URL strings,
        creates the rule matcher and installs the URL request interceptor
        on the web browser's network manager.

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        # lazy loading state; set by load() and __loadSubscriptions()
        self.__loaded = False
        self.__subscriptionsLoaded = False
        self.__enabled = False
        # dialogs and the page block object are created on first request
        self.__adBlockDialog = None
        self.__adBlockExceptionsDialog = None
        # not used anywhere else in this class
        self.__adBlockNetwork = None
        self.__adBlockPage = None
        self.__subscriptions = []
        self.__exceptedHosts = Preferences.getWebBrowser("AdBlockExceptions")
        # save() is handed to the auto saver, which gets notified about
        # changes via changeOccurred() (connected below)
        self.__saveTimer = AutoSaver(self, self.save)
        self.__limitedEasyList = Preferences.getWebBrowser("AdBlockUseLimitedEasyList")

        # abp: URL of the default subscription; subscriptions whose URL
        # starts with it are refused by removeSubscription()
        self.__defaultSubscriptionUrlString = (
            "abp:subscribe?location="
            "https://easylist-downloads.adblockplus.org/easylist.txt&"
            "title=EasyList"
        )
        # one element tuple (note the trailing comma) of further
        # subscriptions used when the preferences contain none
        self.__additionalDefaultSubscriptionUrlStrings = (
            "abp:subscribe?location=https://raw.githubusercontent.com/"
            "hoshsadiq/adblock-nocoin-list/master/nocoin.txt&"
            "title=NoCoin",
        )
        # encoded string form of the custom rules URL; used for prefix
        # comparisons against subscription URL strings
        self.__customSubscriptionUrlString = bytes(
            self.__customSubscriptionUrl().toEncoded()
        ).decode()

        # the mutex serializes calls of block()
        self.__mutex = QMutex()
        self.__matcher = AdBlockMatcher(self)

        # each rule change notifies the auto saver and refreshes the matcher
        self.rulesChanged.connect(self.__saveTimer.changeOccurred)
        self.rulesChanged.connect(self.__rulesChanged)

        self.__interceptor = AdBlockUrlInterceptor(self)

        # NOTE(review): imported locally, presumably to avoid a circular
        # import with the web browser window module - confirm
        from WebBrowser.WebBrowserWindow import WebBrowserWindow

        WebBrowserWindow.networkManager().installUrlInterceptor(self.__interceptor)

    def __rulesChanged(self):
        """
        Private slot reacting to modified AdBlock rules.

        The user style sheet of the main window is reloaded and the matcher
        is brought up to date afterwards.
        """
        from WebBrowser.WebBrowserWindow import WebBrowserWindow

        mainWindow = WebBrowserWindow.mainWindow()
        mainWindow.reloadUserStyleSheet()
        self.__updateMatcher()

    def close(self):
        """
        Public method to close the open search engines manager.
        """
        self.__adBlockDialog and self.__adBlockDialog.close()
        (self.__adBlockExceptionsDialog and self.__adBlockExceptionsDialog.close())

        self.__saveTimer.saveIfNeccessary()

    def isEnabled(self):
        """
        Public method to check, if blocking ads is enabled.

        @return flag indicating the enabled state
        @rtype bool
        """
        if not self.__loaded:
            self.load()

        return self.__enabled

    def setEnabled(self, enabled):
        """
        Public slot to set the enabled state.

        @param enabled flag indicating the enabled state
        @type bool
        """
        if self.isEnabled() == enabled:
            return

        from WebBrowser.WebBrowserWindow import WebBrowserWindow

        self.__enabled = enabled
        for mainWindow in WebBrowserWindow.mainWindows():
            mainWindow.adBlockIcon().setEnabled(enabled)
        if enabled:
            self.__loadSubscriptions()

        self.rulesChanged.emit()
        self.enabledChanged.emit(enabled)

    def block(self, info):
        """
        Public method to decide, whether a request is to be blocked.

        A matching main frame request is redirected to the 'eric:adblock'
        page carrying the rule and the subscription title as query items;
        any other matching request is blocked directly. The check runs
        with the manager's mutex being held.

        @param info request info object
        @type QWebEngineUrlRequestInfo
        @return flag indicating to block the request
        @rtype bool
        """
        with EricMutexLocker(self.__mutex):
            if not self.isEnabled():
                return False

            requestUrl = info.requestUrl()
            encodedUrl = bytes(requestUrl.toEncoded()).decode().lower()
            host = requestUrl.host().lower()
            scheme = requestUrl.scheme().lower()

            # neither excluded schemes nor whitelisted first parties are
            # subject to blocking
            if not (
                self.canRunOnScheme(scheme)
                and self.__canBeBlocked(info.firstPartyUrl())
            ):
                return False

            matchedRule = self.__matcher.match(info, host, encodedUrl)
            if not matchedRule:
                return False

            isMainFrame = (
                info.resourceType()
                == QWebEngineUrlRequestInfo.ResourceType.ResourceTypeMainFrame
            )
            if isMainFrame:
                blockedQuery = QUrlQuery()
                blockedPage = QUrl("eric:adblock")
                blockedQuery.addQueryItem("rule", matchedRule.filter())
                blockedQuery.addQueryItem(
                    "subscription", matchedRule.subscription().title()
                )
                blockedPage.setQuery(blockedQuery)
                info.redirect(blockedPage)
            else:
                info.block(True)

            return True

    def canRunOnScheme(self, scheme):
        """
        Public method to test, whether AdBlock applies to a URL scheme.

        @param scheme scheme to check
        @type str
        @return flag indicating, that AdBlock can be performed
        @rtype bool
        """
        # schemes AdBlock is never performed on
        excludedSchemes = ("data", "eric", "qthelp", "qrc", "file", "abp")
        return scheme not in excludedSchemes

    def page(self):
        """
        Public method to get a reference to the page block object.

        @return reference to the page block object
        @rtype AdBlockPage
        """
        if self.__adBlockPage is None:
            from .AdBlockPage import AdBlockPage

            self.__adBlockPage = AdBlockPage(self)
        return self.__adBlockPage

    def __customSubscriptionLocation(self):
        """
        Private method to generate the path for custom subscriptions.

        The 'web_browser/subscriptions' directory below the configuration
        directory is created, if it does not exist yet.

        @return URL for custom subscriptions
        @rtype QUrl
        """
        dataDir = os.path.join(Utilities.getConfigDir(), "web_browser", "subscriptions")
        # exist_ok avoids the race between a separate existence check and
        # the creation of the directory
        os.makedirs(dataDir, exist_ok=True)
        fileName = os.path.join(dataDir, "adblock_subscription_custom")
        return QUrl.fromLocalFile(fileName)

    def __customSubscriptionUrl(self):
        """
        Private method to build the 'abp:subscribe' URL of the custom rules
        subscription.

        The URL carries the encoded custom subscription location and the
        translated title.

        @return URL for custom subscriptions
        @rtype QUrl
        """
        encodedLocation = bytes(
            self.__customSubscriptionLocation().toEncoded()
        ).decode()
        title = self.tr("Custom Rules")
        return QUrl(
            "abp:subscribe?location={0}&title={1}".format(encodedLocation, title)
        )

    def customRules(self):
        """
        Public method to get the subscription holding the custom rules.

        A custom subscription is created and added, if none of the known
        subscriptions uses the custom subscription location.

        @return subscription object for custom rules
        @rtype AdBlockSubscription
        """
        customLocation = self.__customSubscriptionLocation()
        existing = next(
            (
                entry
                for entry in self.__subscriptions
                if entry.location() == customLocation
            ),
            None,
        )
        if existing is not None:
            return existing

        created = AdBlockSubscription(self.__customSubscriptionUrl(), True, self)
        self.addSubscription(created)
        return created

    def subscriptions(self):
        """
        Public method to get all subscriptions.

        @return list of subscriptions
        @rtype list of AdBlockSubscription
        """
        if not self.__loaded:
            self.load()

        return self.__subscriptions[:]

    def subscription(self, location):
        """
        Public method to get a subscription based on its location.

        @param location location of the subscription to search for
        @type str
        @return subscription or None
        @rtype AdBlockSubscription
        """
        if location != "":
            for subscription in self.__subscriptions:
                if subscription.location().toString() == location:
                    return subscription

        return None

    def updateAllSubscriptions(self):
        """
        Public method to update all subscriptions.
        """
        for subscription in self.__subscriptions:
            subscription.updateNow()

    def removeSubscription(self, subscription, emitSignal=True):
        """
        Public method to remove an AdBlock subscription.

        @param subscription AdBlock subscription to be removed
        @type AdBlockSubscription
        @param emitSignal flag indicating to send a signal
        @type bool
        """
        if subscription is None:
            return

        if (
            subscription.url()
            .toString()
            .startswith(
                (
                    self.__defaultSubscriptionUrlString,
                    self.__customSubscriptionUrlString,
                )
            )
        ):
            return

        with contextlib.suppress(ValueError):
            self.__subscriptions.remove(subscription)
            rulesFileName = subscription.rulesFileName()
            os.unlink(rulesFileName)
            requiresSubscriptions = self.getRequiresSubscriptions(subscription)
            for requiresSubscription in requiresSubscriptions:
                self.removeSubscription(requiresSubscription, False)
            if emitSignal:
                self.rulesChanged.emit()

    def addSubscriptionFromUrl(self, url):
        """
        Public method to add an AdBlock subscription given the abp URL.

        The user is asked for confirmation. Upon consent the subscription
        is added to the AdBlock manager of the web browser window and to
        the AdBlock dialog, which is shown and raised.

        @param url URL to subscribe an AdBlock subscription
        @type QUrl
        @return flag indicating success
        @rtype bool
        """
        if url.path() != "subscribe":
            return False

        encodedTitle = QUrlQuery(url).queryItemValue("title").encode()
        title = QUrl.fromPercentEncoding(QByteArray(encodedTitle))
        if not title:
            return False

        confirmed = EricMessageBox.yesNo(
            None,
            self.tr("Subscribe?"),
            self.tr(
                """<p>Subscribe to this AdBlock subscription?</p>""" """<p>{0}</p>"""
            ).format(title),
        )
        if not confirmed:
            return confirmed

        from WebBrowser.WebBrowserWindow import WebBrowserWindow

        manager = WebBrowserWindow.adBlockManager()
        dialog = manager.showDialog()
        newSubscription = AdBlockSubscription(url, False, manager)
        manager.addSubscription(newSubscription)
        dialog.addSubscription(newSubscription, False)
        dialog.setFocus()
        dialog.raise_()

        return confirmed

    def addSubscription(self, subscription):
        """
        Public method to register an AdBlock subscription with the manager.

        The subscription is inserted in front of the last list entry, its
        change signals are forwarded as rulesChanged and rulesChanged is
        emitted once.

        @param subscription AdBlock subscription to be added
        @type AdBlockSubscription
        """
        if subscription is None:
            return

        # index -1 keeps the current last entry at the end of the list
        self.__subscriptions.insert(-1, subscription)

        # NOTE(review): unlike __loadSubscriptions() the rulesEnabledChanged
        # signal is not connected here - confirm that this is intended
        for changeSignal in (
            subscription.rulesChanged,
            subscription.changed,
            subscription.enabledChanged,
        ):
            changeSignal.connect(self.rulesChanged)

        self.rulesChanged.emit()

    def save(self):
        """
        Public method to save the AdBlock subscriptions.
        """
        if not self.__loaded:
            return

        Preferences.setWebBrowser("AdBlockEnabled", self.__enabled)
        if self.__subscriptionsLoaded:
            subscriptions = []
            requiresSubscriptions = []
            # intermediate store for subscription requiring others
            for subscription in self.__subscriptions:
                if subscription is None:
                    continue
                urlString = bytes(subscription.url().toEncoded()).decode()
                if "requiresLocation" in urlString:
                    requiresSubscriptions.append(urlString)
                else:
                    subscriptions.append(urlString)
                subscription.saveRules()
            for subscription in requiresSubscriptions:
                subscriptions.insert(-1, subscription)  # custom should be last
            Preferences.setWebBrowser("AdBlockSubscriptions", subscriptions)

    def load(self):
        """
        Public method to load the AdBlock subscriptions.
        """
        if self.__loaded:
            return

        self.__loaded = True

        self.__enabled = Preferences.getWebBrowser("AdBlockEnabled")
        if self.__enabled:
            self.__loadSubscriptions()

    def __loadSubscriptions(self):
        """
        Private method to load the set of subscriptions.
        """
        if self.__subscriptionsLoaded:
            return

        subscriptions = Preferences.getWebBrowser("AdBlockSubscriptions")
        if subscriptions:
            for subscription in subscriptions:
                if subscription.startswith(self.__customSubscriptionUrlString):
                    break
            else:
                subscriptions.append(self.__customSubscriptionUrlString)
        else:
            subscriptions = (
                [self.__defaultSubscriptionUrlString]
                + list(self.__additionalDefaultSubscriptionUrlStrings)
                + [self.__customSubscriptionUrlString]
            )
        for subscription in subscriptions:
            url = QUrl.fromEncoded(subscription.encode("utf-8"))
            adBlockSubscription = AdBlockSubscription(
                url,
                subscription.startswith(self.__customSubscriptionUrlString),
                self,
                subscription.startswith(self.__defaultSubscriptionUrlString),
            )
            adBlockSubscription.rulesChanged.connect(self.rulesChanged)
            adBlockSubscription.changed.connect(self.rulesChanged)
            adBlockSubscription.enabledChanged.connect(self.rulesChanged)
            adBlockSubscription.rulesEnabledChanged.connect(self.__updateMatcher)
            adBlockSubscription.rulesEnabledChanged.connect(
                self.__saveTimer.changeOccurred
            )
            self.__subscriptions.append(adBlockSubscription)

        self.__subscriptionsLoaded = True

        self.__updateMatcher()

    def loadRequiredSubscription(self, location, title):
        """
        Public method to load a subscription required by another one.

        @param location location of the required subscription
        @type str
        @param title title of the required subscription
        @type str
        """
        # Step 1: check, if the subscription is in the list of subscriptions
        urlString = "abp:subscribe?location={0}&title={1}".format(location, title)
        for subscription in self.__subscriptions:
            if subscription.url().toString().startswith(urlString):
                # We found it!
                return

        # Step 2: if it is not, get it
        url = QUrl.fromEncoded(urlString.encode("utf-8"))
        adBlockSubscription = AdBlockSubscription(url, False, self)
        self.addSubscription(adBlockSubscription)
        self.requiredSubscriptionLoaded.emit(adBlockSubscription)

    def getRequiresSubscriptions(self, subscription):
        """
        Public method to get a list of subscriptions, that require the given
        one.

        @param subscription subscription to check for
        @type AdBlockSubscription
        @return list of subscription requiring the given one
        @rtype list of AdBlockSubscription
        """
        subscriptions = []
        location = subscription.location().toString()
        for subscription in self.__subscriptions:
            if subscription.requiresLocation() == location:
                subscriptions.append(subscription)

        return subscriptions

    def showDialog(self):
        """
        Public slot to show the AdBlock subscription management dialog.

        @return reference to the dialog
        @rtype AdBlockDialog
        """
        if self.__adBlockDialog is None:
            from .AdBlockDialog import AdBlockDialog

            self.__adBlockDialog = AdBlockDialog(self)

        self.__adBlockDialog.show()
        return self.__adBlockDialog

    def elementHidingRules(self, url):
        """
        Public method to get the element hiding rules.


        @param url URL to get hiding rules for
        @type QUrl
        @return element hiding rules
        @rtype str
        """
        if (
            not self.isEnabled()
            or not self.canRunOnScheme(url.scheme())
            or not self.__canBeBlocked(url)
        ):
            return ""

        return self.__matcher.elementHidingRules()

    def elementHidingRulesForDomain(self, url):
        """
        Public method to get the element hiding rules for a domain.

        @param url URL to get hiding rules for
        @type QUrl
        @return element hiding rules
        @rtype str
        """
        if (
            not self.isEnabled()
            or not self.canRunOnScheme(url.scheme())
            or not self.__canBeBlocked(url)
        ):
            return ""

        return self.__matcher.elementHidingRulesForDomain(url.host())

    def exceptions(self):
        """
        Public method to get a list of excepted hosts.

        @return list of excepted hosts
        @rtype list of str
        """
        return self.__exceptedHosts

    def setExceptions(self, hosts):
        """
        Public method to replace the list of excepted hosts.

        The hosts are stored lower cased and written to the preferences.

        @param hosts list of excepted hosts
        @type list of str
        """
        lowerCasedHosts = []
        for entry in hosts:
            lowerCasedHosts.append(entry.lower())

        self.__exceptedHosts = lowerCasedHosts
        Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts)

    def addException(self, host):
        """
        Public method to add an exception.

        @param host to be excepted
        @type str
        """
        host = host.lower()
        if host and host not in self.__exceptedHosts:
            self.__exceptedHosts.append(host)
            Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts)

    def removeException(self, host):
        """
        Public method to remove an exception.

        @param host to be removed from the list of exceptions
        @type str
        """
        host = host.lower()
        if host in self.__exceptedHosts:
            self.__exceptedHosts.remove(host)
            Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts)

    def isHostExcepted(self, host):
        """
        Public slot to check, if a host is excepted.

        @param host host to check
        @type str
        @return flag indicating an exception
        @rtype bool
        """
        host = host.lower()
        return host in self.__exceptedHosts

    def showExceptionsDialog(self):
        """
        Public method to show the AdBlock Exceptions dialog.

        @return reference to the exceptions dialog
        @rtype AdBlockExceptionsDialog
        """
        if self.__adBlockExceptionsDialog is None:
            from .AdBlockExceptionsDialog import AdBlockExceptionsDialog

            self.__adBlockExceptionsDialog = AdBlockExceptionsDialog()

        self.__adBlockExceptionsDialog.load(self.__exceptedHosts)
        self.__adBlockExceptionsDialog.show()
        return self.__adBlockExceptionsDialog

    def useLimitedEasyList(self):
        """
        Public method to test, if limited EasyList rules shall be used.

        @return flag indicating limited EasyList rules
        @rtype bool
        """
        return self.__limitedEasyList

    def setUseLimitedEasyList(self, limited):
        """
        Public method to change the limited EasyList flag.

        Each subscription with a URL starting with the default subscription
        URL is updated immediately; the flag is written to the preferences
        afterwards.

        @param limited flag indicating to use limited EasyList
        @type bool
        """
        self.__limitedEasyList = limited

        defaultUrl = self.__defaultSubscriptionUrlString
        for entry in self.__subscriptions:
            entryUrl = entry.url().toString()
            if entryUrl.startswith(defaultUrl):
                entry.updateNow()

        Preferences.setWebBrowser("AdBlockUseLimitedEasyList", limited)

    def getDefaultSubscriptionUrl(self):
        """
        Public method to get the default subscription URL.

        @return default subscription URL
        @rtype str
        """
        return self.__defaultSubscriptionUrlString

    def __updateMatcher(self):
        """
        Private slot to update the adblock matcher.

        The URL interceptor is removed while the matcher is rebuilt (if
        AdBlock is enabled) or cleared (if it is disabled) and is installed
        again afterwards, even if the matcher raised an exception.
        """
        from WebBrowser.WebBrowserWindow import WebBrowserWindow

        # NOTE(review): the interceptor is presumably removed to keep
        # requests away from a matcher that is being modified - confirm
        WebBrowserWindow.networkManager().removeUrlInterceptor(self.__interceptor)

        try:
            if self.__enabled:
                self.__matcher.update()
            else:
                self.__matcher.clear()
        finally:
            # a failing matcher must not leave the interceptor uninstalled
            WebBrowserWindow.networkManager().installUrlInterceptor(
                self.__interceptor
            )

    def __canBeBlocked(self, url):
        """
        Private method to check, if the given URL could be blocked (i.e. is
        not whitelisted).

        @param url URL to be checked
        @type QUrl
        @return flag indicating that the given URL can be blocked
        @rtype bool
        """
        return not self.__matcher.adBlockDisabledForUrl(url)

eric ide

mercurial