Tue, 08 Aug 2017 17:20:28 +0200
Added an auto-update feature to the safe browsing manager of the new web browser.
# -*- coding: utf-8 -*-

# Copyright (c) 2017 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the interface for Google Safe Browsing.
"""

#
# Some parts of this code were ported from gglsbl.client and adapted
# to Qt.
#
# https://github.com/afilipovich/gglsbl
#

from __future__ import unicode_literals

import os
import base64

from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \
    QUrl, QDateTime, QTimer

import Preferences
import Utilities

from .SafeBrowsingAPIClient import SafeBrowsingAPIClient
from .SafeBrowsingCache import SafeBrowsingCache, ThreatList, HashPrefixList
from .SafeBrowsingUrl import SafeBrowsingUrl
from .SafeBrowsingUtilities import toHex


class SafeBrowsingManager(QObject):
    """
    Class implementing the interface for Google Safe Browsing.
    
    @signal progressMessage(message,maximum) emitted to give a message for the
        action about to be performed and the maximum value
    @signal progress(current) emitted to signal the current progress
    """
    progressMessage = pyqtSignal(str, int)
    progress = pyqtSignal(int)
    
    def __init__(self):
        """
        Constructor
        """
        super(SafeBrowsingManager, self).__init__()
        
        self.__apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
        if self.__apiKey:
            self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
                                                     parent=self)
        else:
            self.__apiClient = None
        
        # safe browsing is only usable with an API key
        self.__enabled = (
            Preferences.getWebBrowser("SafeBrowsingEnabled") and
            bool(self.__apiKey))
        
        gsbCachePath = os.path.join(
            Utilities.getConfigDir(), "web_browser", "safe_browsing")
        self.__cache = SafeBrowsingCache(gsbCachePath, self)
        
        self.__gsbDialog = None
        self.__setPlatforms()
        
        self.__updatingThreatLists = False
        self.__threatListsUpdateTimer = QTimer(self)
        self.__threatListsUpdateTimer.setSingleShot(True)
        self.__threatListsUpdateTimer.timeout.connect(
            self.__threatListsUpdateTimerTimeout)
        self.__setAutoUpdateThreatLists()
    
    def configurationChanged(self):
        """
        Public method to handle changes of the settings.
        """
        apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey")
        if apiKey != self.__apiKey:
            self.__apiKey = apiKey
            if self.__apiKey:
                if self.__apiClient:
                    self.__apiClient.setApiKey(self.__apiKey)
                else:
                    self.__apiClient = SafeBrowsingAPIClient(self.__apiKey,
                                                             parent=self)
        
        self.__enabled = (
            Preferences.getWebBrowser("SafeBrowsingEnabled") and
            bool(self.__apiKey))
        
        self.__setPlatforms()
        self.__setAutoUpdateThreatLists()
    
    def __setPlatforms(self):
        """
        Private method to set the platforms to be checked against.
        """
        # None means "do not filter by platform"
        self.__platforms = None
        if Preferences.getWebBrowser("SafeBrowsingFilterPlatform"):
            if Utilities.isWindowsPlatform():
                platform = "windows"
            elif Utilities.isMacPlatform():
                platform = "macos"
            else:
                # treat all other platforms like linux
                platform = "linux"
            self.__platforms = SafeBrowsingAPIClient.getPlatformTypes(platform)
    
    def isEnabled(self):
        """
        Public method to check, if safe browsing is enabled.
        
        @return flag indicating the enabled state
        @rtype bool
        """
        return self.__enabled
    
    def close(self):
        """
        Public method to close the safe browsing interface.
        """
        self.__cache.close()
    
    def fairUseDelayExpired(self):
        """
        Public method to check, if the fair use wait period has expired.
        
        @return flag indicating expiration
        @rtype bool
        """
        return self.__enabled and self.__apiClient.fairUseDelayExpired()
    
    def __showStatusBarMessage(self, message):
        """
        Private method to show some message in the main window status bar.
        
        @param message message to be shown
        @type str
        """
        # imported locally as it is done elsewhere in this class
        from WebBrowser.WebBrowserWindow import WebBrowserWindow
        
        WebBrowserWindow.mainWindow().statusBar().showMessage(message, 5000)
    
    def __setAutoUpdateThreatLists(self):
        """
        Private method to set auto update for the threat lists.
        """
        autoUpdateEnabled = Preferences.getWebBrowser("SafeBrowsingAutoUpdate")
        if autoUpdateEnabled and self.__enabled:
            nextUpdateDateTime = Preferences.getWebBrowser(
                "SafeBrowsingUpdateDateTime")
            if nextUpdateDateTime.isValid():
                # interval in seconds incl. 2 seconds extra wait time,
                # but at least 5 seconds
                interval = max(
                    QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2,
                    5)
            else:
                # no stored update time; just wait 5 seconds
                interval = 5
            # the timer expects milliseconds
            self.__threatListsUpdateTimer.start(interval * 1000)
        else:
            if self.__threatListsUpdateTimer.isActive():
                self.__threatListsUpdateTimer.stop()
    
    @pyqtSlot()
    def __threatListsUpdateTimerTimeout(self):
        """
        Private slot to perform the auto update of the threat lists.
        """
        ok = False
        if self.__enabled:
            self.__showStatusBarMessage(self.tr("Updating threat lists..."))
            ok = self.updateHashPrefixCache()[0]
            if ok:
                self.__showStatusBarMessage(
                    self.tr("Updating threat lists done"))
            else:
                self.__showStatusBarMessage(
                    self.tr("Updating threat lists failed"))
        
        if ok:
            nextUpdateDateTime = \
                self.__apiClient.getFairUseDelayExpirationDateTime()
            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
                                      nextUpdateDateTime)
            # interval in seconds incl. 2 seconds extra wait time; use the
            # same 5 seconds minimum as __setAutoUpdateThreatLists() so the
            # timer is never started with a tiny or negative interval
            interval = max(
                QDateTime.currentDateTime().secsTo(nextUpdateDateTime) + 2,
                5)
            self.__threatListsUpdateTimer.start(interval * 1000)
        else:
            # forget the stored update time; the timer is not restarted here
            Preferences.setWebBrowser("SafeBrowsingUpdateDateTime",
                                      QDateTime())
    
    def updateHashPrefixCache(self):
        """
        Public method to load or update the locally cached threat lists.
        
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        if not self.__enabled:
            return False, self.tr("Safe Browsing is disabled.")
        
        if not self.__apiClient.fairUseDelayExpired():
            return False, \
                self.tr("The fair use wait period has not expired yet."
                        " Expiration will be at {0}.").format(
                    self.__apiClient.getFairUseDelayExpirationDateTime()
                    .toString("yyyy-MM-dd, HH:mm:ss"))
        
        self.__updatingThreatLists = True
        try:
            # step 1: remove expired hashes
            self.__cache.cleanupFullHashes()
            
            # step 2: update threat lists
            self.__updateThreatLists()
            
            # step 3: update threats
            ok, errorMessage = self.__updateHashPrefixes()
        finally:
            # always reset the flag, even if the cache or the API client
            # raised an exception
            self.__updatingThreatLists = False
        
        return ok, errorMessage
    
    def __updateThreatLists(self):
        """
        Private method to synchronize the cached threat lists with the lists
        reported by the API client and to delete obsolete ones.
        """
        # start with all cached lists as removal candidates
        threatListsForRemove = {}
        for threatList, _clientState in self.__cache.getThreatLists():
            threatListsForRemove[repr(threatList)] = threatList
        
        threatLists = self.__apiClient.getThreatLists()
        self.progressMessage.emit(self.tr("Updating threat lists"),
                                  len(threatLists))
        for current, entry in enumerate(threatLists, start=1):
            self.progress.emit(current)
            QCoreApplication.processEvents()
            threatList = ThreatList.fromApiEntry(entry)
            if self.__platforms is None or \
                    threatList.platformType in self.__platforms:
                self.__cache.addThreatList(threatList)
                # the list is still in use; do not remove it
                threatListsForRemove.pop(repr(threatList), None)
        
        self.progressMessage.emit(self.tr("Deleting obsolete threat lists"),
                                  len(threatListsForRemove))
        for current, threatList in enumerate(threatListsForRemove.values(),
                                             start=1):
            self.progress.emit(current)
            QCoreApplication.processEvents()
            self.__cache.deleteHashPrefixList(threatList)
            self.__cache.deleteThreatList(threatList)
    
    def __updateHashPrefixes(self):
        """
        Private method to update the cached hash prefixes of all cached
        threat lists.
        
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        ok = True
        errorMessage = ""
        
        clientStates = {
            threatList.asTuple(): clientState
            for threatList, clientState in self.__cache.getThreatLists()
        }
        threatsUpdateResponses = \
            self.__apiClient.getThreatsUpdate(clientStates)
        
        self.progressMessage.emit(self.tr("Updating hash prefixes"),
                                  len(threatsUpdateResponses))
        for current, response in enumerate(threatsUpdateResponses, start=1):
            self.progress.emit(current)
            QCoreApplication.processEvents()
            responseThreatList = ThreatList.fromApiEntry(response)
            if response["responseType"] == "FULL_UPDATE":
                # a full update replaces the complete prefix list
                self.__cache.deleteHashPrefixList(responseThreatList)
            for removal in response.get("removals", []):
                self.__cache.removeHashPrefixIndices(
                    responseThreatList, removal["rawIndices"]["indices"])
                QCoreApplication.processEvents()
            for addition in response.get("additions", []):
                hashPrefixList = HashPrefixList(
                    addition["rawHashes"]["prefixSize"],
                    base64.b64decode(addition["rawHashes"]["rawHashes"]))
                self.__cache.populateHashPrefixList(responseThreatList,
                                                    hashPrefixList)
                QCoreApplication.processEvents()
            expectedChecksum = base64.b64decode(response["checksum"]["sha256"])
            if self.__verifyThreatListChecksum(responseThreatList,
                                               expectedChecksum):
                self.__cache.updateThreatListClientState(
                    responseThreatList, response["newClientState"])
            else:
                # NOTE(review): the remaining responses are still processed
                # although the message talks about an abort; only the client
                # state of the mismatching list is left untouched
                ok = False
                errorMessage = self.tr(
                    "Local cache checksum does not match the server. Consider"
                    " cleaning the cache. Threat update has been aborted.")
        
        return ok, errorMessage
    
    def isUpdatingThreatLists(self):
        """
        Public method to check, if we are in the process of updating the
        threat lists.
        
        @return flag indicating an update process is active
        @rtype bool
        """
        return self.__updatingThreatLists
    
    def __verifyThreatListChecksum(self, threatList, remoteChecksum):
        """
        Private method to verify the local checksum of a threat list with the
        checksum of the safe browsing server.
        
        @param threatList threat list to calculate checksum for
        @type ThreatList
        @param remoteChecksum SHA256 checksum as reported by the Google server
        @type bytes
        @return flag indicating equality
        @rtype bool
        """
        localChecksum = self.__cache.hashPrefixListChecksum(threatList)
        return remoteChecksum == localChecksum
    
    def fullCacheCleanup(self):
        """
        Public method to clean up the cache completely.
        """
        self.__cache.prepareCacheDb()
    
    def showSafeBrowsingDialog(self):
        """
        Public slot to show the safe browsing management dialog.
        """
        if self.__gsbDialog is None:
            # create the dialog on first use only
            from WebBrowser.WebBrowserWindow import WebBrowserWindow
            from .SafeBrowsingDialog import SafeBrowsingDialog
            self.__gsbDialog = SafeBrowsingDialog(
                self, parent=WebBrowserWindow.mainWindow())
        
        self.__gsbDialog.show()
    
    def lookupUrl(self, url):
        """
        Public method to lookup an URL.
        
        @param url URL to be checked
        @type str or QUrl
        @return list of threat lists the URL was found in or None, if safe
            browsing is disabled or the URL was not found in any list
        @rtype list of ThreatList or None
        @exception ValueError raised for an invalid URL
        """
        if self.__enabled:
            if isinstance(url, QUrl):
                urlStr = url.toString().strip()
            else:
                urlStr = url.strip()
            
            if not urlStr:
                raise ValueError("Empty URL given.")
            
            urlHashes = SafeBrowsingUrl(urlStr).hashes()
            listNames = self.__lookupHashes(urlHashes)
            if listNames:
                return listNames
        
        return None
    
    def __lookupHashes(self, fullHashes):
        """
        Private method to lookup the given hashes.
        
        @param fullHashes list of hashes to lookup
        @type list of bytes
        @return names of threat lists hashes were found in
        @rtype list of ThreatList
        """
        fullHashes = list(fullHashes)
        # the first four bytes of each hash are used for the prefix lookup
        cues = [toHex(fh[:4]) for fh in fullHashes]
        result = []
        
        matchingPrefixes = {}
        matchingFullHashes = set()
        isPotentialThreat = False
        # Lookup hash prefixes which match full URL hash
        for _threatList, hashPrefix, negativeCacheExpired in \
                self.__cache.lookupHashPrefix(cues):
            for fullHash in fullHashes:
                if fullHash.startswith(hashPrefix):
                    isPotentialThreat = True
                    # consider hash prefix negative cache as expired if it
                    # is expired in at least one threat list
                    matchingPrefixes[hashPrefix] = matchingPrefixes.get(
                        hashPrefix, False) or negativeCacheExpired
                    matchingFullHashes.add(fullHash)
        
        # if none matches, url hash is clear
        if not isPotentialThreat:
            return []
        
        # if there is non-expired full hash, URL is blacklisted
        matchingExpiredThreatLists = set()
        for threatList, hasExpired in self.__cache.lookupFullHashes(
                matchingFullHashes):
            if hasExpired:
                matchingExpiredThreatLists.add(threatList)
            else:
                result.append(threatList)
        if result:
            return result
        
        # If there are no matching expired full hash entries and negative
        # cache is still current for all prefixes, consider it safe.
        if not matchingExpiredThreatLists and \
                not any(matchingPrefixes.values()):
            return []
        
        # Now it can be assumed that there are expired matching full hash
        # entries and/or cache prefix entries with expired negative cache.
        # Both require full hash synchronization.
        self.__syncFullHashes(list(matchingPrefixes))
        
        # Now repeat full hash lookup
        for threatList, hasExpired in self.__cache.lookupFullHashes(
                matchingFullHashes):
            if not hasExpired:
                result.append(threatList)
        
        return result
    
    def __syncFullHashes(self, hashPrefixes):
        """
        Private method to download full hashes matching given prefixes.
        
        This also updates the cache expiration timestamps.
        
        @param hashPrefixes list of hash prefixes to get full hashes for
        @type list of bytes
        """
        threatLists = self.__cache.getThreatLists()
        clientStates = {}
        for threatList, clientState in threatLists:
            clientStates[threatList.asTuple()] = clientState
        
        fullHashResponses = self.__apiClient.getFullHashes(
            hashPrefixes, clientStates)
        
        # update negative cache for each hash prefix
        # store full hash with positive cache bumped up
        # ("matches" and "threatEntryMetadata" are read defensively because
        # a response without them must not abort the synchronization)
        for match in fullHashResponses.get("matches", []):
            threatList = ThreatList.fromApiEntry(match)
            hashValue = base64.b64decode(match["threat"]["hash"])
            # durations are strings with a trailing "s", which is stripped
            cacheDuration = int(match["cacheDuration"].rstrip("s"))
            malwareThreatType = None
            metadataEntries = \
                match.get("threatEntryMetadata", {}).get("entries", [])
            for metadata in metadataEntries:
                key = base64.b64decode(metadata["key"])
                value = base64.b64decode(metadata["value"])
                if key == b"malware_threat_type":
                    malwareThreatType = value
                    if not isinstance(malwareThreatType, str):
                        malwareThreatType = malwareThreatType.decode()
            self.__cache.storeFullHash(threatList, hashValue, cacheDuration,
                                       malwareThreatType)
        
        negativeCacheDuration = int(
            fullHashResponses["negativeCacheDuration"].rstrip("s"))
        for prefixValue in hashPrefixes:
            for threatList, _clientState in threatLists:
                self.__cache.updateHashPrefixExpiration(
                    threatList, prefixValue, negativeCacheDuration)
    
    @classmethod
    def getIgnoreSchemes(cls):
        """
        Class method to get the schemes not to be checked.
        
        @return list of schemes to be ignored
        @rtype list of str
        """
        return [
            "about",
            "eric",
            "qrc",
            "qthelp",
            "chrome",
            "abp",
            "file",
        ]
    
    def getThreatMessage(self, threatType):
        """
        Public method to get a warning message for the given threat type.
        
        @param threatType threat type to get the message for
        @type str
        @return threat message (empty, if no API client is available)
        @rtype str
        """
        if self.__apiClient:
            msg = self.__apiClient.getThreatMessage(threatType)
        else:
            msg = ""
        
        return msg
    
    def getThreatMessages(self, threatLists):
        """
        Public method to get threat messages for the given threats.
        
        @param threatLists list of threat lists to get a message for
        @type list of ThreatList
        @return list of threat messages, one per unique threat type
        @rtype list of str
        """
        threatTypes = set()
        for threatList in threatLists:
            threatTypes.add(threatList.threatType)
        
        messages = []
        if self.__apiClient:
            # sorted to get a stable message order
            for threatType in sorted(threatTypes):
                msg = self.__apiClient.getThreatMessage(threatType)
                messages.append(msg)
        
        return messages
    
    def getThreatType(self, threatList):
        """
        Public method to get a display string for a given threat type.
        
        @param threatList threat list to get display string for
        @type ThreatList
        @return display string (empty, if no API client is available)
        @rtype str
        """
        displayString = ""
        if self.__apiClient:
            displayString = self.__apiClient.getThreatType(
                threatList.threatType)
        return displayString
    
    def getPlatformString(self, platformType):
        """
        Public method to get the platform string for a given platform type.
        
        @param platformType platform type as defined in the v4 API
        @type str
        @return platform string (empty, if no API client is available)
        @rtype str
        """
        if self.__apiClient:
            return self.__apiClient.getPlatformString(platformType)
        else:
            return ""
    
    def getThreatEntryString(self, threatEntry):
        """
        Public method to get the threat entry string.
        
        @param threatEntry threat entry type as defined in the v4 API
        @type str
        @return threat entry string (empty, if no API client is available)
        @rtype str
        """
        if self.__apiClient:
            return self.__apiClient.getThreatEntryString(threatEntry)
        else:
            return ""