WebBrowser/SafeBrowsing/SafeBrowsingManager.py

Thu, 10 Aug 2017 13:58:50 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Thu, 10 Aug 2017 13:58:50 +0200
changeset 5842
c3f41b959a65
parent 5839
fe4d62e23908
child 5853
e45a570528a4
permissions
-rw-r--r--

Fine tuned the safe browsing update notifications.

# -*- coding: utf-8 -*-

# Copyright (c) 2017 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the interface for Google Safe Browsing.
"""

#
# Some part of this code were ported from gglsbl.client and adapted
# to Qt.
#
# https://github.com/afilipovich/gglsbl
#

from __future__ import unicode_literals

import os
import base64

from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \
    QUrl, QDateTime, QTimer

import Preferences
import Utilities

import UI.PixmapCache

from .SafeBrowsingAPIClient import SafeBrowsingAPIClient
from .SafeBrowsingCache import SafeBrowsingCache, ThreatList, HashPrefixList
from .SafeBrowsingUrl import SafeBrowsingUrl
from .SafeBrowsingUtilities import toHex


class SafeBrowsingManager(QObject):
    """
    Class implementing the interface for Google Safe Browsing.
    
    @signal progressMessage(message,maximum) emitted to give a message for the
        action about to be performed and the maximum value
    @signal progress(current) emitted to signal the current progress
    """
    progressMessage = pyqtSignal(str, int)
    progress = pyqtSignal(int)
    
    def __init__(self):
        """
        Constructor
        """
        super(SafeBrowsingManager, self).__init__()
        
        self.__apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
        if self.__apiKey:
            self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
                                                     parent=self)
        else:
            self.__apiClient = None
        
        self.__enabled = (
            Preferences.getWebBrowser("SafeBrowsingEnabled") and
            bool(self.__apiKey))
        
        gsbCachePath = os.path.join(
            Utilities.getConfigDir(), "web_browser", "safe_browsing")
        self.__cache = SafeBrowsingCache(gsbCachePath, self)
        
        self.__gsbDialog = None
        self.__setPlatforms()
        
        self.__updatingThreatLists = False
        self.__threatListsUpdateTimer = QTimer(self)
        self.__threatListsUpdateTimer.setSingleShot(True)
        self.__threatListsUpdateTimer.timeout.connect(
            self.__threatListsUpdateTimerTimeout)
        self.__setAutoUpdateThreatLists()
    
    def configurationChanged(self):
        """
        Public method to handle changes of the settings.
        """
        apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
        if apiKey != self.__apiKey:
            self.__apiKey = apiKey
            if self.__apiKey:
                if self.__apiClient:
                    self.__apiClient.setApiKey(self.__apiKey)
                else:
                    self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
                                                             parent=self)
        
        self.__enabled = (
            Preferences.getWebBrowser("SafeBrowsingEnabled") and
            bool(self.__apiKey))
        
        self.__setPlatforms()
        self.__setAutoUpdateThreatLists()
    
    def __setPlatforms(self):
        """
        Private method to set the platforms to be checked against.
        """
        self.__platforms = None
        if Preferences.getWebBrowser("SafeBrowsingFilterPlatform"):
            if Utilities.isWindowsPlatform():
                platform = "windows"
            elif Utilities.isMacPlatform():
                platform = "macos"
            else:
                # treat all other platforms like linux
                platform = "linux"
            self.__platforms = SafeBrowsingAPIClient.getPlatformTypes(platform)
    
    def isEnabled(self):
        """
        Public method to check, if safe browsing is enabled.
        
        @return flag indicating the enabled state
        @rtype bool
        """
        return self.__enabled
    
    def close(self):
        """
        Public method to close the safe browsing interface.
        """
        self.__cache.close()
    
    def fairUseDelayExpired(self):
        """
        Public method to check, if the fair use wait period has expired.
        
        @return flag indicating expiration
        @rtype bool
        """
        return self.__enabled and self.__apiClient.fairUseDelayExpired()
    
    def __showNotificationMessage(self, message, timeout=5):
        """
        Private method to show some message in a notification widget.
        
        If desktop notifications have been disabled, the message will
        be shown in the status bar of the main window (either the main
        web browser window or the eric main window)
        
        @param message message to be shown
        @type str
        @param timeout amount of time in seconds the message should be shown
            (0 = indefinitely)
        @type int
        """
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        
        if WebBrowserWindow.notificationsEnabled():
            WebBrowserWindow.showNotification(
                UI.PixmapCache.getPixmap("safeBrowsing48.png"),
                self.tr("Google Safe Browsing"),
                message,
                timeout=timeout,
            )
        else:
            statusBar = WebBrowserWindow.globalStatusBar()
            if statusBar is not None:
                statusBar.showMessage(message, timeout * 1000)
    
    def __setAutoUpdateThreatLists(self):
        """
        Private method to set auto update for the threat lists.
        """
        autoUpdateEnabled = Preferences.getWebBrowser("SafeBrowsingAutoUpdate")
        if autoUpdateEnabled and self.__enabled:
            nextUpdateDateTime = Preferences.getWebBrowser(
                "SafeBrowsingUpdateDateTime")
            if nextUpdateDateTime.isValid():
                interval = \
                    QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2
                # 2 seconds extra wait time; interval in milliseconds
                
                if interval < 5:
                    interval = 5
                    # minimum 5 seconds interval
            else:
                interval = 5
                # just wait 5 seconds
            self.__threatListsUpdateTimer.start(interval * 1000)
        else:
            if self.__threatListsUpdateTimer.isActive():
                self.__threatListsUpdateTimer.stop()
    
    @pyqtSlot()
    def __threatListsUpdateTimerTimeout(self):
        """
        Private slot to perform the auto update of the threat lists.
        """
        ok = False
        if self.__enabled:
            self.__showNotificationMessage(
                self.tr("Updating threat lists..."), 0)
            ok = self.updateHashPrefixCache()[0]
            if ok:
                self.__showNotificationMessage(
                    self.tr("Updating threat lists done"))
            else:
                self.__showNotificationMessage(
                    self.tr("Updating threat lists failed"))
        
        if ok:
            nextUpdateDateTime = \
                self.__apiClient.getFairUseDelayExpirationDateTime()
            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
                                      nextUpdateDateTime)
            self.__threatListsUpdateTimer.start(
                (QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2) *
                1000)
            # 2 seconds extra wait time; interval in milliseconds
        else:
            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
                                      QDateTime())
    
    def updateHashPrefixCache(self):
        """
        Public method to load or update the locally cached threat lists.
        
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        if not self.__enabled:
            return False, self.tr("Safe Browsing is disabled.")
        
        if not self.__apiClient.fairUseDelayExpired():
            return False, \
                self.tr("The fair use wait period has not expired yet."
                        "Expiration will be at {0}.").format(
                    self.__apiClient.getFairUseDelayExpirationDateTime()
                    .toString("yyyy-MM-dd, HH:mm:ss"))
        
        self.__updatingThreatLists = True
        ok = True
        errorMessage = ""
        
        # step 1: remove expired hashes
        self.__cache.cleanupFullHashes()
        
        # step 2: update threat lists
        threatListsForRemove = {}
        for threatList, clientState in self.__cache.getThreatLists():
            threatListsForRemove[repr(threatList)] = threatList
        threatLists = self.__apiClient.getThreatLists()
        maximum = len(threatLists)
        current = 0
        self.progressMessage.emit(self.tr("Updating threat lists"), maximum)
        for entry in threatLists:
            current += 1
            self.progress.emit(current)
            QCoreApplication.processEvents()
            threatList = ThreatList.fromApiEntry(entry)
            if self.__platforms is None or \
                    threatList.platformType in self.__platforms:
                self.__cache.addThreatList(threatList)
                key = repr(threatList)
                if key in threatListsForRemove:
                    del threatListsForRemove[key]
        maximum = len(threatListsForRemove.values())
        current = 0
        self.progressMessage.emit(self.tr("Deleting obsolete threat lists"),
                                  maximum)
        for threatList in threatListsForRemove.values():
            current += 1
            self.progress.emit(current)
            QCoreApplication.processEvents()
            self.__cache.deleteHashPrefixList(threatList)
            self.__cache.deleteThreatList(threatList)
        del threatListsForRemove
        
        # step 3: update threats
        threatLists = self.__cache.getThreatLists()
        clientStates = {}
        for threatList, clientState in threatLists:
            clientStates[threatList.asTuple()] = clientState
        threatsUpdateResponses = \
            self.__apiClient.getThreatsUpdate(clientStates)
        maximum = len(threatsUpdateResponses)
        current = 0
        self.progressMessage.emit(self.tr("Updating hash prefixes"), maximum)
        for response in threatsUpdateResponses:
            current += 1
            self.progress.emit(current)
            QCoreApplication.processEvents()
            responseThreatList = ThreatList.fromApiEntry(response)
            if response["responseType"] == "FULL_UPDATE":
                self.__cache.deleteHashPrefixList(responseThreatList)
            for removal in response.get("removals", []):
                self.__cache.removeHashPrefixIndices(
                    responseThreatList, removal["rawIndices"]["indices"])
                QCoreApplication.processEvents()
            for addition in response.get("additions", []):
                hashPrefixList = HashPrefixList(
                    addition["rawHashes"]["prefixSize"],
                    base64.b64decode(addition["rawHashes"]["rawHashes"]))
                self.__cache.populateHashPrefixList(responseThreatList,
                                                    hashPrefixList)
                QCoreApplication.processEvents()
            expectedChecksum = base64.b64decode(response["checksum"]["sha256"])
            if self.__verifyThreatListChecksum(responseThreatList,
                                               expectedChecksum):
                self.__cache.updateThreatListClientState(
                    responseThreatList, response["newClientState"])
            else:
                ok = False
                errorMessage = self.tr(
                    "Local cache checksum does not match the server. Consider"
                    " cleaning the cache. Threat update has been aborted.")
        
        self.__updatingThreatLists = False
        
        return ok, errorMessage
    
    def isUpdatingThreatLists(self):
        """
        Public method to check, if we are in the process of updating the
        threat lists.
        
        @return flag indicating an update process is active
        @rtype bool
        """
        return self.__updatingThreatLists
    
    def __verifyThreatListChecksum(self, threatList, remoteChecksum):
        """
        Private method to verify the local checksum of a threat list with the
        checksum of the safe browsing server.
        
        @param threatList threat list to calculate checksum for
        @type ThreatList
        @param remoteChecksum SHA256 checksum as reported by the Google server
        @type bytes
        @return flag indicating equality
        @rtype bool
        """
        localChecksum = self.__cache.hashPrefixListChecksum(threatList)
        return remoteChecksum == localChecksum
    
    def fullCacheCleanup(self):
        """
        Public method to clean up the cache completely.
        """
        self.__cache.prepareCacheDb()
    
    def showSafeBrowsingDialog(self):
        """
        Public slot to show the safe browsing management dialog.
        """
        if self.__gsbDialog is None:
            from WebBrowser.WebBrowserWindow import WebBrowserWindow
            from .SafeBrowsingDialog import SafeBrowsingDialog
            self.__gsbDialog = SafeBrowsingDialog(
                self, parent=WebBrowserWindow.mainWindow())
        
        self.__gsbDialog.show()
    
    def lookupUrl(self, url):
        """
        Public method to lookup an URL.
        
        @param url URL to be checked
        @type str or QUrl
        @return list of threat lists the URL was found in
        @rtype list of ThreatList
        @exception ValueError raised for an invalid URL
        """
        if self.__enabled:
            if isinstance(url, QUrl):
                urlStr = url.toString().strip()
            else:
                urlStr = url.strip()
            
            if not urlStr:
                raise ValueError("Empty URL given.")
            
            urlHashes = SafeBrowsingUrl(urlStr).hashes()
            listNames = self.__lookupHashes(urlHashes)
            if listNames:
                return listNames
        
        return None
    
    def __lookupHashes(self, fullHashes):
        """
        Private method to lookup the given hashes.
        
        @param fullHashes list of hashes to lookup
        @type list of bytes
        @return names of threat lists hashes were found in
        @rtype list of ThreatList
        """
        fullHashes = list(fullHashes)
        cues = [toHex(fh[:4]) for fh in fullHashes]
        result = []
        
        matchingPrefixes = {}
        matchingFullHashes = set()
        isPotentialThreat = False
        # Lookup hash prefixes which match full URL hash
        for threatList, hashPrefix, negativeCacheExpired in \
                self.__cache.lookupHashPrefix(cues):
            for fullHash in fullHashes:
                if fullHash.startswith(hashPrefix):
                    isPotentialThreat = True
                    # consider hash prefix negative cache as expired if it
                    # is expired in at least one threat list
                    matchingPrefixes[hashPrefix] = matchingPrefixes.get(
                        hashPrefix, False) or negativeCacheExpired
                    matchingFullHashes.add(fullHash)
            
        # if none matches, url hash is clear
        if not isPotentialThreat:
            return []
        
        # if there is non-expired full hash, URL is blacklisted
        matchingExpiredThreatLists = set()
        for threatList, hasExpired in self.__cache.lookupFullHashes(
                matchingFullHashes):
            if hasExpired:
                matchingExpiredThreatLists.add(threatList)
            else:
                result.append(threatList)
        if result:
            return result
        
        # If there are no matching expired full hash entries and negative
        # cache is still current for all prefixes, consider it safe.
        if len(matchingExpiredThreatLists) == 0 and \
           sum(map(int, matchingPrefixes.values())) == 0:
            return []
        
        # Now it can be assumed that there are expired matching full hash
        # entries and/or cache prefix entries with expired negative cache.
        # Both require full hash synchronization.
        self.__syncFullHashes(matchingPrefixes.keys())
        
        # Now repeat full hash lookup
        for threatList, hasExpired in self.__cache.lookupFullHashes(
                matchingFullHashes):
            if not hasExpired:
                result.append(threatList)
        
        return result
    
    def __syncFullHashes(self, hashPrefixes):
        """
        Private method to download full hashes matching given prefixes.
        
        This also updates the cache expiration timestamps.
        
        @param hashPrefixes list of hash prefixes to get full hashes for
        @type list of bytes
        """
        threatLists = self.__cache.getThreatLists()
        clientStates = {}
        for threatList, clientState in threatLists:
            clientStates[threatList.asTuple()] = clientState
        
        fullHashResponses = self.__apiClient.getFullHashes(
            hashPrefixes, clientStates)
        
        # update negative cache for each hash prefix
        # store full hash with positive cache bumped up
        for match in fullHashResponses["matches"]:
            threatList = ThreatList.fromApiEntry(match)
            hashValue = base64.b64decode(match["threat"]["hash"])
            cacheDuration = int(match["cacheDuration"].rstrip("s"))
            malwareThreatType = None
            for metadata in match["threatEntryMetadata"].get("entries", []):
                key = base64.b64decode(metadata["key"])
                value = base64.b64decode(metadata["value"])
                if key == b"malware_threat_type":
                    malwareThreatType = value
                    if not isinstance(malwareThreatType, str):
                        malwareThreatType = malwareThreatType.decode()
            self.__cache.storeFullHash(threatList, hashValue, cacheDuration,
                                       malwareThreatType)
        
        negativeCacheDuration = int(
            fullHashResponses["negativeCacheDuration"].rstrip("s"))
        for prefixValue in hashPrefixes:
            for threatList, clientState in threatLists:
                self.__cache.updateHashPrefixExpiration(
                    threatList, prefixValue, negativeCacheDuration)
    
    @classmethod
    def getIgnoreSchemes(cls):
        """
        Class method to get the schemes not to be checked.
        
        @return list of schemes to be ignored
        @rtype list of str
        """
        return [
            "about",
            "eric",
            "qrc",
            "qthelp",
            "chrome",
            "abp",
            "file",
        ]
    
    def getThreatMessage(self, threatType):
        """
        Public method to get a warning message for the given threat type.
        
        @param threatType threat type to get the message for
        @type str
        @return threat message
        @rtype str
        """
        if self.__apiClient:
            msg = self.__apiClient.getThreatMessage(threatType)
        else:
            msg = ""
        
        return msg
    
    def getThreatMessages(self, threatLists):
        """
        Public method to get threat messages for the given threats.
        
        @param threatLists list of threat lists to get a message for
        @type list of ThreatList
        @return list of threat messages, one per unique threat type
        @rtype list of str
        """
        threatTypes = set()
        for threatList in threatLists:
            threatTypes.add(threatList.threatType)
        
        messages = []
        if self.__apiClient:
            for threatType in sorted(threatTypes):
                msg = self.__apiClient.getThreatMessage(threatType)
                messages.append(msg)
        
        return messages
    
    def getThreatType(self, threatList):
        """
        Public method to get a display string for a given threat type.
        
        @param threatList threat list to get display string for
        @type str
        @return display string
        @rtype str
        """
        displayString = ""
        if self.__apiClient:
            displayString = self.__apiClient.getThreatType(
                threatList.threatType)
        return displayString
    
    def getPlatformString(self, platformType):
        """
        Public method to get the platform string for a given platform type.
        
        @param platformType platform type as defined in the v4 API
        @type str
        @return platform string
        @rtype str
        """
        if self.__apiClient:
            return self.__apiClient.getPlatformString(platformType)
        else:
            return ""
    
    def getThreatEntryString(self, threatEntry):
        """
        Public method to get the threat entry string.
        
        @param threatEntry threat entry type as defined in the v4 API
        @type str
        @return threat entry string
        @rtype str
        """
        if self.__apiClient:
            return self.__apiClient.getThreatEntryString(threatEntry)
        else:
            return ""

eric ide

mercurial