src/eric7/WebBrowser/AdBlock/AdBlockSubscription.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9328
49a0a9cb2505
--- a/src/eric7/WebBrowser/AdBlock/AdBlockSubscription.py	Wed Jul 13 11:16:20 2022 +0200
+++ b/src/eric7/WebBrowser/AdBlock/AdBlockSubscription.py	Wed Jul 13 14:55:47 2022 +0200
@@ -13,8 +13,16 @@
 import base64
 
 from PyQt6.QtCore import (
-    pyqtSignal, Qt, QObject, QByteArray, QDateTime, QUrl, QUrlQuery,
-    QCryptographicHash, QDate, QTime
+    pyqtSignal,
+    Qt,
+    QObject,
+    QByteArray,
+    QDateTime,
+    QUrl,
+    QUrlQuery,
+    QCryptographicHash,
+    QDate,
+    QTime,
 )
 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
 
@@ -27,58 +35,61 @@
 class AdBlockSubscription(QObject):
     """
     Class implementing the AdBlock subscription.
-    
+
     @signal changed() emitted after the subscription has changed
     @signal rulesChanged() emitted after the subscription's rules have changed
     @signal enabledChanged(bool) emitted after the enabled state was changed
     @signal rulesEnabledChanged() emitted after a rule enabled state was
         changed
     """
+
     changed = pyqtSignal()
     rulesChanged = pyqtSignal()
     enabledChanged = pyqtSignal(bool)
     rulesEnabledChanged = pyqtSignal()
-    
+
     def __init__(self, url, custom, parent=None, default=False):
         """
         Constructor
-        
+
         @param url AdBlock URL for the subscription (QUrl)
         @param custom flag indicating a custom subscription (boolean)
         @param parent reference to the parent object (QObject)
         @param default flag indicating a default subscription (boolean)
         """
         super().__init__(parent)
-        
+
         self.__custom = custom
         self.__url = url.toEncoded()
         self.__enabled = False
         self.__downloading = None
         self.__defaultSubscription = default
-        
+
         self.__title = ""
         self.__location = QByteArray()
         self.__lastUpdate = QDateTime()
         self.__requiresLocation = ""
         self.__requiresTitle = ""
-        
-        self.__updatePeriod = 0     # update period in hours, 0 = use default
+
+        self.__updatePeriod = 0  # update period in hours, 0 = use default
         self.__remoteModified = QDateTime()
-        
-        self.__rules = []   # list containing all AdBlock rules
-        
+
+        self.__rules = []  # list containing all AdBlock rules
+
         self.__checksumRe = re.compile(
             r"""^\s*!\s*checksum[\s\-:]+([\w\+\/=]+).*\n""",
-            re.IGNORECASE | re.MULTILINE)
+            re.IGNORECASE | re.MULTILINE,
+        )
         self.__expiresRe = re.compile(
-            r"""(?:expires:|expires after)\s*(\d+)\s*(hour|h)?""",
-            re.IGNORECASE)
+            r"""(?:expires:|expires after)\s*(\d+)\s*(hour|h)?""", re.IGNORECASE
+        )
         self.__remoteModifiedRe = re.compile(
             r"""!\s*(?:Last modified|Updated):\s*(\d{1,2})\s*"""
             r"""(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*"""
             r"""(\d{2,4})\s*((\d{1,2}):(\d{2}))?""",
-            re.IGNORECASE)
-        
+            re.IGNORECASE,
+        )
+
         self.__monthNameToNumber = {
             "Jan": 1,
             "Feb": 2,
@@ -91,60 +102,67 @@
             "Sep": 9,
             "Oct": 10,
             "Nov": 11,
-            "Dec": 12
+            "Dec": 12,
         }
-        
+
         self.__parseUrl(url)
-    
+
     def __parseUrl(self, url):
         """
         Private method to parse the AdBlock URL for the subscription.
-        
+
         @param url AdBlock URL for the subscription
         @type QUrl
         """
         if url.scheme() != "abp":
             return
-        
+
         if url.path() != "subscribe":
             return
-        
+
         urlQuery = QUrlQuery(url)
         self.__title = QUrl.fromPercentEncoding(
-            QByteArray(urlQuery.queryItemValue("title").encode()))
+            QByteArray(urlQuery.queryItemValue("title").encode())
+        )
         self.__enabled = urlQuery.queryItemValue("enabled") != "false"
-        self.__location = QByteArray(QUrl.fromPercentEncoding(
-            QByteArray(urlQuery.queryItemValue("location").encode()))
-            .encode("utf-8"))
-        
+        self.__location = QByteArray(
+            QUrl.fromPercentEncoding(
+                QByteArray(urlQuery.queryItemValue("location").encode())
+            ).encode("utf-8")
+        )
+
         # Check for required subscription
         self.__requiresLocation = QUrl.fromPercentEncoding(
-            QByteArray(urlQuery.queryItemValue(
-                "requiresLocation").encode()))
+            QByteArray(urlQuery.queryItemValue("requiresLocation").encode())
+        )
         self.__requiresTitle = QUrl.fromPercentEncoding(
-            QByteArray(urlQuery.queryItemValue("requiresTitle").encode()))
+            QByteArray(urlQuery.queryItemValue("requiresTitle").encode())
+        )
         if self.__requiresLocation and self.__requiresTitle:
             from WebBrowser.WebBrowserWindow import WebBrowserWindow
+
             WebBrowserWindow.adBlockManager().loadRequiredSubscription(
-                self.__requiresLocation, self.__requiresTitle)
-        
+                self.__requiresLocation, self.__requiresTitle
+            )
+
         lastUpdateString = urlQuery.queryItemValue("lastUpdate")
-        self.__lastUpdate = QDateTime.fromString(lastUpdateString,
-                                                 Qt.DateFormat.ISODate)
-        
+        self.__lastUpdate = QDateTime.fromString(
+            lastUpdateString, Qt.DateFormat.ISODate
+        )
+
         self.__loadRules()
-    
+
     def url(self):
         """
         Public method to generate the URL for this subscription.
-        
+
         @return AdBlock URL for the subscription
         @rtype QUrl
         """
         url = QUrl()
         url.setScheme("abp")
         url.setPath("subscribe")
-        
+
         queryItems = []
         queryItems.append(("location", bytes(self.__location).decode()))
         queryItems.append(("title", self.__title))
@@ -155,124 +173,123 @@
             queryItems.append(("enabled", "false"))
         if self.__lastUpdate.isValid():
             queryItems.append(
-                ("lastUpdate",
-                 self.__lastUpdate.toString(Qt.DateFormat.ISODate))
+                ("lastUpdate", self.__lastUpdate.toString(Qt.DateFormat.ISODate))
             )
-        
+
         query = QUrlQuery()
         query.setQueryItems(queryItems)
         url.setQuery(query)
         return url
-    
+
     def isEnabled(self):
         """
         Public method to check, if the subscription is enabled.
-        
+
         @return flag indicating the enabled status
         @rtype bool
         """
         return self.__enabled
-    
+
     def setEnabled(self, enabled):
         """
         Public method to set the enabled status.
-        
+
         @param enabled flag indicating the enabled status
         @type bool
         """
         if self.__enabled == enabled:
             return
-        
+
         self.__enabled = enabled
         self.enabledChanged.emit(enabled)
-    
+
     def title(self):
         """
         Public method to get the subscription title.
-        
+
         @return subscription title
         @rtype string
         """
         return self.__title
-    
+
     def setTitle(self, title):
         """
         Public method to set the subscription title.
-        
+
         @param title subscription title
         @type str
         """
         if self.__title == title:
             return
-        
+
         self.__title = title
         self.changed.emit()
-    
+
     def location(self):
         """
         Public method to get the subscription location.
-        
+
         @return URL of the subscription location
         @rtype QUrl
         """
         return QUrl.fromEncoded(self.__location)
-    
+
     def setLocation(self, url):
         """
         Public method to set the subscription location.
-        
+
         @param url URL of the subscription location
         @type QUrl
         """
         if url == self.location():
             return
-        
+
         self.__location = url.toEncoded()
         self.__lastUpdate = QDateTime()
         self.changed.emit()
-    
+
     def requiresLocation(self):
         """
         Public method to get the location of a required subscription.
-        
+
         @return location of a required subscription
         @rtype str
         """
         return self.__requiresLocation
-    
+
     def lastUpdate(self):
         """
         Public method to get the date and time of the last update.
-        
+
         @return date and time of the last update
         @rtype QDateTime
         """
         return self.__lastUpdate
-    
+
     def rulesFileName(self):
         """
         Public method to get the name of the rules file.
-        
+
         @return name of the rules file
         @rtype str
         """
         if self.location().scheme() == "file":
             return self.location().toLocalFile()
-        
+
         if self.__location.isEmpty():
             return ""
-        
-        sha1 = bytes(QCryptographicHash.hash(
-            self.__location, QCryptographicHash.Algorithm.Sha1).toHex()
+
+        sha1 = bytes(
+            QCryptographicHash.hash(
+                self.__location, QCryptographicHash.Algorithm.Sha1
+            ).toHex()
         ).decode()
-        dataDir = os.path.join(
-            Utilities.getConfigDir(), "web_browser", "subscriptions")
+        dataDir = os.path.join(Utilities.getConfigDir(), "web_browser", "subscriptions")
         if not os.path.exists(dataDir):
             os.makedirs(dataDir)
-        fileName = os.path.join(
-            dataDir, "adblock_subscription_{0}".format(sha1))
+        fileName = os.path.join(dataDir, "adblock_subscription_{0}".format(sha1))
         return fileName
-    
+
     def __loadRules(self):
         """
         Private method to load the rules of the subscription.
@@ -286,15 +303,17 @@
                         EricMessageBox.warning(
                             None,
                             self.tr("Load subscription rules"),
-                            self.tr("""AdBlock file '{0}' does not start"""
-                                    """ with [Adblock.""")
-                            .format(fileName))
+                            self.tr(
+                                """AdBlock file '{0}' does not start"""
+                                """ with [Adblock."""
+                            ).format(fileName),
+                        )
                         f.close()
                         os.unlink(fileName)
                         self.__lastUpdate = QDateTime()
                     else:
                         from .AdBlockRule import AdBlockRule
-                        
+
                         self.__updatePeriod = 0
                         self.__remoteModified = QDateTime()
                         self.__rules = []
@@ -313,91 +332,98 @@
                                 else:
                                     # days
                                     self.__updatePeriod = int(period) * 24
-                            remoteModified = self.__remoteModifiedRe.search(
-                                line)
+                            remoteModified = self.__remoteModifiedRe.search(line)
                             if remoteModified:
-                                day, month, year, time, hour, minute = (
-                                    remoteModified.groups()
-                                )
+                                (
+                                    day,
+                                    month,
+                                    year,
+                                    time,
+                                    hour,
+                                    minute,
+                                ) = remoteModified.groups()
                                 self.__remoteModified.setDate(
-                                    QDate(int(year),
-                                          self.__monthNameToNumber[month],
-                                          int(day))
+                                    QDate(
+                                        int(year),
+                                        self.__monthNameToNumber[month],
+                                        int(day),
+                                    )
                                 )
                                 if time:
                                     self.__remoteModified.setTime(
-                                        QTime(int(hour), int(minute)))
+                                        QTime(int(hour), int(minute))
+                                    )
                                 else:
                                     # no time given, set it to 23:59
-                                    self.__remoteModified.setTime(
-                                        QTime(23, 59))
+                                    self.__remoteModified.setTime(QTime(23, 59))
                         self.changed.emit()
             except OSError as err:
                 EricMessageBox.warning(
                     None,
                     self.tr("Load subscription rules"),
                     self.tr(
-                        """Unable to read AdBlock file '{0}'.\nReason: {1}""")
-                    .format(fileName, str(err))
+                        """Unable to read AdBlock file '{0}'.\nReason: {1}"""
+                    ).format(fileName, str(err)),
                 )
-        
+
         elif not fileName.endswith("_custom"):
             self.__lastUpdate = QDateTime()
-        
+
         self.checkForUpdate()
-    
+
     def checkForUpdate(self):
         """
         Public method to check for an update.
         """
         updatePeriod = (
             self.__updatePeriod
-            if self.__updatePeriod else
-            Preferences.getWebBrowser("AdBlockUpdatePeriod") * 24
+            if self.__updatePeriod
+            else Preferences.getWebBrowser("AdBlockUpdatePeriod") * 24
         )
         if (
-            not self.__lastUpdate.isValid() or
-            (self.__remoteModified.isValid() and
-             self.__remoteModified.addSecs(updatePeriod * 3600) <
-                QDateTime.currentDateTime()) or
-            self.__lastUpdate.addSecs(updatePeriod * 3600) <
-                QDateTime.currentDateTime()
+            not self.__lastUpdate.isValid()
+            or (
+                self.__remoteModified.isValid()
+                and self.__remoteModified.addSecs(updatePeriod * 3600)
+                < QDateTime.currentDateTime()
+            )
+            or self.__lastUpdate.addSecs(updatePeriod * 3600)
+            < QDateTime.currentDateTime()
         ):
             self.updateNow()
-    
+
     def updateNow(self):
         """
         Public method to update the subscription immediately.
         """
         if self.__downloading is not None:
             return
-        
+
         if not self.location().isValid():
             return
-        
+
         if self.location().scheme() == "file":
             self.__lastUpdate = QDateTime.currentDateTime()
             self.__loadRules()
             return
-        
+
         from WebBrowser.WebBrowserWindow import WebBrowserWindow
-        reply = WebBrowserWindow.networkManager().get(
-            QNetworkRequest(self.location()))
-        reply.finished.connect(
-            lambda: self.__rulesDownloaded(reply))
+
+        reply = WebBrowserWindow.networkManager().get(QNetworkRequest(self.location()))
+        reply.finished.connect(lambda: self.__rulesDownloaded(reply))
         self.__downloading = reply
-    
+
     def __rulesDownloaded(self, reply):
         """
         Private slot to deal with the downloaded rules.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
         response = bytes(reply.readAll())
         reply.close()
         self.__downloading = None
-        
+
         if reply.error() != QNetworkReply.NetworkError.NoError:
             if not self.__defaultSubscription:
                 # don't show error if we try to load the default
@@ -406,29 +432,34 @@
                     self.tr("Downloading subscription rules"),
                     self.tr(
                         """<p>Subscription rules could not be"""
-                        """ downloaded.</p><p>Error: {0}</p>""")
-                    .format(reply.errorString()))
+                        """ downloaded.</p><p>Error: {0}</p>"""
+                    ).format(reply.errorString()),
+                )
             else:
                 # reset after first download attempt
                 self.__defaultSubscription = False
             return
-        
+
         if not response:
             EricMessageBox.warning(
                 None,
                 self.tr("Downloading subscription rules"),
-                self.tr("""Got empty subscription rules."""))
+                self.tr("""Got empty subscription rules."""),
+            )
             return
-        
+
         fileName = self.rulesFileName()
         try:
             with open(fileName, "wb") as f:
                 from WebBrowser.WebBrowserWindow import WebBrowserWindow
+
                 if (
-                    WebBrowserWindow.adBlockManager().useLimitedEasyList() and
-                    self.url().toString().startswith(
-                        WebBrowserWindow.adBlockManager()
-                        .getDefaultSubscriptionUrl())
+                    WebBrowserWindow.adBlockManager().useLimitedEasyList()
+                    and self.url()
+                    .toString()
+                    .startswith(
+                        WebBrowserWindow.adBlockManager().getDefaultSubscriptionUrl()
+                    )
                 ):
                     limited = True
                     # ignore Third-party advertisers rules for performance
@@ -436,12 +467,14 @@
                     index = response.find(
                         b"!---------------------------"
                         b"Third-party advertisers"
-                        b"---------------------------!")
+                        b"---------------------------!"
+                    )
                     part1 = response[:index]
                     index = response.find(
                         b"!-----------------------"
                         b"Whitelists to fix broken sites"
-                        b"------------------------!")
+                        b"------------------------!"
+                    )
                     part2 = response[index:]
                     f.write(part1)
                     f.write(part2)
@@ -450,7 +483,7 @@
                     f.write(response)
                 f.close()
                 self.__lastUpdate = QDateTime.currentDateTime()
-                
+
                 if limited or self.__validateCheckSum(fileName):
                     self.__loadRules()
                 else:
@@ -459,15 +492,15 @@
             EricMessageBox.warning(
                 None,
                 self.tr("Downloading subscription rules"),
-                self.tr("""Unable to write to AdBlock file '{0}'.""")
-                .file(fileName))
+                self.tr("""Unable to write to AdBlock file '{0}'.""").file(fileName),
+            )
         self.__downloading = None
         reply.deleteLater()
-    
+
     def __validateCheckSum(self, fileName):
         """
         Private method to check the subscription file's checksum.
-        
+
         @param fileName name of the file containing the subscription
         @type str
         @return flag indicating a valid file. A file is considered
@@ -481,25 +514,23 @@
                 data = f.read()
         except (OSError, OSError):
             return False
-        
+
         match = re.search(self.__checksumRe, data)
         if match:
             expectedChecksum = match.group(1)
         else:
             # consider it as valid
             return True
-        
+
         # normalize the data
-        data = re.sub(r"\r", "", data)              # normalize eol
-        data = re.sub(r"\n+", "\n", data)           # remove empty lines
+        data = re.sub(r"\r", "", data)  # normalize eol
+        data = re.sub(r"\n+", "\n", data)  # remove empty lines
         data = re.sub(self.__checksumRe, "", data)  # remove checksum line
-        
+
         # calculate checksum
-        md5 = hashlib.md5()             # secok
+        md5 = hashlib.md5()  # secok
         md5.update(data.encode("utf-8"))
-        calculatedChecksum = (
-            base64.b64encode(md5.digest()).decode().rstrip("=")
-        )
+        calculatedChecksum = base64.b64encode(md5.digest()).decode().rstrip("=")
         if calculatedChecksum == expectedChecksum:
             return True
         else:
@@ -511,11 +542,11 @@
                     """ checksum.<br/>"""
                     """Found: {1}<br/>"""
                     """Calculated: {2}<br/>"""
-                    """Use it anyway?</p>""")
-                .format(self.__title, expectedChecksum,
-                        calculatedChecksum))
+                    """Use it anyway?</p>"""
+                ).format(self.__title, expectedChecksum, calculatedChecksum),
+            )
             return res
-    
+
     def saveRules(self):
         """
         Public method to save the subscription rules.
@@ -523,7 +554,7 @@
         fileName = self.rulesFileName()
         if not fileName:
             return
-        
+
         try:
             with open(fileName, "w", encoding="utf-8") as f:
                 if not self.__rules or not self.__rules[0].isHeader():
@@ -534,13 +565,13 @@
             EricMessageBox.warning(
                 None,
                 self.tr("Saving subscription rules"),
-                self.tr("""Unable to write to AdBlock file '{0}'.""")
-                .format(fileName))
-    
+                self.tr("""Unable to write to AdBlock file '{0}'.""").format(fileName),
+            )
+
     def rule(self, offset):
         """
         Public method to get a specific rule.
-        
+
         @param offset offset of the rule
         @type int
         @return requested rule
@@ -548,22 +579,22 @@
         """
         if offset >= len(self.__rules):
             return None
-        
+
         return self.__rules[offset]
-    
+
     def allRules(self):
         """
         Public method to get the list of rules.
-        
+
         @return list of rules
         @rtype list of AdBlockRule
         """
         return self.__rules[:]
-    
+
     def addRule(self, rule):
         """
         Public method to add a rule.
-        
+
         @param rule reference to the rule to add
         @type AdBlockRule
         @return offset of the rule
@@ -571,26 +602,26 @@
         """
         self.__rules.append(rule)
         self.rulesChanged.emit()
-        
+
         return len(self.__rules) - 1
-    
+
     def removeRule(self, offset):
         """
         Public method to remove a rule given the offset.
-        
+
         @param offset offset of the rule to remove
         @type int
         """
         if offset < 0 or offset > len(self.__rules):
             return
-        
+
         del self.__rules[offset]
         self.rulesChanged.emit()
-    
+
     def replaceRule(self, rule, offset):
         """
         Public method to replace a rule given the offset.
-        
+
         @param rule reference to the rule to set
         @type AdBlockRule
         @param offset offset of the rule to remove
@@ -600,34 +631,34 @@
         """
         if offset >= len(self.__rules):
             return None
-        
+
         self.__rules[offset] = rule
         self.rulesChanged.emit()
-        
+
         return self.__rules[offset]
-    
+
     def canEditRules(self):
         """
         Public method to check, if rules can be edited.
-        
+
         @return flag indicating rules may be edited
         @rtype bool
         """
         return self.__custom
-    
+
     def canBeRemoved(self):
         """
         Public method to check, if the subscription can be removed.
-        
+
         @return flag indicating removal is allowed
         @rtype bool
         """
         return not self.__custom and not self.__defaultSubscription
-    
+
     def setRuleEnabled(self, offset, enabled):
         """
         Public method to enable a specific rule.
-        
+
         @param offset offset of the rule
         @type int
         @param enabled new enabled state
@@ -637,13 +668,14 @@
         """
         if offset >= len(self.__rules):
             return None
-        
+
         rule = self.__rules[offset]
         rule.setEnabled(enabled)
         self.rulesEnabledChanged.emit()
-        
+
         if rule.isCSSRule():
             from WebBrowser.WebBrowserWindow import WebBrowserWindow
+
             WebBrowserWindow.mainWindow().reloadUserStyleSheet()
-        
+
         return rule

eric ide

mercurial