diff -r f99d60d6b59b -r 2602857055c5 eric6/WebBrowser/SafeBrowsing/SafeBrowsingAPIClient.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/eric6/WebBrowser/SafeBrowsing/SafeBrowsingAPIClient.py Sun Apr 14 15:09:21 2019 +0200 @@ -0,0 +1,573 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2017 - 2019 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the low level interface for Google Safe Browsing. +""" + +from __future__ import unicode_literals +try: + str = unicode # __IGNORE_EXCEPTION__ +except NameError: + pass + +import json +import base64 + +from PyQt5.QtCore import pyqtSignal, QObject, QDateTime, QUrl, QByteArray, \ + QCoreApplication, QEventLoop +from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply + +from WebBrowser.WebBrowserWindow import WebBrowserWindow + +from .SafeBrowsingThreatList import ThreatList + + +class SafeBrowsingAPIClient(QObject): + """ + Class implementing the low level interface for Google Safe Browsing. + + @signal networkError(str) emitted to indicate a network error + """ + ClientId = "eric6_API_client" + ClientVersion = "2.0.0" + + GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}" + + networkError = pyqtSignal(str) + + def __init__(self, apiKey, fairUse=True, parent=None): + """ + Constructor + + @param apiKey API key to be used + @type str + @param fairUse flag indicating to follow the fair use policy + @type bool + @param parent reference to the parent object + @type QObject + """ + super(SafeBrowsingAPIClient, self).__init__(parent) + + self.__apiKey = apiKey + self.__fairUse = fairUse + + self.__nextRequestNoSoonerThan = QDateTime() + self.__failCount = 0 + + self.__lookupApiCache = {} + # Temporary cache used by the lookup API (v4) + # key: URL as string + # value: dictionary with these entries: + # "validUntil": (QDateTime) + # "threatInfo": (list of ThreatList) + + def setApiKey(self, apiKey): + """ + Public method to set the API key. + + @param apiKey API key to be set + @type str + """ + self.__apiKey = apiKey + + def getThreatLists(self): + """ + Public method to retrieve all available threat lists. + + @return tuple containing list of threat lists and an error message + @rtype tuple of (list of dict containing 'threatType', 'platformType' + and 'threatEntryType', bool) + """ + url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey)) + req = QNetworkRequest(url) + reply = WebBrowserWindow.networkManager().get(req) + + while reply.isRunning(): + QCoreApplication.processEvents(QEventLoop.AllEvents, 200) + # max. 200 ms processing + + res = None + error = "" + if reply.error() != QNetworkReply.NoError: + error = reply.errorString() + self.networkError.emit(error) + else: + result = self.__extractData(reply) + res = result["threatLists"] + + reply.deleteLater() + return res, error + + ####################################################################### + ## Methods below implement the 'Update API (v4)' + ####################################################################### + + def getThreatsUpdate(self, clientStates): + """ + Public method to fetch hash prefix updates for the given threat list. + + @param clientStates dictionary of client states with keys like + (threatType, platformType, threatEntryType) + @type dict + @return tuple containing the list of threat updates and an error + message + @rtype tuple of (list of dict, bool) + """ + requestBody = { + "client": { + "clientId": self.ClientId, + "clientVersion": self.ClientVersion, + }, + "listUpdateRequests": [], + } + + for (threatType, platformType, threatEntryType), currentState in \ + clientStates.items(): + requestBody["listUpdateRequests"].append( + { + "threatType": threatType, + "platformType": platformType, + "threatEntryType": threatEntryType, + "state": currentState, + "constraints": { + "supportedCompressions": ["RAW"], + } + } + ) + + data = QByteArray(json.dumps(requestBody).encode("utf-8")) + url = QUrl(self.GsbUrlTemplate.format("threatListUpdates:fetch", + self.__apiKey)) + req = QNetworkRequest(url) + req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json") + reply = WebBrowserWindow.networkManager().post(req, data) + + while reply.isRunning(): + QCoreApplication.processEvents(QEventLoop.AllEvents, 200) + # max. 200 ms processing + + res = None + error = "" + if reply.error() != QNetworkReply.NoError: + error = reply.errorString() + self.networkError.emit(error) + else: + result = self.__extractData(reply) + res = result["listUpdateResponses"] + + reply.deleteLater() + return res, error + + def getFullHashes(self, prefixes, clientState): + """ + Public method to find full hashes matching hash prefixes. + + @param prefixes list of hash prefixes to find + @type list of str (Python 2) or list of bytes (Python 3) + @param clientState dictionary of client states with keys like + (threatType, platformType, threatEntryType) + @type dict + @return dictionary containing the list of found hashes and the + negative cache duration + @rtype dict + """ + requestBody = { + "client": { + "clientId": self.ClientId, + "clientVersion": self.ClientVersion, + }, + "clientStates": [], + "threatInfo": { + "threatTypes": [], + "platformTypes": [], + "threatEntryTypes": [], + "threatEntries": [], + }, + } + + for prefix in prefixes: + requestBody["threatInfo"]["threatEntries"].append( + {"hash": base64.b64encode(prefix).decode("ascii")}) + + for (threatType, platformType, threatEntryType), currentState in \ + clientState.items(): + requestBody["clientStates"].append(currentState) + if threatType not in requestBody["threatInfo"]["threatTypes"]: + requestBody["threatInfo"]["threatTypes"].append(threatType) + if platformType not in \ + requestBody["threatInfo"]["platformTypes"]: + requestBody["threatInfo"]["platformTypes"].append( + platformType) + if threatEntryType not in \ + requestBody["threatInfo"]["threatEntryTypes"]: + requestBody["threatInfo"]["threatEntryTypes"].append( + threatEntryType) + + data = QByteArray(json.dumps(requestBody).encode("utf-8")) + url = QUrl(self.GsbUrlTemplate.format("fullHashes:find", + self.__apiKey)) + req = QNetworkRequest(url) + req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json") + reply = WebBrowserWindow.networkManager().post(req, data) + + while reply.isRunning(): + QCoreApplication.processEvents(QEventLoop.AllEvents, 200) + # max. 200 ms processing + + res = [] + if reply.error() != QNetworkReply.NoError: + self.networkError.emit(reply.errorString()) + else: + res = self.__extractData(reply) + + reply.deleteLater() + return res + + def __extractData(self, reply): + """ + Private method to extract the data of a network reply. + + @param reply reference to the network reply object + @type QNetworkReply + @return extracted data + @rtype list or dict + """ + result = json.loads(str(reply.readAll(), "utf-8")) + self.__setWaitDuration(result.get("minimumWaitDuration")) + return result + + def __setWaitDuration(self, minimumWaitDuration): + """ + Private method to set the minimum wait duration. + + @param minimumWaitDuration duration to be set + @type str + """ + if not self.__fairUse or minimumWaitDuration is None: + self.__nextRequestNoSoonerThan = QDateTime() + else: + waitDuration = int(float(minimumWaitDuration.rstrip("s"))) + self.__nextRequestNoSoonerThan = \ + QDateTime.currentDateTime().addSecs(waitDuration) + + def fairUseDelayExpired(self): + """ + Public method to check, if the fair use wait period has expired. + + @return flag indicating expiration + @rtype bool + """ + return ( + self.__fairUse and + QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan + ) or not self.__fairUse + + def getFairUseDelayExpirationDateTime(self): + """ + Public method to get the date and time the fair use delay will expire. + + @return fair use delay expiration date and time + @rtype QDateTime + """ + return self.__nextRequestNoSoonerThan + + ####################################################################### + ## Methods below implement the 'Lookup API (v4)' + ####################################################################### + + def lookupUrl(self, url, platforms): + """ + Public method to send an URL to Google for checking. + + @param url URL to be checked + @type QUrl + @param platforms list of platform types to check against + @type list of str + @return tuple containing the list of threat list info objects and + an error message + @rtype tuple of (list of ThreatList, str) + """ + error = "" + + # sanitize the URL by removing user info and query data + url = url.adjusted( + QUrl.RemoveUserInfo | QUrl.RemoveQuery | QUrl.RemoveFragment + ) + urlStr = url.toString() + + # check the local cache first + if urlStr in self.__lookupApiCache: + if self.__lookupApiCache[urlStr]["validUntil"] > \ + QDateTime.currentDateTime(): + # cached entry is still valid + return self.__lookupApiCache[urlStr]["threatInfo"], error + else: + del self.__lookupApiCache[urlStr] + + # no valid entry found, ask the safe browsing server + requestBody = { + "client": { + "clientId": self.ClientId, + "clientVersion": self.ClientVersion, + }, + "threatInfo": { + "threatTypes": SafeBrowsingAPIClient.definedThreatTypes(), + "platformTypes": platforms, + "threatEntryTypes": + SafeBrowsingAPIClient.definedThreatEntryTypes(), + "threatEntries": [ + {"url": urlStr}, + ], + }, + } + + data = QByteArray(json.dumps(requestBody).encode("utf-8")) + url = QUrl(self.GsbUrlTemplate.format("threatMatches:find", + self.__apiKey)) + req = QNetworkRequest(url) + req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json") + reply = WebBrowserWindow.networkManager().post(req, data) + + while reply.isRunning(): + QCoreApplication.processEvents(QEventLoop.AllEvents, 200) + # max. 200 ms processing + + threats = [] + if reply.error() != QNetworkReply.NoError: + error = reply.errorString() + self.networkError.emit(error) + else: + res = json.loads(str(reply.readAll(), "utf-8")) + if res and "matches" in res: + cacheDuration = 0 + for match in res["matches"]: + threatInfo = ThreatList( + match["threatType"], + match["platformType"], + match["threatEntryType"], + ) + threats.append(threatInfo) + if "cacheDuration" in match: + cacheDurationSec = int( + match["cacheDuration"].strip().rstrip("s") + .split(".")[0]) + if cacheDurationSec > cacheDuration: + cacheDuration = cacheDurationSec + if cacheDuration > 0 and bool(threats): + validUntil = QDateTime.currentDateTime().addSecs( + cacheDuration) + self.__lookupApiCache[urlStr] = { + "validUntil": validUntil, + "threatInfo": threats + } + + reply.deleteLater() + return threats, error + + ####################################################################### + ## Methods below implement global (class wide) functionality + ####################################################################### + + @classmethod + def getThreatMessage(cls, threatType): + """ + Class method to get a warning message for the given threat type. + + @param threatType threat type to get the message for + @type str + @return threat message + @rtype str + """ + threatType = threatType.lower() + if threatType == "malware": + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Malware Warning</h3>" + "<p>The web site you are about to visit may try to install" + " harmful programs on your computer in order to steal or" + " destroy your data.</p>") + elif threatType == "social_engineering": + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Phishing Warning</h3>" + "<p>The web site you are about to visit may try to trick you" + " into doing something dangerous online, such as revealing" + " passwords or personal information, usually through a fake" + " website.</p>") + elif threatType == "unwanted_software": + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Unwanted Software Warning</h3>" + "<p>The software you are about to download may negatively" + " affect your browsing or computing experience.</p>") + elif threatType == "potentially_harmful_application": + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Potentially Harmful Application</h3>" + "<p>The web site you are about to visit may try to trick you" + " into installing applications, that may negatively affect" + " your browsing experience.</p>") + elif threatType == "malicious_binary": + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Malicious Binary Warning</h3>" + "<p>The software you are about to download may be harmful" + " to your computer.</p>") + else: + # unknow threat + msg = QCoreApplication.translate( + "SafeBrowsingAPI", + "<h3>Unknown Threat Warning</h3>" + "<p>The web site you are about to visit was found in the Safe" + " Browsing Database but was not classified yet.</p>") + + return msg + + @classmethod + def getThreatType(cls, threatType): + """ + Class method to get a display string for a given threat type. + + @param threatType threat type to get display string for + @type str + @return display string + @rtype str + """ + threatType = threatType.lower() + if threatType == "malware": + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Malware") + elif threatType == "social_engineering": + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Phishing") + elif threatType == "unwanted_software": + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Unwanted Software") + elif threatType == "potentially_harmful_application": + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Harmful Application") + elif threatType == "malcious_binary": + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Malicious Binary") + else: + displayString = QCoreApplication.translate( + "SafeBrowsingAPI", "Unknown Threat") + + return displayString + + @classmethod + def definedThreatTypes(cls): + """ + Class method to get all threat types defined in API v4. + + @return list of defined threat types + @rtype list of str + """ + return [ + "THREAT_TYPE_UNSPECIFIED", "MALWARE", "SOCIAL_ENGINEERING", + "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION", + ] + + @classmethod + def getThreatEntryString(cls, threatEntry): + """ + Class method to get the threat entry string. + + @param threatEntry threat entry type as defined in the v4 API + @type str + @return threat entry string + @rtype str + """ + if threatEntry == "URL": + return "URL" + elif threatEntry == "EXECUTABLE": + return QCoreApplication.translate( + "SafeBrowsingAPI", "executable program") + else: + return QCoreApplication.translate( + "SafeBrowsingAPI", "unknown type") + + @classmethod + def definedThreatEntryTypes(cls): + """ + Class method to get all threat entry types defined in API v4. + + @return list of all defined threat entry types + @rtype list of str + """ + return [ + "THREAT_ENTRY_TYPE_UNSPECIFIED", "URL", "EXECUTABLE", + ] + + @classmethod + def getPlatformString(cls, platformType): + """ + Class method to get the platform string for a given platform type. + + @param platformType platform type as defined in the v4 API + @type str + @return platform string + @rtype str + """ + platformStrings = { + "WINDOWS": "Windows", + "LINUX": "Linux", + "ANDROID": "Android", + "OSX": "macOS", + "IOS": "iOS", + "CHROME": "Chrome OS", + } + if platformType in platformStrings: + return platformStrings[platformType] + + if platformType == "ANY_PLATFORM": + return QCoreApplication.translate( + "SafeBrowsingAPI", "any defined platform") + elif platformType == "ALL_PLATFORMS": + return QCoreApplication.translate( + "SafeBrowsingAPI", "all defined platforms") + else: + return QCoreApplication.translate( + "SafeBrowsingAPI", "unknown platform") + + @classmethod + def getPlatformTypes(cls, platform): + """ + Class method to get the platform types for a given platform. + + @param platform platform string + @type str (one of 'linux', 'windows', 'macos') + @return list of platform types as defined in the v4 API for the + given platform + @rtype list of str + @exception ValueError raised to indicate an invalid platform string + """ + platform = platform.lower() + + platformTypes = ["ANY_PLATFORM", "ALL_PLATFORMS"] + if platform == "linux": + platformTypes.append("LINUX") + elif platform == "windows": + platformTypes.append("WINDOWS") + elif platform == "macos": + platformTypes.append("OSX") + else: + raise ValueError("Unsupported platform") + + return platformTypes + + @classmethod + def definedPlatformTypes(cls): + """ + Class method to get all platform types defined in API v4. + + @return list of all defined platform types + @rtype list of str + """ + return [ + "PLATFORM_TYPE_UNSPECIFIED", "WINDOWS", "LINUX", "ANDROID", "OSX", + "IOS", "ANY_PLATFORM", "ALL_PLATFORMS", "CHROME", + ]