WebBrowser/SafeBrowsing/SafeBrowsingAPIClient.py

Sat, 29 Jul 2017 19:41:16 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 29 Jul 2017 19:41:16 +0200
branch
safe_browsing
changeset 5820
b610cb5b501a
parent 5816
93c74269d59e
child 5829
d3448873ced3
permissions
-rw-r--r--

Started implementing the safe browsing manager and management dialog.

# -*- coding: utf-8 -*-

# Copyright (c) 2017 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the low level interface for Google Safe Browsing.
"""

from __future__ import unicode_literals
# Python 2 compatibility: rebind the name 'str' to 'unicode' for this module.
# On Python 3 the name 'unicode' does not exist; the resulting NameError is
# deliberately ignored and the built-in 'str' stays in effect.
try:
    str = unicode       # __IGNORE_EXCEPTION__
except NameError:
    pass

import json
import base64

from PyQt5.QtCore import pyqtSignal, QObject, QDateTime, QUrl, QByteArray, \
    QCoreApplication, QEventLoop
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply

from WebBrowser.WebBrowserWindow import WebBrowserWindow


class SafeBrowsingAPIClient(QObject):
    """
    Class implementing the low level interface for Google Safe Browsing.

    All requests are sent through the web browser's network manager and
    are waited for synchronously while pending events keep being
    processed.

    @signal networkError(str) emitted to indicate a network error
    """
    ClientId = "eric6_API_client"
    ClientVersion = "1.0.0"

    GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}"

    networkError = pyqtSignal(str)

    def __init__(self, apiKey, fairUse=True, parent=None):
        """
        Constructor

        @param apiKey API key to be used
        @type str
        @param fairUse flag indicating to follow the fair use policy
        @type bool
        @param parent reference to the parent object
        @type QObject
        """
        # The QObject base class must be initialised, otherwise the
        # networkError signal cannot be used and 'parent' is ignored.
        super(SafeBrowsingAPIClient, self).__init__(parent)

        self.__apiKey = apiKey
        self.__fairUse = fairUse

        # a null QDateTime means that no wait period is in effect
        self.__nextRequestNoSoonerThan = QDateTime()
        self.__failCount = 0

    def setApiKey(self, apiKey):
        """
        Public method to set the API key.

        @param apiKey API key to be set
        @type str
        """
        self.__apiKey = apiKey

    def getThreatLists(self):
        """
        Public method to retrieve all available threat lists.

        @return list of threat lists (None in case of a network error)
        @rtype list of dict containing 'threatType', 'platformType' and
            'threatEntryType'
        """
        url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey))
        req = QNetworkRequest(url)
        reply = WebBrowserWindow.networkManager().get(req)

        return self.__fetchResult(reply, "threatLists")

    def getThreatsUpdate(self, clientStates):
        """
        Public method to fetch hash prefix updates for the given threat list.

        @param clientStates dictionary of client states with keys like
            (threatType, platformType, threatEntryType)
        @type dict
        @return list of threat updates (None in case of a network error)
        @rtype list of dict
        """
        requestBody = {
            "client": {
                "clientId": self.ClientId,
                "clientVersion": self.ClientVersion,
            },
            "listUpdateRequests": [],
        }

        for (threatType, platformType, threatEntryType), currentState in \
                clientStates.items():
            requestBody["listUpdateRequests"].append(
                {
                    "threatType": threatType,
                    "platformType": platformType,
                    "threatEntryType": threatEntryType,
                    "state": currentState,
                    "constraints": {
                        "supportedCompressions": ["RAW"],
                    }
                }
            )

        reply = self.__postJson("threatListUpdates:fetch", requestBody)
        return self.__fetchResult(reply, "listUpdateResponses")

    def getFullHashes(self, prefixes, clientState):
        """
        Public method to find full hashes matching hash prefixes.

        @param prefixes list of hash prefixes to find
        @type list of str (Python 2) or list of bytes (Python 3)
        @param clientState dictionary of client states with keys like
            (threatType, platformType, threatEntryType)
        @type dict
        @return dictionary containing the list of found hashes and the
            negative cache duration (None in case of a network error)
        @rtype dict
        """
        requestBody = {
            "client": {
                "clientId": self.ClientId,
                "clientVersion": self.ClientVersion,
            },
            "clientStates": [],
            "threatInfo": {
                "threatTypes": [],
                "platformTypes": [],
                "threatEntryTypes": [],
                "threatEntries": [],
            },
        }
        threatInfo = requestBody["threatInfo"]

        # hash prefixes are binary data and must be sent base64 encoded
        for prefix in prefixes:
            threatInfo["threatEntries"].append(
                {"hash": base64.b64encode(prefix).decode("ascii")})

        for (threatType, platformType, threatEntryType), currentState in \
                clientState.items():
            # Append the state of the individual list; the complete
            # dictionary has tuple keys and is not JSON serialisable.
            requestBody["clientStates"].append(currentState)
            # each type is listed only once, in order of first occurrence
            if threatType not in threatInfo["threatTypes"]:
                threatInfo["threatTypes"].append(threatType)
            if platformType not in threatInfo["platformTypes"]:
                threatInfo["platformTypes"].append(platformType)
            if threatEntryType not in threatInfo["threatEntryTypes"]:
                threatInfo["threatEntryTypes"].append(threatEntryType)

        reply = self.__postJson("fullHashes:find", requestBody)
        return self.__fetchResult(reply)

    def __postJson(self, endpoint, requestBody):
        """
        Private method to send a request body as JSON via a POST request.

        @param endpoint name of the API endpoint to be inserted into the
            URL template
        @type str
        @param requestBody data to be sent as the JSON encoded body
        @type dict
        @return reference to the network reply of the started request
        @rtype QNetworkReply
        """
        data = QByteArray(json.dumps(requestBody).encode("utf-8"))
        url = QUrl(self.GsbUrlTemplate.format(endpoint, self.__apiKey))
        req = QNetworkRequest(url)
        req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
        return WebBrowserWindow.networkManager().post(req, data)

    def __fetchResult(self, reply, resultKey=None):
        """
        Private method to wait for a network reply and evaluate it.

        In case of a network error the networkError signal is emitted with
        the error string of the reply. The reply object is always scheduled
        for deletion.

        @param reply reference to the network reply object
        @type QNetworkReply
        @param resultKey key of the entry of the decoded data to be
            returned (None to return the complete data)
        @type str
        @return extracted data or None in case of a network error
        @rtype list or dict
        @exception KeyError raised if the given key is not contained in the
            decoded data
        """
        try:
            # keep processing events while the request is in flight
            while reply.isRunning():
                QCoreApplication.processEvents(QEventLoop.AllEvents, 200)
                # max. 200 ms processing

            if reply.error() != QNetworkReply.NoError:
                self.networkError.emit(reply.errorString())
                return None

            result = self.__extractData(reply)
            if resultKey is not None:
                result = result[resultKey]
            return result
        finally:
            # release the reply even if decoding the payload failed
            reply.deleteLater()

    def __extractData(self, reply):
        """
        Private method to extract the data of a network reply.

        As a side effect the fair use wait period is updated from the
        'minimumWaitDuration' entry of the decoded data (if present).

        @param reply reference to the network reply object
        @type QNetworkReply
        @return extracted data
        @rtype dict
        """
        result = json.loads(str(reply.readAll(), "utf-8"))
        self.__setWaitDuration(result.get("minimumWaitDuration"))
        return result

    def __setWaitDuration(self, minimumWaitDuration):
        """
        Private method to set the minimum wait duration.

        @param minimumWaitDuration duration to be set as a number of
            seconds followed by 's' (e.g. "300s" or "593.440s"); None
            resets the wait period
        @type str
        """
        if not self.__fairUse or minimumWaitDuration is None:
            self.__nextRequestNoSoonerThan = QDateTime()
        else:
            # The duration may contain fractional seconds, which int()
            # cannot parse directly; keep the millisecond precision.
            waitSeconds = float(minimumWaitDuration.rstrip("s"))
            self.__nextRequestNoSoonerThan = \
                QDateTime.currentDateTime().addMSecs(int(waitSeconds * 1000))

    def fairUseDelayExpired(self):
        """
        Public method to check, if the fair use wait period has expired.

        @return flag indicating expiration (always True, if the fair use
            policy is not followed)
        @rtype bool
        """
        return (
            not self.__fairUse or
            QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan
        )

eric ide

mercurial