WebBrowser/VirusTotal/VirusTotalApi.py

branch
QtWebEngine
changeset 4753
8d2ea02ed785
parent 4631
5c1a96925da4
child 4768
57da9217196b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/WebBrowser/VirusTotal/VirusTotalApi.py	Sat Feb 20 14:34:32 2016 +0100
@@ -0,0 +1,416 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2011 - 2016 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
+API class.
+"""
+
+from __future__ import unicode_literals
+try:
+    str = unicode       # __IGNORE_EXCEPTION__
+except NameError:
+    pass
+
+import json
+
+from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion
+from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
+
+from E5Gui import E5MessageBox
+
+import Preferences
+
+
+class VirusTotalAPI(QObject):
+    """
+    Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
+    API.
+    
+    @signal checkServiceKeyFinished(bool, str) emitted after the service key
+        check has been performed. It gives a flag indicating validity
+        (boolean) and an error message in case of a network error (string).
+    @signal submitUrlError(str) emitted with the error string, if the URL scan
+        submission returned an error.
+    @signal urlScanReport(str) emitted with the URL of the URL scan report page
+    @signal fileScanReport(str) emitted with the URL of the file scan report
+        page
+    """
+    checkServiceKeyFinished = pyqtSignal(bool, str)
+    submitUrlError = pyqtSignal(str)
+    urlScanReport = pyqtSignal(str)
+    fileScanReport = pyqtSignal(str)
+    
+    TestServiceKeyScanID = \
+        "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
+    
+    ServiceResult_ItemQueued = -2
+    ServiceResult_ItemNotPresent = 0
+    ServiceResult_ItemPresent = 1
+    
+    # HTTP Status Codes
+    ServiceCode_InvalidKey = 202
+    ServiceCode_RateLimitExceeded = 204
+    ServiceCode_InvalidPrivilege = 403
+    
+    GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
+    ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
+    GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
+    GetIpAddressReportPattern = \
+        "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
+    GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
+    
+    def __init__(self, parent=None):
+        """
+        Constructor
+        
+        @param parent reference to the parent object (QObject)
+        """
+        super(VirusTotalAPI, self).__init__(parent)
+        
+        self.__replies = []
+        
+        self.__loadSettings()
+        
+        self.__lastIP = ""
+        self.__lastDomain = ""
+        self.__ipReportDlg = None
+        self.__domainReportDlg = None
+    
+    def __loadSettings(self):
+        """
+        Private method to load the settings.
+        """
+        if Preferences.getHelp("VirusTotalSecure"):
+            protocol = "https"
+        else:
+            protocol = "http"
+        self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
+        self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
+        self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
+        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
+            protocol)
+        self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
+        
+        self.errorMessages = {
+            204: self.tr("Request limit has been reached."),
+            0: self.tr("Requested item is not present."),
+            -2: self.tr("Requested item is still queued."),
+        }
+    
+    def preferencesChanged(self):
+        """
+        Public slot to handle a change of preferences.
+        """
+        self.__loadSettings()
+    
+    def checkServiceKeyValidity(self, key, protocol=""):
+        """
+        Public method to check the validity of the given service key.
+        
+        @param key service key (string)
+        @param protocol protocol used to access VirusTotal (string)
+        """
+        if protocol == "":
+            urlStr = self.GetFileReportUrl
+        else:
+            urlStr = self.GetFileReportPattern.format(protocol)
+        request = QNetworkRequest(QUrl(urlStr))
+        request.setHeader(QNetworkRequest.ContentTypeHeader,
+                          "application/x-www-form-urlencoded")
+        params = QByteArray("apikey={0}&resource={1}".format(
+            key, self.TestServiceKeyScanID).encode("utf-8"))
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.post(request, params)
+        reply.finished.connect(self.__checkServiceKeyValidityFinished)
+        self.__replies.append(reply)
+    
+    def __checkServiceKeyValidityFinished(self):
+        """
+        Private slot to determine the result of the service key validity check.
+        """
+        res = False
+        msg = ""
+        
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            res = True
+        elif reply.error() == self.ServiceCode_InvalidKey:
+            res = False
+        else:
+            msg = reply.errorString()
+        self.__replies.remove(reply)
+        reply.deleteLater()
+        
+        self.checkServiceKeyFinished.emit(res, msg)
+    
+    def submitUrl(self, url):
+        """
+        Public method to submit an URL to be scanned.
+        
+        @param url url to be scanned (QUrl)
+        """
+        request = QNetworkRequest(QUrl(self.ScanUrlUrl))
+        request.setHeader(QNetworkRequest.ContentTypeHeader,
+                          "application/x-www-form-urlencoded")
+        params = QByteArray("apikey={0}&url=".format(
+            Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\
+            .append(QUrl.toPercentEncoding(url.toString()))
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.post(request, params)
+        reply.finished.connect(self.__submitUrlFinished)
+        self.__replies.append(reply)
+    
+    def __submitUrlFinished(self):
+        """
+        Private slot to determine the result of the URL scan submission.
+        """
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            result = json.loads(str(reply.readAll(), "utf-8"))
+            if result["response_code"] == self.ServiceResult_ItemPresent:
+                self.urlScanReport.emit(result["permalink"])
+                self.__getUrlScanReportUrl(result["scan_id"])
+            else:
+                if result["response_code"] in self.errorMessages:
+                    msg = self.errorMessages[result["response_code"]]
+                else:
+                    msg = result["verbose_msg"]
+                self.submitUrlError.emit(msg)
+        elif reply.error() == self.ServiceCode_RateLimitExceeded:
+            self.submitUrlError.emit(
+                self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
+        else:
+            self.submitUrlError.emit(reply.errorString())
+        self.__replies.remove(reply)
+        reply.deleteLater()
+    
+    def __getUrlScanReportUrl(self, scanId):
+        """
+        Private method to get the report URL for a URL scan.
+        
+        @param scanId ID of the scan to get the report URL for (string)
+        """
+        request = QNetworkRequest(QUrl(self.GetUrlReportUrl))
+        request.setHeader(QNetworkRequest.ContentTypeHeader,
+                          "application/x-www-form-urlencoded")
+        params = QByteArray("apikey={0}&resource={1}".format(
+            Preferences.getHelp("VirusTotalServiceKey"), scanId)
+            .encode("utf-8"))
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.post(request, params)
+        reply.finished.connect(self.__getUrlScanReportUrlFinished)
+        self.__replies.append(reply)
+    
+    def __getUrlScanReportUrlFinished(self):
+        """
+        Private slot to determine the result of the URL scan report URL
+        request.
+        """
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            result = json.loads(str(reply.readAll(), "utf-8"))
+            if "filescan_id" in result and result["filescan_id"] is not None:
+                self.__getFileScanReportUrl(result["filescan_id"])
+        self.__replies.remove(reply)
+        reply.deleteLater()
+    
+    def __getFileScanReportUrl(self, scanId):
+        """
+        Private method to get the report URL for a file scan.
+        
+        @param scanId ID of the scan to get the report URL for (string)
+        """
+        request = QNetworkRequest(QUrl(self.GetFileReportUrl))
+        request.setHeader(QNetworkRequest.ContentTypeHeader,
+                          "application/x-www-form-urlencoded")
+        params = QByteArray("apikey={0}&resource={1}".format(
+            Preferences.getHelp("VirusTotalServiceKey"), scanId)
+            .encode("utf-8"))
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.post(request, params)
+        reply.finished.connect(self.__getFileScanReportUrlFinished)
+        self.__replies.append(reply)
+    
+    def __getFileScanReportUrlFinished(self):
+        """
+        Private slot to determine the result of the file scan report URL
+        request.
+        """
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            result = json.loads(str(reply.readAll(), "utf-8"))
+            self.fileScanReport.emit(result["permalink"])
+        self.__replies.remove(reply)
+        reply.deleteLater()
+    
+    def getIpAddressReport(self, ipAddress):
+        """
+        Public method to retrieve a report for an IP address.
+        
+        @param ipAddress valid IPv4 address in dotted quad notation
+        @type str
+        """
+        self.__lastIP = ipAddress
+        
+        queryItems = [
+            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
+            ("ip", ipAddress),
+        ]
+        url = QUrl(self.GetIpAddressReportUrl)
+        if qVersion() >= "5.0.0":
+            from PyQt5.QtCore import QUrlQuery
+            query = QUrlQuery()
+            query.setQueryItems(queryItems)
+            url.setQuery(query)
+        else:
+            url.setQueryItems(queryItems)
+        request = QNetworkRequest(url)
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.get(request)
+        reply.finished.connect(self.__getIpAddressReportFinished)
+        self.__replies.append(reply)
+    
+    def __getIpAddressReportFinished(self):
+        """
+        Private slot to process the IP address report data.
+        """
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            result = json.loads(str(reply.readAll(), "utf-8"))
+            if result["response_code"] == 0:
+                E5MessageBox.information(
+                    None,
+                    self.tr("VirusTotal IP Address Report"),
+                    self.tr("""VirusTotal does not have any information for"""
+                            """ the given IP address."""))
+            elif result["response_code"] == -1:
+                E5MessageBox.information(
+                    None,
+                    self.tr("VirusTotal IP Address Report"),
+                    self.tr("""The submitted IP address is invalid."""))
+            else:
+                owner = result["as_owner"]
+                resolutions = result["resolutions"]
+                try:
+                    urls = result["detected_urls"]
+                except KeyError:
+                    urls = []
+                
+                from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
+                self.__ipReportDlg = VirusTotalIpReportDialog(
+                    self.__lastIP, owner, resolutions, urls)
+                self.__ipReportDlg.show()
+        self.__replies.remove(reply)
+        reply.deleteLater()
+    
+    def getDomainReport(self, domain):
+        """
+        Public method to retrieve a report for a domain.
+        
+        @param domain domain name
+        @type str
+        """
+        self.__lastDomain = domain
+        
+        queryItems = [
+            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
+            ("domain", domain),
+        ]
+        url = QUrl(self.GetDomainReportUrl)
+        if qVersion() >= "5.0.0":
+            from PyQt5.QtCore import QUrlQuery
+            query = QUrlQuery()
+            query.setQueryItems(queryItems)
+            url.setQuery(query)
+        else:
+            url.setQueryItems(queryItems)
+        request = QNetworkRequest(url)
+        
+        import WebBrowser.WebBrowserWindow
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
+            .networkManager()
+        reply = nam.get(request)
+        reply.finished.connect(self.__getDomainReportFinished)
+        self.__replies.append(reply)
+    
+    def __getDomainReportFinished(self):
+        """
+        Private slot to process the IP address report data.
+        """
+        reply = self.sender()
+        if reply.error() == QNetworkReply.NoError:
+            result = json.loads(str(reply.readAll(), "utf-8"))
+            if result["response_code"] == 0:
+                E5MessageBox.information(
+                    None,
+                    self.tr("VirusTotal Domain Report"),
+                    self.tr("""VirusTotal does not have any information for"""
+                            """ the given domain."""))
+            elif result["response_code"] == -1:
+                E5MessageBox.information(
+                    None,
+                    self.tr("VirusTotal Domain Report"),
+                    self.tr("""The submitted domain address is invalid."""))
+            else:
+                resolutions = result["resolutions"]
+                try:
+                    urls = result["detected_urls"]
+                except KeyError:
+                    urls = []
+                try:
+                    subdomains = result["subdomains"]
+                except KeyError:
+                    subdomains = []
+                try:
+                    bdCategory = result["BitDefender category"]
+                except KeyError:
+                    bdCategory = self.tr("not available")
+                try:
+                    tmCategory = result["TrendMicro category"]
+                except KeyError:
+                    tmCategory = self.tr("not available")
+                try:
+                    wtsCategory = result["Websense ThreatSeeker category"]
+                except KeyError:
+                    wtsCategory = self.tr("not available")
+                try:
+                    whois = result["whois"]
+                except KeyError:
+                    whois = ""
+                
+                from .VirusTotalDomainReportDialog import \
+                    VirusTotalDomainReportDialog
+                self.__domainReportDlg = VirusTotalDomainReportDialog(
+                    self.__lastDomain, resolutions, urls, subdomains,
+                    bdCategory, tmCategory, wtsCategory, whois)
+                self.__domainReportDlg.show()
+        self.__replies.remove(reply)
+        reply.deleteLater()
+    
+    def close(self):
+        """
+        Public slot to close the API.
+        """
+        for reply in self.__replies:
+            reply.abort()
+        
+        self.__ipReportDlg and self.__ipReportDlg.close()
+        self.__domainReportDlg and self.__domainReportDlg.close()

eric ide

mercurial