--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/eric7/WebBrowser/AdBlock/AdBlockManager.py Sat May 15 18:45:04 2021 +0200 @@ -0,0 +1,675 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2009 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the AdBlock manager. +""" + +import os +import contextlib + +from PyQt5.QtCore import ( + pyqtSignal, QObject, QUrl, QUrlQuery, QFile, QByteArray, QMutex +) +from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInfo + +from E5Gui import E5MessageBox + +from E5Utilities.E5MutexLocker import E5MutexLocker + +from .AdBlockSubscription import AdBlockSubscription +from .AdBlockUrlInterceptor import AdBlockUrlInterceptor +from .AdBlockMatcher import AdBlockMatcher + +from Utilities.AutoSaver import AutoSaver +import Utilities +import Preferences + + +class AdBlockManager(QObject): + """ + Class implementing the AdBlock manager. + + @signal rulesChanged() emitted after some rule has changed + @signal requiredSubscriptionLoaded(subscription) emitted to indicate + loading of a required subscription is finished (AdBlockSubscription) + @signal enabledChanged(enabled) emitted to indicate a change of the + enabled state + """ + rulesChanged = pyqtSignal() + requiredSubscriptionLoaded = pyqtSignal(AdBlockSubscription) + enabledChanged = pyqtSignal(bool) + + def __init__(self, parent=None): + """ + Constructor + + @param parent reference to the parent object + @type QObject + """ + super().__init__(parent) + + self.__loaded = False + self.__subscriptionsLoaded = False + self.__enabled = False + self.__adBlockDialog = None + self.__adBlockExceptionsDialog = None + self.__adBlockNetwork = None + self.__adBlockPage = None + self.__subscriptions = [] + self.__exceptedHosts = Preferences.getWebBrowser("AdBlockExceptions") + self.__saveTimer = AutoSaver(self, self.save) + self.__limitedEasyList = Preferences.getWebBrowser( + "AdBlockUseLimitedEasyList") + + self.__defaultSubscriptionUrlString = ( + "abp:subscribe?location=" + "https://easylist-downloads.adblockplus.org/easylist.txt&" + "title=EasyList" + ) + self.__additionalDefaultSubscriptionUrlStrings = ( + "abp:subscribe?location=https://raw.githubusercontent.com/" + "hoshsadiq/adblock-nocoin-list/master/nocoin.txt&" + "title=NoCoin", + ) + self.__customSubscriptionUrlString = ( + bytes(self.__customSubscriptionUrl().toEncoded()).decode() + ) + + self.__mutex = QMutex() + self.__matcher = AdBlockMatcher(self) + + self.rulesChanged.connect(self.__saveTimer.changeOccurred) + self.rulesChanged.connect(self.__rulesChanged) + + self.__interceptor = AdBlockUrlInterceptor(self) + + from WebBrowser.WebBrowserWindow import WebBrowserWindow + WebBrowserWindow.networkManager().installUrlInterceptor( + self.__interceptor) + + def __rulesChanged(self): + """ + Private slot handling a change of the AdBlock rules. + """ + from WebBrowser.WebBrowserWindow import WebBrowserWindow + WebBrowserWindow.mainWindow().reloadUserStyleSheet() + self.__updateMatcher() + + def close(self): + """ + Public method to close the open search engines manager. + """ + self.__adBlockDialog and self.__adBlockDialog.close() + (self.__adBlockExceptionsDialog and + self.__adBlockExceptionsDialog.close()) + + self.__saveTimer.saveIfNeccessary() + + def isEnabled(self): + """ + Public method to check, if blocking ads is enabled. + + @return flag indicating the enabled state + @rtype bool + """ + if not self.__loaded: + self.load() + + return self.__enabled + + def setEnabled(self, enabled): + """ + Public slot to set the enabled state. + + @param enabled flag indicating the enabled state + @type bool + """ + if self.isEnabled() == enabled: + return + + from WebBrowser.WebBrowserWindow import WebBrowserWindow + self.__enabled = enabled + for mainWindow in WebBrowserWindow.mainWindows(): + mainWindow.adBlockIcon().setEnabled(enabled) + if enabled: + self.__loadSubscriptions() + + self.rulesChanged.emit() + self.enabledChanged.emit(enabled) + + def block(self, info): + """ + Public method to check, if a request should be blocked. + + @param info request info object + @type QWebEngineUrlRequestInfo + @return flag indicating to block the request + @rtype bool + """ + with E5MutexLocker(self.__mutex): + if not self.isEnabled(): + return False + + urlString = bytes(info.requestUrl().toEncoded()).decode().lower() + urlDomain = info.requestUrl().host().lower() + urlScheme = info.requestUrl().scheme().lower() + + if ( + not self.canRunOnScheme(urlScheme) or + not self.__canBeBlocked(info.firstPartyUrl()) + ): + return False + + res = False + blockedRule = self.__matcher.match(info, urlDomain, urlString) + + if blockedRule: + res = True + if ( + info.resourceType() == + QWebEngineUrlRequestInfo.ResourceType + .ResourceTypeMainFrame + ): + url = QUrl("eric:adblock") + query = QUrlQuery() + query.addQueryItem("rule", blockedRule.filter()) + query.addQueryItem( + "subscription", blockedRule.subscription().title()) + url.setQuery(query) + info.redirect(url) + else: + info.block(True) + + return res + + def canRunOnScheme(self, scheme): + """ + Public method to check, if AdBlock can be performed on the scheme. + + @param scheme scheme to check + @type str + @return flag indicating, that AdBlock can be performed + @rtype bool + """ + return scheme not in ["data", "eric", "qthelp", "qrc", "file", "abp"] + + def page(self): + """ + Public method to get a reference to the page block object. + + @return reference to the page block object + @rtype AdBlockPage + """ + if self.__adBlockPage is None: + from .AdBlockPage import AdBlockPage + self.__adBlockPage = AdBlockPage(self) + return self.__adBlockPage + + def __customSubscriptionLocation(self): + """ + Private method to generate the path for custom subscriptions. + + @return URL for custom subscriptions + @rtype QUrl + """ + dataDir = os.path.join(Utilities.getConfigDir(), "web_browser", + "subscriptions") + if not os.path.exists(dataDir): + os.makedirs(dataDir) + fileName = os.path.join(dataDir, "adblock_subscription_custom") + return QUrl.fromLocalFile(fileName) + + def __customSubscriptionUrl(self): + """ + Private method to generate the URL for custom subscriptions. + + @return URL for custom subscriptions + @rtype QUrl + """ + location = self.__customSubscriptionLocation() + encodedUrl = bytes(location.toEncoded()).decode() + url = QUrl("abp:subscribe?location={0}&title={1}".format( + encodedUrl, self.tr("Custom Rules"))) + return url + + def customRules(self): + """ + Public method to get a subscription for custom rules. + + @return subscription object for custom rules + @rtype AdBlockSubscription + """ + location = self.__customSubscriptionLocation() + for subscription in self.__subscriptions: + if subscription.location() == location: + return subscription + + url = self.__customSubscriptionUrl() + customAdBlockSubscription = AdBlockSubscription(url, True, self) + self.addSubscription(customAdBlockSubscription) + return customAdBlockSubscription + + def subscriptions(self): + """ + Public method to get all subscriptions. + + @return list of subscriptions + @rtype list of AdBlockSubscription + """ + if not self.__loaded: + self.load() + + return self.__subscriptions[:] + + def subscription(self, location): + """ + Public method to get a subscription based on its location. + + @param location location of the subscription to search for + @type str + @return subscription or None + @rtype AdBlockSubscription + """ + if location != "": + for subscription in self.__subscriptions: + if subscription.location().toString() == location: + return subscription + + return None + + def updateAllSubscriptions(self): + """ + Public method to update all subscriptions. + """ + for subscription in self.__subscriptions: + subscription.updateNow() + + def removeSubscription(self, subscription, emitSignal=True): + """ + Public method to remove an AdBlock subscription. + + @param subscription AdBlock subscription to be removed + @type AdBlockSubscription + @param emitSignal flag indicating to send a signal + @type bool + """ + if subscription is None: + return + + if subscription.url().toString().startswith( + (self.__defaultSubscriptionUrlString, + self.__customSubscriptionUrlString)): + return + + with contextlib.suppress(ValueError): + self.__subscriptions.remove(subscription) + rulesFileName = subscription.rulesFileName() + QFile.remove(rulesFileName) + requiresSubscriptions = self.getRequiresSubscriptions(subscription) + for requiresSubscription in requiresSubscriptions: + self.removeSubscription(requiresSubscription, False) + if emitSignal: + self.rulesChanged.emit() + + def addSubscriptionFromUrl(self, url): + """ + Public method to ad an AdBlock subscription given the abp URL. + + @param url URL to subscribe an AdBlock subscription + @type QUrl + @return flag indicating success + @rtype bool + """ + if url.path() != "subscribe": + return False + + title = QUrl.fromPercentEncoding( + QByteArray(QUrlQuery(url).queryItemValue("title").encode())) + if not title: + return False + + res = E5MessageBox.yesNo( + None, + self.tr("Subscribe?"), + self.tr( + """<p>Subscribe to this AdBlock subscription?</p>""" + """<p>{0}</p>""").format(title)) + if res: + from .AdBlockSubscription import AdBlockSubscription + from WebBrowser.WebBrowserWindow import WebBrowserWindow + + dlg = WebBrowserWindow.adBlockManager().showDialog() + subscription = AdBlockSubscription( + url, False, + WebBrowserWindow.adBlockManager()) + WebBrowserWindow.adBlockManager().addSubscription(subscription) + dlg.addSubscription(subscription, False) + dlg.setFocus() + dlg.raise_() + + return res + + def addSubscription(self, subscription): + """ + Public method to add an AdBlock subscription. + + @param subscription AdBlock subscription to be added + @type AdBlockSubscription + """ + if subscription is None: + return + + self.__subscriptions.insert(-1, subscription) + + subscription.rulesChanged.connect(self.rulesChanged) + subscription.changed.connect(self.rulesChanged) + subscription.enabledChanged.connect(self.rulesChanged) + + self.rulesChanged.emit() + + def save(self): + """ + Public method to save the AdBlock subscriptions. + """ + if not self.__loaded: + return + + Preferences.setWebBrowser("AdBlockEnabled", self.__enabled) + if self.__subscriptionsLoaded: + subscriptions = [] + requiresSubscriptions = [] + # intermediate store for subscription requiring others + for subscription in self.__subscriptions: + if subscription is None: + continue + urlString = bytes(subscription.url().toEncoded()).decode() + if "requiresLocation" in urlString: + requiresSubscriptions.append(urlString) + else: + subscriptions.append(urlString) + subscription.saveRules() + for subscription in requiresSubscriptions: + subscriptions.insert(-1, subscription) # custom should be last + Preferences.setWebBrowser("AdBlockSubscriptions", subscriptions) + + def load(self): + """ + Public method to load the AdBlock subscriptions. + """ + if self.__loaded: + return + + self.__loaded = True + + self.__enabled = Preferences.getWebBrowser("AdBlockEnabled") + if self.__enabled: + self.__loadSubscriptions() + + def __loadSubscriptions(self): + """ + Private method to load the set of subscriptions. + """ + if self.__subscriptionsLoaded: + return + + subscriptions = Preferences.getWebBrowser("AdBlockSubscriptions") + if subscriptions: + for subscription in subscriptions: + if subscription.startswith(self.__customSubscriptionUrlString): + break + else: + subscriptions.append(self.__customSubscriptionUrlString) + else: + subscriptions = ( + [self.__defaultSubscriptionUrlString] + + self.__additionalDefaultSubscriptionUrlStrings + + [self.__customSubscriptionUrlString] + ) + for subscription in subscriptions: + url = QUrl.fromEncoded(subscription.encode("utf-8")) + adBlockSubscription = AdBlockSubscription( + url, + subscription.startswith(self.__customSubscriptionUrlString), + self, + subscription.startswith(self.__defaultSubscriptionUrlString)) + adBlockSubscription.rulesChanged.connect(self.rulesChanged) + adBlockSubscription.changed.connect(self.rulesChanged) + adBlockSubscription.enabledChanged.connect(self.rulesChanged) + adBlockSubscription.rulesEnabledChanged.connect( + self.__updateMatcher) + adBlockSubscription.rulesEnabledChanged.connect( + self.__saveTimer.changeOccurred) + self.__subscriptions.append(adBlockSubscription) + + self.__subscriptionsLoaded = True + + self.__updateMatcher() + + def loadRequiredSubscription(self, location, title): + """ + Public method to load a subscription required by another one. + + @param location location of the required subscription + @type str + @param title title of the required subscription + @type str + """ + # Step 1: check, if the subscription is in the list of subscriptions + urlString = "abp:subscribe?location={0}&title={1}".format( + location, title) + for subscription in self.__subscriptions: + if subscription.url().toString().startswith(urlString): + # We found it! + return + + # Step 2: if it is not, get it + url = QUrl.fromEncoded(urlString.encode("utf-8")) + adBlockSubscription = AdBlockSubscription(url, False, self) + self.addSubscription(adBlockSubscription) + self.requiredSubscriptionLoaded.emit(adBlockSubscription) + + def getRequiresSubscriptions(self, subscription): + """ + Public method to get a list of subscriptions, that require the given + one. + + @param subscription subscription to check for + @type AdBlockSubscription + @return list of subscription requiring the given one + @rtype list of AdBlockSubscription + """ + subscriptions = [] + location = subscription.location().toString() + for subscription in self.__subscriptions: + if subscription.requiresLocation() == location: + subscriptions.append(subscription) + + return subscriptions + + def showDialog(self): + """ + Public slot to show the AdBlock subscription management dialog. + + @return reference to the dialog + @rtype AdBlockDialog + """ + if self.__adBlockDialog is None: + from .AdBlockDialog import AdBlockDialog + self.__adBlockDialog = AdBlockDialog(self) + + self.__adBlockDialog.show() + return self.__adBlockDialog + + def elementHidingRules(self, url): + """ + Public method to get the element hiding rules. + + + @param url URL to get hiding rules for + @type QUrl + @return element hiding rules + @rtype str + """ + if ( + not self.isEnabled() or + not self.canRunOnScheme(url.scheme()) or + not self.__canBeBlocked(url) + ): + return "" + + return self.__matcher.elementHidingRules() + + def elementHidingRulesForDomain(self, url): + """ + Public method to get the element hiding rules for a domain. + + @param url URL to get hiding rules for + @type QUrl + @return element hiding rules + @rtype str + """ + if ( + not self.isEnabled() or + not self.canRunOnScheme(url.scheme()) or + not self.__canBeBlocked(url) + ): + return "" + + return self.__matcher.elementHidingRulesForDomain(url.host()) + + def exceptions(self): + """ + Public method to get a list of excepted hosts. + + @return list of excepted hosts + @rtype list of str + """ + return self.__exceptedHosts + + def setExceptions(self, hosts): + """ + Public method to set the list of excepted hosts. + + @param hosts list of excepted hosts + @type list of str + """ + self.__exceptedHosts = [host.lower() for host in hosts] + Preferences.setWebBrowser("AdBlockExceptions", self.__exceptedHosts) + + def addException(self, host): + """ + Public method to add an exception. + + @param host to be excepted + @type str + """ + host = host.lower() + if host and host not in self.__exceptedHosts: + self.__exceptedHosts.append(host) + Preferences.setWebBrowser( + "AdBlockExceptions", self.__exceptedHosts) + + def removeException(self, host): + """ + Public method to remove an exception. + + @param host to be removed from the list of exceptions + @type str + """ + host = host.lower() + if host in self.__exceptedHosts: + self.__exceptedHosts.remove(host) + Preferences.setWebBrowser( + "AdBlockExceptions", self.__exceptedHosts) + + def isHostExcepted(self, host): + """ + Public slot to check, if a host is excepted. + + @param host host to check + @type str + @return flag indicating an exception + @rtype bool + """ + host = host.lower() + return host in self.__exceptedHosts + + def showExceptionsDialog(self): + """ + Public method to show the AdBlock Exceptions dialog. + + @return reference to the exceptions dialog + @rtype AdBlockExceptionsDialog + """ + if self.__adBlockExceptionsDialog is None: + from .AdBlockExceptionsDialog import AdBlockExceptionsDialog + self.__adBlockExceptionsDialog = AdBlockExceptionsDialog() + + self.__adBlockExceptionsDialog.load(self.__exceptedHosts) + self.__adBlockExceptionsDialog.show() + return self.__adBlockExceptionsDialog + + def useLimitedEasyList(self): + """ + Public method to test, if limited EasyList rules shall be used. + + @return flag indicating limited EasyList rules + @rtype bool + """ + return self.__limitedEasyList + + def setUseLimitedEasyList(self, limited): + """ + Public method to set the limited EasyList flag. + + @param limited flag indicating to use limited EasyList + @type bool + """ + self.__limitedEasyList = limited + + for subscription in self.__subscriptions: + if subscription.url().toString().startswith( + self.__defaultSubscriptionUrlString): + subscription.updateNow() + + Preferences.setWebBrowser("AdBlockUseLimitedEasyList", limited) + + def getDefaultSubscriptionUrl(self): + """ + Public method to get the default subscription URL. + + @return default subscription URL + @rtype str + """ + return self.__defaultSubscriptionUrlString + + def __updateMatcher(self): + """ + Private slot to update the adblock matcher. + """ + from WebBrowser.WebBrowserWindow import WebBrowserWindow + WebBrowserWindow.networkManager().removeUrlInterceptor( + self.__interceptor) + + if self.__enabled: + self.__matcher.update() + else: + self.__matcher.clear() + + WebBrowserWindow.networkManager().installUrlInterceptor( + self.__interceptor) + + def __canBeBlocked(self, url): + """ + Private method to check, if the given URL could be blocked (i.e. is + not whitelisted). + + @param url URL to be checked + @type QUrl + @return flag indicating that the given URL can be blocked + @rtype bool + """ + return not self.__matcher.adBlockDisabledForUrl(url)