eric6/WebBrowser/SafeBrowsing/SafeBrowsingManager.py

changeset 6942
2602857055c5
parent 6645
ad476851d7e0
child 7192
a22eee00b052
diff -r f99d60d6b59b -r 2602857055c5 eric6/WebBrowser/SafeBrowsing/SafeBrowsingManager.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric6/WebBrowser/SafeBrowsing/SafeBrowsingManager.py	Sun Apr 14 15:09:21 2019 +0200
@@ -0,0 +1,619 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2017 - 2019 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the interface for Google Safe Browsing.
+"""
+
+#
+# Some part of this code were ported from gglsbl.client and adapted
+# to Qt.
+#
+# https://github.com/afilipovich/gglsbl
+#
+
+from __future__ import unicode_literals
+
+import os
+import base64
+
+from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \
+    QUrl, QDateTime, QTimer
+
+import Preferences
+import Utilities
+
+import UI.PixmapCache
+
+from .SafeBrowsingAPIClient import SafeBrowsingAPIClient
+from .SafeBrowsingCache import SafeBrowsingCache
+from .SafeBrowsingThreatList import ThreatList, HashPrefixList
+from .SafeBrowsingUrl import SafeBrowsingUrl
+from .SafeBrowsingUtilities import toHex
+
+
+class SafeBrowsingManager(QObject):
+    """
+    Class implementing the interface for Google Safe Browsing.
+    
+    @signal progressMessage(message,maximum) emitted to give a message for the
+        action about to be performed and the maximum value
+    @signal progress(current) emitted to signal the current progress
+    """
+    progressMessage = pyqtSignal(str, int)
+    progress = pyqtSignal(int)
+    
+    enabled = (
+        Preferences.getWebBrowser("SafeBrowsingEnabled") and
+        bool(Preferences.getWebBrowser("SafeBrowsingApiKey"))
+    )
+    
+    def __init__(self):
+        """
+        Constructor
+        """
+        super(SafeBrowsingManager, self).__init__()
+        
+        self.__apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
+        if self.__apiKey:
+            self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
+                                                     parent=self)
+        else:
+            self.__apiClient = None
+        
+        gsbCachePath = os.path.join(
+            Utilities.getConfigDir(), "web_browser", "safe_browsing")
+        self.__cache = SafeBrowsingCache(gsbCachePath, self)
+        
+        self.__gsbDialog = None
+        self.__setPlatforms()
+        self.__setLookupMethod()
+        
+        self.__updatingThreatLists = False
+        self.__threatListsUpdateTimer = QTimer(self)
+        self.__threatListsUpdateTimer.setSingleShot(True)
+        self.__threatListsUpdateTimer.timeout.connect(
+            self.__threatListsUpdateTimerTimeout)
+        self.__setAutoUpdateThreatLists()
+    
+    def configurationChanged(self):
+        """
+        Public method to handle changes of the settings.
+        """
+        apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
+        if apiKey != self.__apiKey:
+            self.__apiKey = apiKey
+            if self.__apiKey:
+                if self.__apiClient:
+                    self.__apiClient.setApiKey(self.__apiKey)
+                else:
+                    self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
+                                                             parent=self)
+        
+        SafeBrowsingManager.enabled = (
+            Preferences.getWebBrowser("SafeBrowsingEnabled") and
+            bool(self.__apiKey))
+        
+        self.__setPlatforms()
+        self.__setLookupMethod()
+        self.__setAutoUpdateThreatLists()
+    
+    def __setPlatforms(self):
+        """
+        Private method to set the platforms to be checked against.
+        """
+        self.__platforms = None
+        if Preferences.getWebBrowser("SafeBrowsingFilterPlatform"):
+            if Utilities.isWindowsPlatform():
+                platform = "windows"
+            elif Utilities.isMacPlatform():
+                platform = "macos"
+            else:
+                # treat all other platforms like linux
+                platform = "linux"
+            self.__platforms = SafeBrowsingAPIClient.getPlatformTypes(platform)
+    
+    def __setLookupMethod(self):
+        """
+        Private method to set the lookup method (Update API or Lookup API).
+        """
+        self.__useLookupApi = Preferences.getWebBrowser(
+            "SafeBrowsingUseLookupApi")
+    
+    @classmethod
+    def isEnabled(cls):
+        """
+        Class method to check, if safe browsing is enabled.
+        
+        @return flag indicating the enabled state
+        @rtype bool
+        """
+        return cls.enabled
+    
+    def close(self):
+        """
+        Public method to close the safe browsing interface.
+        """
+        self.__cache.close()
+    
+    def fairUseDelayExpired(self):
+        """
+        Public method to check, if the fair use wait period has expired.
+        
+        @return flag indicating expiration
+        @rtype bool
+        """
+        return self.isEnabled() and self.__apiClient.fairUseDelayExpired()
+    
+    def __showNotificationMessage(self, message, timeout=5):
+        """
+        Private method to show some message in a notification widget.
+        
+        If desktop notifications have been disabled, the message will
+        be shown in the status bar of the main window (either the main
+        web browser window or the eric main window)
+        
+        @param message message to be shown
+        @type str
+        @param timeout amount of time in seconds the message should be shown
+            (0 = indefinitely)
+        @type int
+        """
+        from WebBrowser.WebBrowserWindow import WebBrowserWindow
+        
+        if WebBrowserWindow.notificationsEnabled():
+            WebBrowserWindow.showNotification(
+                UI.PixmapCache.getPixmap("safeBrowsing48.png"),
+                self.tr("Google Safe Browsing"),
+                message,
+                timeout=timeout,
+            )
+        else:
+            statusBar = WebBrowserWindow.globalStatusBar()
+            if statusBar is not None:
+                statusBar.showMessage(message, timeout * 1000)
+    
+    def __setAutoUpdateThreatLists(self):
+        """
+        Private method to set auto update for the threat lists.
+        """
+        autoUpdateEnabled = \
+            Preferences.getWebBrowser("SafeBrowsingAutoUpdate") and \
+            not Preferences.getWebBrowser("SafeBrowsingUseLookupApi")
+        if autoUpdateEnabled and self.isEnabled():
+            nextUpdateDateTime = Preferences.getWebBrowser(
+                "SafeBrowsingUpdateDateTime")
+            if nextUpdateDateTime.isValid():
+                interval = \
+                    QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2
+                # 2 seconds extra wait time; interval in milliseconds
+                
+                if interval < 5:
+                    interval = 5
+                    # minimum 5 seconds interval
+            else:
+                interval = 5
+                # just wait 5 seconds
+            self.__threatListsUpdateTimer.start(interval * 1000)
+        else:
+            if self.__threatListsUpdateTimer.isActive():
+                self.__threatListsUpdateTimer.stop()
+    
+    @pyqtSlot()
+    def __threatListsUpdateTimerTimeout(self):
+        """
+        Private slot to perform the auto update of the threat lists.
+        """
+        ok = False
+        if self.isEnabled():
+            self.__showNotificationMessage(
+                self.tr("Updating threat lists..."), 0)
+            ok = self.updateHashPrefixCache()[0]
+            if ok:
+                self.__showNotificationMessage(
+                    self.tr("Updating threat lists done"))
+            else:
+                self.__showNotificationMessage(
+                    self.tr("Updating threat lists failed"))
+        
+        if ok:
+            nextUpdateDateTime = \
+                self.__apiClient.getFairUseDelayExpirationDateTime()
+            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
+                                      nextUpdateDateTime)
+            self.__threatListsUpdateTimer.start(
+                (QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2) *
+                1000)
+            # 2 seconds extra wait time; interval in milliseconds
+        else:
+            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
+                                      QDateTime())
+    
+    def updateHashPrefixCache(self):
+        """
+        Public method to load or update the locally cached threat lists.
+        
+        @return flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if not self.isEnabled():
+            return False, self.tr("Safe Browsing is disabled.")
+        
+        if not self.__apiClient.fairUseDelayExpired():
+            return False, \
+                self.tr("The fair use wait period has not expired yet."
+                        "Expiration will be at {0}.").format(
+                    self.__apiClient.getFairUseDelayExpirationDateTime()
+                    .toString("yyyy-MM-dd, HH:mm:ss"))
+        
+        self.__updatingThreatLists = True
+        ok = True
+        errorMessage = ""
+        
+        # step 1: remove expired hashes
+        self.__cache.cleanupFullHashes()
+        QCoreApplication.processEvents()
+        
+        # step 2: update threat lists
+        threatListsForRemove = {}
+        for threatList, _clientState in self.__cache.getThreatLists():
+            threatListsForRemove[repr(threatList)] = threatList
+        threatLists, error = self.__apiClient.getThreatLists()
+        if error:
+            return False, error
+        
+        maximum = len(threatLists)
+        current = 0
+        self.progressMessage.emit(self.tr("Updating threat lists"), maximum)
+        for entry in threatLists:
+            current += 1
+            self.progress.emit(current)
+            QCoreApplication.processEvents()
+            threatList = ThreatList.fromApiEntry(entry)
+            if self.__platforms is None or \
+                    threatList.platformType in self.__platforms:
+                self.__cache.addThreatList(threatList)
+                key = repr(threatList)
+                if key in threatListsForRemove:
+                    del threatListsForRemove[key]
+        maximum = len(threatListsForRemove.values())
+        current = 0
+        self.progressMessage.emit(self.tr("Deleting obsolete threat lists"),
+                                  maximum)
+        for threatList in threatListsForRemove.values():
+            current += 1
+            self.progress.emit(current)
+            QCoreApplication.processEvents()
+            self.__cache.deleteHashPrefixList(threatList)
+            self.__cache.deleteThreatList(threatList)
+        del threatListsForRemove
+        
+        # step 3: update threats
+        threatLists = self.__cache.getThreatLists()
+        clientStates = {}
+        for threatList, clientState in threatLists:
+            clientStates[threatList.asTuple()] = clientState
+        threatsUpdateResponses, error = \
+            self.__apiClient.getThreatsUpdate(clientStates)
+        if error:
+            return False, error
+        
+        maximum = len(threatsUpdateResponses)
+        current = 0
+        self.progressMessage.emit(self.tr("Updating hash prefixes"), maximum)
+        for response in threatsUpdateResponses:
+            current += 1
+            self.progress.emit(current)
+            QCoreApplication.processEvents()
+            responseThreatList = ThreatList.fromApiEntry(response)
+            if response["responseType"] == "FULL_UPDATE":
+                self.__cache.deleteHashPrefixList(responseThreatList)
+            for removal in response.get("removals", []):
+                self.__cache.removeHashPrefixIndices(
+                    responseThreatList, removal["rawIndices"]["indices"])
+                QCoreApplication.processEvents()
+            for addition in response.get("additions", []):
+                hashPrefixList = HashPrefixList(
+                    addition["rawHashes"]["prefixSize"],
+                    base64.b64decode(addition["rawHashes"]["rawHashes"]))
+                self.__cache.populateHashPrefixList(responseThreatList,
+                                                    hashPrefixList)
+                QCoreApplication.processEvents()
+            expectedChecksum = base64.b64decode(response["checksum"]["sha256"])
+            if self.__verifyThreatListChecksum(responseThreatList,
+                                               expectedChecksum):
+                self.__cache.updateThreatListClientState(
+                    responseThreatList, response["newClientState"])
+            else:
+                ok = False
+                errorMessage = self.tr(
+                    "Local cache checksum does not match the server. Consider"
+                    " cleaning the cache. Threat update has been aborted.")
+        
+        self.__updatingThreatLists = False
+        
+        return ok, errorMessage
+    
+    def isUpdatingThreatLists(self):
+        """
+        Public method to check, if we are in the process of updating the
+        threat lists.
+        
+        @return flag indicating an update process is active
+        @rtype bool
+        """
+        return self.__updatingThreatLists
+    
+    def __verifyThreatListChecksum(self, threatList, remoteChecksum):
+        """
+        Private method to verify the local checksum of a threat list with the
+        checksum of the safe browsing server.
+        
+        @param threatList threat list to calculate checksum for
+        @type ThreatList
+        @param remoteChecksum SHA256 checksum as reported by the Google server
+        @type bytes
+        @return flag indicating equality
+        @rtype bool
+        """
+        localChecksum = self.__cache.hashPrefixListChecksum(threatList)
+        return remoteChecksum == localChecksum
+    
+    def fullCacheCleanup(self):
+        """
+        Public method to clean up the cache completely.
+        """
+        self.__cache.prepareCacheDb()
+    
+    def showSafeBrowsingDialog(self):
+        """
+        Public slot to show the safe browsing management dialog.
+        """
+        if self.__gsbDialog is None:
+            from WebBrowser.WebBrowserWindow import WebBrowserWindow
+            from .SafeBrowsingDialog import SafeBrowsingDialog
+            self.__gsbDialog = SafeBrowsingDialog(
+                self, parent=WebBrowserWindow.mainWindow())
+        
+        self.__gsbDialog.show()
+    
+    def lookupUrl(self, url):
+        """
+        Public method to lookup an URL.
+        
+        @param url URL to be checked
+        @type str or QUrl
+        @return tuple containing the list of threat lists the URL was found in
+            and an error message
+        @rtype tuple of (list of ThreatList, str)
+        @exception ValueError raised for an invalid URL
+        """
+        if self.isEnabled():
+            if self.__useLookupApi:
+                if isinstance(url, str):
+                    url = QUrl(url.strip())
+                
+                if url.isEmpty():
+                    raise ValueError("Empty URL given.")
+                
+                listNames, error = \
+                    self.__apiClient.lookupUrl(url, self.__platforms)
+                return listNames, error
+            else:
+                if isinstance(url, QUrl):
+                    urlStr = url.toString().strip()
+                else:
+                    urlStr = url.strip()
+                
+                if not urlStr:
+                    raise ValueError("Empty URL given.")
+                
+                urlHashes = SafeBrowsingUrl(urlStr).hashes()
+                listNames = self.__lookupHashes(urlHashes)
+                
+                return listNames, ""
+        
+        return None, ""
+    
+    def __lookupHashes(self, fullHashes):
+        """
+        Private method to lookup the given hashes.
+        
+        @param fullHashes list of hashes to lookup
+        @type list of bytes
+        @return names of threat lists hashes were found in
+        @rtype list of ThreatList
+        """
+        fullHashes = list(fullHashes)
+        cues = [toHex(fh[:4]) for fh in fullHashes]
+        result = []
+        
+        matchingPrefixes = {}
+        matchingFullHashes = set()
+        isPotentialThreat = False
+        # Lookup hash prefixes which match full URL hash
+        for _threatList, hashPrefix, negativeCacheExpired in \
+                self.__cache.lookupHashPrefix(cues):
+            for fullHash in fullHashes:
+                if fullHash.startswith(hashPrefix):
+                    isPotentialThreat = True
+                    # consider hash prefix negative cache as expired if it
+                    # is expired in at least one threat list
+                    matchingPrefixes[hashPrefix] = matchingPrefixes.get(
+                        hashPrefix, False) or negativeCacheExpired
+                    matchingFullHashes.add(fullHash)
+            
+        # if none matches, url hash is clear
+        if not isPotentialThreat:
+            return []
+        
+        # if there is non-expired full hash, URL is blacklisted
+        matchingExpiredThreatLists = set()
+        for threatList, hasExpired in self.__cache.lookupFullHashes(
+                matchingFullHashes):
+            if hasExpired:
+                matchingExpiredThreatLists.add(threatList)
+            else:
+                result.append(threatList)
+        if result:
+            return result
+        
+        # If there are no matching expired full hash entries and negative
+        # cache is still current for all prefixes, consider it safe.
+        if len(matchingExpiredThreatLists) == 0 and \
+           sum(map(int, matchingPrefixes.values())) == 0:
+            return []
+        
+        # Now it can be assumed that there are expired matching full hash
+        # entries and/or cache prefix entries with expired negative cache.
+        # Both require full hash synchronization.
+        self.__syncFullHashes(matchingPrefixes.keys())
+        
+        # Now repeat full hash lookup
+        for threatList, hasExpired in self.__cache.lookupFullHashes(
+                matchingFullHashes):
+            if not hasExpired:
+                result.append(threatList)
+        
+        return result
+    
+    def __syncFullHashes(self, hashPrefixes):
+        """
+        Private method to download full hashes matching given prefixes.
+        
+        This also updates the cache expiration timestamps.
+        
+        @param hashPrefixes list of hash prefixes to get full hashes for
+        @type list of bytes
+        """
+        threatLists = self.__cache.getThreatLists()
+        clientStates = {}
+        for threatList, clientState in threatLists:
+            clientStates[threatList.asTuple()] = clientState
+        
+        fullHashResponses = self.__apiClient.getFullHashes(
+            hashPrefixes, clientStates)
+        
+        # update negative cache for each hash prefix
+        # store full hash with positive cache bumped up
+        for match in fullHashResponses["matches"]:
+            threatList = ThreatList.fromApiEntry(match)
+            hashValue = base64.b64decode(match["threat"]["hash"])
+            cacheDuration = int(match["cacheDuration"].rstrip("s"))
+            malwareThreatType = None
+            for metadata in match["threatEntryMetadata"].get("entries", []):
+                key = base64.b64decode(metadata["key"])
+                value = base64.b64decode(metadata["value"])
+                if key == b"malware_threat_type":
+                    malwareThreatType = value
+                    if not isinstance(malwareThreatType, str):
+                        malwareThreatType = malwareThreatType.decode()
+            self.__cache.storeFullHash(threatList, hashValue, cacheDuration,
+                                       malwareThreatType)
+        
+        negativeCacheDuration = int(
+            fullHashResponses["negativeCacheDuration"].rstrip("s"))
+        for prefixValue in hashPrefixes:
+            for threatList, _clientState in threatLists:
+                self.__cache.updateHashPrefixExpiration(
+                    threatList, prefixValue, negativeCacheDuration)
+    
+    @classmethod
+    def getIgnoreSchemes(cls):
+        """
+        Class method to get the schemes not to be checked.
+        
+        @return list of schemes to be ignored
+        @rtype list of str
+        """
+        return [
+            "about",
+            "eric",
+            "qrc",
+            "qthelp",
+            "chrome",
+            "abp",
+            "file",
+        ]
+    
+    def getThreatMessage(self, threatType):
+        """
+        Public method to get a warning message for the given threat type.
+        
+        @param threatType threat type to get the message for
+        @type str
+        @return threat message
+        @rtype str
+        """
+        if self.__apiClient:
+            msg = self.__apiClient.getThreatMessage(threatType)
+        else:
+            msg = ""
+        
+        return msg
+    
+    def getThreatMessages(self, threatLists):
+        """
+        Public method to get threat messages for the given threats.
+        
+        @param threatLists list of threat lists to get a message for
+        @type list of ThreatList
+        @return list of threat messages, one per unique threat type
+        @rtype list of str
+        """
+        threatTypes = set()
+        for threatList in threatLists:
+            threatTypes.add(threatList.threatType)
+        
+        messages = []
+        if self.__apiClient:
+            for threatType in sorted(threatTypes):
+                msg = self.__apiClient.getThreatMessage(threatType)
+                messages.append(msg)
+        
+        return messages
+    
+    def getThreatType(self, threatList):
+        """
+        Public method to get a display string for a given threat type.
+        
+        @param threatList threat list to get display string for
+        @type str
+        @return display string
+        @rtype str
+        """
+        displayString = ""
+        if self.__apiClient:
+            displayString = self.__apiClient.getThreatType(
+                threatList.threatType)
+        return displayString
+    
+    def getPlatformString(self, platformType):
+        """
+        Public method to get the platform string for a given platform type.
+        
+        @param platformType platform type as defined in the v4 API
+        @type str
+        @return platform string
+        @rtype str
+        """
+        if self.__apiClient:
+            return self.__apiClient.getPlatformString(platformType)
+        else:
+            return ""
+    
+    def getThreatEntryString(self, threatEntry):
+        """
+        Public method to get the threat entry string.
+        
+        @param threatEntry threat entry type as defined in the v4 API
+        @type str
+        @return threat entry string
+        @rtype str
+        """
+        if self.__apiClient:
+            return self.__apiClient.getThreatEntryString(threatEntry)
+        else:
+            return ""

eric ide

mercurial