Fri, 04 Aug 2017 18:38:45 +0200
Finished coding the safe browsing module of the new web browser.
# -*- coding: utf-8 -*- # Copyright (c) 2017 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the interface for Google Safe Browsing. """ # # Some part of this code were ported from gglsbl.client and adapted # to Qt. # # https://github.com/afilipovich/gglsbl # from __future__ import unicode_literals import os import base64 from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication, QUrl import Preferences import Utilities from .SafeBrowsingAPIClient import SafeBrowsingAPIClient from .SafeBrowsingCache import SafeBrowsingCache, ThreatList, HashPrefixList from .SafeBrowsingUrl import SafeBrowsingUrl from .SafeBrowsingUtilities import toHex class SafeBrowsingManager(QObject): """ Class implementing the interface for Google Safe Browsing. @signal progressMessage(message,maximum) emitted to give a message for the action about to be performed and the maximum value @signal progress(current) emitted to signal the current progress """ progressMessage = pyqtSignal(str, int) progress = pyqtSignal(int) def __init__(self): """ Constructor """ super(SafeBrowsingManager, self).__init__() self.__apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey") if self.__apiKey: self.__apiClient = SafeBrowsingAPIClient(self.__apiKey, parent=self) # TODO: switch these after debugging is finished ## self.__apiClient = SafeBrowsingAPIClient(self.__apiKey, ## parent=self, ## fairUse=False) else: self.__apiClient = None self.__enabled = ( Preferences.getWebBrowser("SafeBrowsingEnabled") and bool(self.__apiKey)) gsbCachePath = os.path.join( Utilities.getConfigDir(), "web_browser", "safe_browsing") self.__cache = SafeBrowsingCache(gsbCachePath, self) self.__gsbDialog = None self.__setPlatforms() def configurationChanged(self): """ Public method to handle changes of the settings. """ apiKey = Preferences.getWebBrowser("SafeBrowsingApiKey") if apiKey != self.__apiKey: self.__apiKey = apiKey if self.__apiKey: if self.__apiClient: self.__apiClient.setApiKey(self.__apiKey) else: self.__apiClient = SafeBrowsingAPIClient(self.__apiKey, parent=self) # TODO: switch these after debugging is finished ## self.__apiClient = SafeBrowsingAPIClient(self.__apiKey, ## parent=self, ## fairUse=False) self.__enabled = ( Preferences.getWebBrowser("SafeBrowsingEnabled") and bool(self.__apiKey)) self.__setPlatforms() def __setPlatforms(self): """ Private method to set the platforms to be checked against. """ self.__platforms = None if Preferences.getWebBrowser("SafeBrowsingFilterPlatform"): if Utilities.isWindowsPlatform(): platform = "windows" elif Utilities.isMacPlatform(): platform = "macos" else: # treat all other platforms like linux platform = "linux" self.__platforms = SafeBrowsingAPIClient.getPlatformTypes(platform) def isEnabled(self): """ Public method to check, if safe browsing is enabled. @return flag indicating the enabled state @rtype bool """ return self.__enabled def close(self): """ Public method to close the safe browsing interface. """ self.__cache.close() def fairUseDelayExpired(self): """ Public method to check, if the fair use wait period has expired. @return flag indicating expiration @rtype bool """ return self.__enabled and self.__apiClient.fairUseDelayExpired() def updateHashPrefixCache(self): """ Public method to load or update the locally cached threat lists. @return flag indicating success and an error message @rtype tuple of (bool, str) """ if not self.__enabled: return False, self.tr("Safe Browsing is disabled.") if not self.__apiClient.fairUseDelayExpired(): return False, \ self.tr("The fair use wait period has not expired yet." "Expiration will be at {0}.").format( self.__apiClient.getFairUseDelayExpirationDateTime() .toString("yyyy-MM-dd, HH:mm:ss")) # step 1: remove expired hashes self.__cache.cleanupFullHashes() # step 2: update threat lists threatListsForRemove = {} for threatList, clientState in self.__cache.getThreatLists(): threatListsForRemove[repr(threatList)] = threatList threatLists = self.__apiClient.getThreatLists() maximum = len(threatLists) current = 0 self.progressMessage.emit(self.tr("Updating threat lists"), maximum) for entry in threatLists: current += 1 self.progress.emit(current) QCoreApplication.processEvents() threatList = ThreatList.fromApiEntry(entry) if self.__platforms is None or \ threatList.platformType in self.__platforms: self.__cache.addThreatList(threatList) key = repr(threatList) if key in threatListsForRemove: del threatListsForRemove[key] maximum = len(threatListsForRemove.values()) current = 0 self.progressMessage.emit(self.tr("Deleting obsolete threat lists"), maximum) for threatList in threatListsForRemove.values(): current += 1 self.progress.emit(current) QCoreApplication.processEvents() self.__cache.deleteHashPrefixList(threatList) self.__cache.deleteThreatList(threatList) del threatListsForRemove # step 3: update threats threatLists = self.__cache.getThreatLists() clientStates = {} for threatList, clientState in threatLists: clientStates[threatList.asTuple()] = clientState threatsUpdateResponses = \ self.__apiClient.getThreatsUpdate(clientStates) maximum = len(threatsUpdateResponses) current = 0 self.progressMessage.emit(self.tr("Updating hash prefixes"), maximum) for response in threatsUpdateResponses: current += 1 self.progress.emit(current) QCoreApplication.processEvents() responseThreatList = ThreatList.fromApiEntry(response) if response["responseType"] == "FULL_UPDATE": self.__cache.deleteHashPrefixList(responseThreatList) for removal in response.get("removals", []): self.__cache.removeHashPrefixIndices( responseThreatList, removal["rawIndices"]["indices"]) QCoreApplication.processEvents() for addition in response.get("additions", []): hashPrefixList = HashPrefixList( addition["rawHashes"]["prefixSize"], base64.b64decode(addition["rawHashes"]["rawHashes"])) self.__cache.populateHashPrefixList(responseThreatList, hashPrefixList) QCoreApplication.processEvents() expectedChecksum = base64.b64decode(response["checksum"]["sha256"]) if self.__verifyThreatListChecksum(responseThreatList, expectedChecksum): self.__cache.updateThreatListClientState( responseThreatList, response["newClientState"]) else: return False, \ self.tr("Local cache checksum does not match the server." " Consider cleaning the cache. Threat update has" " been aborted.") return True, "" def __verifyThreatListChecksum(self, threatList, remoteChecksum): """ Private method to verify the local checksum of a threat list with the checksum of the safe browsing server. @param threatList threat list to calculate checksum for @type ThreatList @param remoteChecksum SHA256 checksum as reported by the Google server @type bytes @return flag indicating equality @rtype bool """ localChecksum = self.__cache.hashPrefixListChecksum(threatList) return remoteChecksum == localChecksum def fullCacheCleanup(self): """ Public method to clean up the cache completely. """ self.__cache.prepareCacheDb() def showSafeBrowsingDialog(self): """ Public slot to show the safe browsing management dialog. """ if self.__gsbDialog is None: from WebBrowser.WebBrowserWindow import WebBrowserWindow from .SafeBrowsingDialog import SafeBrowsingDialog self.__gsbDialog = SafeBrowsingDialog( self, parent=WebBrowserWindow.mainWindow()) self.__gsbDialog.show() def lookupUrl(self, url): """ Public method to lookup an URL. @param url URL to be checked @type str or QUrl @return list of threat lists the URL was found in @rtype list of ThreatList @exception ValueError raised for an invalid URL """ if self.__enabled: if isinstance(url, QUrl): urlStr = url.toString().strip() else: urlStr = url.strip() if not urlStr: raise ValueError("Empty URL given.") urlHashes = SafeBrowsingUrl(urlStr).hashes() listNames = self.__lookupHashes(urlHashes) if listNames: return listNames return None def __lookupHashes(self, fullHashes): """ Private method to lookup the given hashes. @param fullHashes list of hashes to lookup @type list of bytes @return names of threat lists hashes were found in @rtype list of ThreatList """ fullHashes = list(fullHashes) cues = [toHex(fh[:4]) for fh in fullHashes] result = [] matchingPrefixes = {} matchingFullHashes = set() isPotentialThreat = False # Lookup hash prefixes which match full URL hash for threatList, hashPrefix, negativeCacheExpired in \ self.__cache.lookupHashPrefix(cues): for fullHash in fullHashes: if fullHash.startswith(hashPrefix): isPotentialThreat = True # consider hash prefix negative cache as expired if it # is expired in at least one threat list matchingPrefixes[hashPrefix] = matchingPrefixes.get( hashPrefix, False) or negativeCacheExpired matchingFullHashes.add(fullHash) # if none matches, url hash is clear if not isPotentialThreat: return [] # if there is non-expired full hash, URL is blacklisted matchingExpiredThreatLists = set() for threatList, hasExpired in self.__cache.lookupFullHashes( matchingFullHashes): if hasExpired: matchingExpiredThreatLists.add(threatList) else: result.append(threatList) if result: return result # If there are no matching expired full hash entries and negative # cache is still current for all prefixes, consider it safe. if len(matchingExpiredThreatLists) == 0 and \ sum(map(int, matchingPrefixes.values())) == 0: return [] # Now it can be assumed that there are expired matching full hash # entries and/or cache prefix entries with expired negative cache. # Both require full hash synchronization. self.__syncFullHashes(matchingPrefixes.keys()) # Now repeat full hash lookup for threatList, hasExpired in self.__cache.lookupFullHashes( matchingFullHashes): if not hasExpired: result.append(threatList) return result def __syncFullHashes(self, hashPrefixes): """ Private method to download full hashes matching given prefixes. This also updates the cache expiration timestamps. @param hashPrefixes list of hash prefixes to get full hashes for @type list of bytes """ threatLists = self.__cache.getThreatLists() clientStates = {} for threatList, clientState in threatLists: clientStates[threatList.asTuple()] = clientState fullHashResponses = self.__apiClient.getFullHashes( hashPrefixes, clientStates) # update negative cache for each hash prefix # store full hash with positive cache bumped up for match in fullHashResponses["matches"]: threatList = ThreatList.fromApiEntry(match) hashValue = base64.b64decode(match["threat"]["hash"]) cacheDuration = int(match["cacheDuration"].rstrip("s")) malwareThreatType = None for metadata in match["threatEntryMetadata"].get("entries", []): key = base64.b64decode(metadata["key"]) value = base64.b64decode(metadata["value"]) if key == b"malware_threat_type": malwareThreatType = value if not isinstance(malwareThreatType, str): malwareThreatType = malwareThreatType.decode() self.__cache.storeFullHash(threatList, hashValue, cacheDuration, malwareThreatType) negativeCacheDuration = int( fullHashResponses["negativeCacheDuration"].rstrip("s")) for prefixValue in hashPrefixes: for threatList, clientState in threatLists: self.__cache.updateHashPrefixExpiration( threatList, prefixValue, negativeCacheDuration) @classmethod def getIgnoreSchemes(cls): """ Class method to get the schemes not to be checked. @return list of schemes to be ignored @rtype list of str """ return [ "about", "eric", "qrc", "qthelp", "chrome", "abp", "file", ] def getThreatMessage(self, threatType): """ Public method to get a warning message for the given threat type. @param threatType threat type to get the message for @type str @return threat message @rtype str """ if self.__apiClient: msg = self.__apiClient.getThreatMessage(threatType) else: msg = "" return msg def getThreatMessages(self, threatLists): """ Public method to get threat messages for the given threats. @param threatLists list of threat lists to get a message for @type list of ThreatList @return list of threat messages, one per unique threat type @rtype list of str """ threatTypes = set() for threatList in threatLists: threatTypes.add(threatList.threatType) messages = [] if self.__apiClient: for threatType in sorted(threatTypes): msg = self.__apiClient.getThreatMessage(threatType) messages.append(msg) return messages def getThreatType(self, threatList): """ Public method to get a display string for a given threat type. @param threatList threat list to get display string for @type str @return display string @rtype str """ displayString = "" if self.__apiClient: displayString = self.__apiClient.getThreatType( threatList.threatType) return displayString def getPlatformString(self, platformType): """ Public method to get the platform string for a given platform type. @param platformType platform type as defined in the v4 API @type str @return platform string @rtype str """ if self.__apiClient: return self.__apiClient.getPlatformString(platformType) else: return "" def getThreatEntryString(self, threatEntry): """ Public method to get the threat entry string. @param threatEntry threat entry type as defined in the v4 API @type str @return threat entry string @rtype str """ if self.__apiClient: return self.__apiClient.getThreatEntryString(threatEntry) else: return ""