Sat, 29 Jul 2017 19:41:16 +0200
Started implementing the safe browsing manager and management dialog.
# -*- coding: utf-8 -*- # Copyright (c) 2017 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the low level interface for Google Safe Browsing. """ from __future__ import unicode_literals try: str = unicode # __IGNORE_EXCEPTION__ except NameError: pass import json import base64 from PyQt5.QtCore import pyqtSignal, QObject, QDateTime, QUrl, QByteArray, \ QCoreApplication, QEventLoop from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply from WebBrowser.WebBrowserWindow import WebBrowserWindow class SafeBrowsingAPIClient(QObject): """ Class implementing the low level interface for Google Safe Browsing. @signal networkError(str) emitted to indicate a network error """ ClientId = "eric6_API_client" ClientVersion = "1.0.0" GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}" networkError = pyqtSignal(str) def __init__(self, apiKey, fairUse=True, parent=None): """ Constructor @param apiKey API key to be used @type str @param fairUse flag indicating to follow the fair use policy @type bool @param parent reference to the parent object @type QObject """ self.__apiKey = apiKey self.__fairUse = fairUse self.__nextRequestNoSoonerThan = QDateTime() self.__failCount = 0 def setApiKey(self, apiKey): """ Public method to set the API key. @param apiKey API key to be set @type str """ self.__apiKey = apiKey def getThreatLists(self): """ Public method to retrieve all available threat lists. @return list of threat lists @rtype list of dict containing 'threatType', 'platformType' and 'threatEntryType' """ url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey)) req = QNetworkRequest(url) reply = WebBrowserWindow.networkManager().get(req) while reply.isRunning(): QCoreApplication.processEvents(QEventLoop.AllEvents, 200) # max. 200 ms processing res = None if reply.error() != QNetworkReply.NoError: self.networkError.emit(reply.errorString()) else: result = self.__extractData(reply) res = result["threatLists"] reply.deleteLater() return res def getThreatsUpdate(self, clientStates): """ Public method to fetch hash prefix updates for the given threat list. @param clientStates dictionary of client states with keys like (threatType, platformType, threatEntryType) @type dict @return list of threat updates @rtype list of dict """ requestBody = { "client": { "clientId": self.ClientId, "clientVersion": self.ClientVersion, }, "listUpdateRequests": [], } for (threatType, platformType, threatEntryType), currentState in \ clientStates.items(): requestBody["listUpdateRequests"].append( { "threatType": threatType, "platformType": platformType, "threatEntryType": threatEntryType, "state": currentState, "constraints": { "supportedCompressions": ["RAW"], } } ) data = QByteArray(json.dumps(requestBody).encode("utf-8")) url = QUrl(self.GsbUrlTemplate.format("threatListUpdates:fetch", self.__apiKey)) req = QNetworkRequest(url) req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json") reply = WebBrowserWindow.networkManager().post(req, data) while reply.isRunning(): QCoreApplication.processEvents(QEventLoop.AllEvents, 200) # max. 200 ms processing res = None if reply.error() != QNetworkReply.NoError: self.networkError.emit(reply.errorString()) else: result = self.__extractData(reply) res = result["listUpdateResponses"] reply.deleteLater() return res def getFullHashes(self, prefixes, clientState): """ Public method to find full hashes matching hash prefixes. @param prefixes list of hash prefixes to find @type list of str (Python 2) or list of bytes (Python 3) @param clientState dictionary of client states with keys like (threatType, platformType, threatEntryType) @type dict @return dictionary containing the list of found hashes and the negative cache duration @rtype dict """ requestBody = { "client": { "clientId": self.ClientId, "clientVersion": self.ClientVersion, }, "clientStates": [], "threatInfo": { "threatTypes": [], "platformTypes": [], "threatEntryTypes": [], "threatEntries": [], }, } for prefix in prefixes: requestBody["threatInfo"]["threatEntries"].append( {"hash": base64.b64encode(prefix).decode("ascii")}) for (threatType, platformType, threatEntryType), currentState in \ clientState.items(): requestBody["clientStates"].append(clientState) if threatType not in requestBody["threatInfo"]["threatTypes"]: requestBody["threatInfo"]["threatTypes"].append(threatType) if platformType not in \ requestBody["threatInfo"]["platformTypes"]: requestBody["threatInfo"]["platformTypes"].append( platformType) if threatEntryType not in \ requestBody["threatInfo"]["threatEntryTypes"]: requestBody["threatInfo"]["threatEntryTypes"].append( threatEntryType) data = QByteArray(json.dumps(requestBody).encode("utf-8")) url = QUrl(self.GsbUrlTemplate.format("fullHashes:find", self.__apiKey)) req = QNetworkRequest(url) req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json") reply = WebBrowserWindow.networkManager().post(req, data) while reply.isRunning(): QCoreApplication.processEvents(QEventLoop.AllEvents, 200) # max. 200 ms processing res = None if reply.error() != QNetworkReply.NoError: self.networkError.emit(reply.errorString()) else: res = self.__extractData(reply) reply.deleteLater() return res def __extractData(self, reply): """ Private method to extract the data of a network reply. @param reply reference to the network reply object @type QNetworkReply @return extracted data @type list or dict """ result = json.loads(str(reply.readAll(), "utf-8")) self.__setWaitDuration(result.get("minimumWaitDuration")) return result def __setWaitDuration(self, minimumWaitDuration): """ Private method to set the minimum wait duration. @param minimumWaitDuration duration to be set @type str """ if not self.__fairUse or minimumWaitDuration is None: self.__nextRequestNoSoonerThan = QDateTime() else: waitDuration = int(minimumWaitDuration.rstrip("s")) self.__nextRequestNoSoonerThan = \ QDateTime.currentDateTime().addSecs(waitDuration) def fairUseDelayExpired(self): """ Public method to check, if the fair use wait period has expired. @return flag indicating expiration @rtype bool """ return ( self.__fairUse and QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan ) or not self.__fairUse