src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9278
36448ca469c2
--- a/src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py	Wed Jul 13 11:16:20 2022 +0200
+++ b/src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py	Wed Jul 13 14:55:47 2022 +0200
@@ -23,7 +23,7 @@
     """
     Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
     API.
-    
+
     @signal checkServiceKeyFinished(bool, str) emitted after the service key
         check has been performed. It gives a flag indicating validity
         (boolean) and an error message in case of a network error (string).
@@ -33,114 +33,111 @@
     @signal fileScanReport(str) emitted with the URL of the file scan report
         page
     """
+
     checkServiceKeyFinished = pyqtSignal(bool, str)
     submitUrlError = pyqtSignal(str)
     urlScanReport = pyqtSignal(str)
     fileScanReport = pyqtSignal(str)
-    
+
     TestServiceKeyScanID = (
         "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
     )
-    
+
     ServiceResult_ItemQueued = -2
     ServiceResult_ItemNotPresent = 0
     ServiceResult_ItemPresent = 1
-    
+
     # HTTP Status Codes
     ServiceCode_InvalidKey = 202
     ServiceCode_RateLimitExceeded = 204
     ServiceCode_InvalidPrivilege = 403
-    
+
     GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
     ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
     GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
-    GetIpAddressReportPattern = (
-        "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
-    )
+    GetIpAddressReportPattern = "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
     GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
-    
+
     def __init__(self, parent=None):
         """
         Constructor
-        
+
         @param parent reference to the parent object (QObject)
         """
         super().__init__(parent)
-        
+
         self.__replies = []
-        
+
         self.__loadSettings()
-        
+
         self.__lastIP = ""
         self.__lastDomain = ""
         self.__ipReportDlg = None
         self.__domainReportDlg = None
-    
+
     def __loadSettings(self):
         """
         Private method to load the settings.
         """
-        protocol = (
-            "https"
-            if Preferences.getWebBrowser("VirusTotalSecure") else
-            "http"
-        )
+        protocol = "https" if Preferences.getWebBrowser("VirusTotalSecure") else "http"
         self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
         self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
         self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
-        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
-            protocol)
+        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(protocol)
         self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
-        
+
         self.errorMessages = {
             204: self.tr("Request limit has been reached."),
             0: self.tr("Requested item is not present."),
             -2: self.tr("Requested item is still queued."),
         }
-    
+
     def preferencesChanged(self):
         """
         Public slot to handle a change of preferences.
         """
         self.__loadSettings()
-    
+
     def checkServiceKeyValidity(self, key, protocol=""):
         """
         Public method to check the validity of the given service key.
-        
+
         @param key service key (string)
         @param protocol protocol used to access VirusTotal (string)
         """
         urlStr = (
             self.GetFileReportUrl
-            if protocol == "" else
-            self.GetFileReportPattern.format(protocol)
+            if protocol == ""
+            else self.GetFileReportPattern.format(protocol)
         )
         request = QNetworkRequest(QUrl(urlStr))
-        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
-                          "application/x-www-form-urlencoded")
-        params = QByteArray("apikey={0}&resource={1}".format(
-            key, self.TestServiceKeyScanID).encode("utf-8"))
-        
+        request.setHeader(
+            QNetworkRequest.KnownHeaders.ContentTypeHeader,
+            "application/x-www-form-urlencoded",
+        )
+        params = QByteArray(
+            "apikey={0}&resource={1}".format(key, self.TestServiceKeyScanID).encode(
+                "utf-8"
+            )
+        )
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.post(request, params)
-        reply.finished.connect(
-            lambda: self.__checkServiceKeyValidityFinished(reply))
+        reply.finished.connect(lambda: self.__checkServiceKeyValidityFinished(reply))
         self.__replies.append(reply)
-    
+
     def __checkServiceKeyValidityFinished(self, reply):
         """
         Private slot to determine the result of the service key validity check.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
         res = False
         msg = ""
-        
+
         if reply.error() == QNetworkReply.NetworkError.NoError:
             res = True
         elif reply.error() == self.ServiceCode_InvalidKey:
@@ -149,34 +146,37 @@
             msg = reply.errorString()
         self.__replies.remove(reply)
         reply.deleteLater()
-        
+
         self.checkServiceKeyFinished.emit(res, msg)
-    
+
     def submitUrl(self, url):
         """
         Public method to submit an URL to be scanned.
-        
+
         @param url url to be scanned (QUrl)
         """
         request = QNetworkRequest(QUrl(self.ScanUrlUrl))
-        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
-                          "application/x-www-form-urlencoded")
-        params = QByteArray("apikey={0}&url=".format(
-            Preferences.getWebBrowser("VirusTotalServiceKey"))
-            .encode("utf-8")).append(QUrl.toPercentEncoding(url.toString()))
-        
+        request.setHeader(
+            QNetworkRequest.KnownHeaders.ContentTypeHeader,
+            "application/x-www-form-urlencoded",
+        )
+        params = QByteArray(
+            "apikey={0}&url=".format(
+                Preferences.getWebBrowser("VirusTotalServiceKey")
+            ).encode("utf-8")
+        ).append(QUrl.toPercentEncoding(url.toString()))
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.post(request, params)
         reply.finished.connect(lambda: self.__submitUrlFinished(reply))
         self.__replies.append(reply)
-    
+
     def __submitUrlFinished(self, reply):
         """
         Private slot to determine the result of the URL scan submission.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
@@ -193,38 +193,41 @@
                 self.submitUrlError.emit(msg)
         elif reply.error() == self.ServiceCode_RateLimitExceeded:
             self.submitUrlError.emit(
-                self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
+                self.errorMessages[result[self.ServiceCode_RateLimitExceeded]]
+            )
         else:
             self.submitUrlError.emit(reply.errorString())
         self.__replies.remove(reply)
         reply.deleteLater()
-    
+
     def __getUrlScanReportUrl(self, scanId):
         """
         Private method to get the report URL for a URL scan.
-        
+
         @param scanId ID of the scan to get the report URL for (string)
         """
         request = QNetworkRequest(QUrl(self.GetUrlReportUrl))
-        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
-                          "application/x-www-form-urlencoded")
-        params = QByteArray("apikey={0}&resource={1}".format(
-            Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
-            .encode("utf-8"))
-        
+        request.setHeader(
+            QNetworkRequest.KnownHeaders.ContentTypeHeader,
+            "application/x-www-form-urlencoded",
+        )
+        params = QByteArray(
+            "apikey={0}&resource={1}".format(
+                Preferences.getWebBrowser("VirusTotalServiceKey"), scanId
+            ).encode("utf-8")
+        )
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.post(request, params)
-        reply.finished.connect(
-            lambda: self.__getUrlScanReportUrlFinished(reply))
+        reply.finished.connect(lambda: self.__getUrlScanReportUrlFinished(reply))
         self.__replies.append(reply)
-    
+
     def __getUrlScanReportUrlFinished(self, reply):
         """
         Private slot to determine the result of the URL scan report URL.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         request.
@@ -235,34 +238,36 @@
                 self.__getFileScanReportUrl(result["filescan_id"])
         self.__replies.remove(reply)
         reply.deleteLater()
-    
+
     def __getFileScanReportUrl(self, scanId):
         """
         Private method to get the report URL for a file scan.
-        
+
         @param scanId ID of the scan to get the report URL for (string)
         """
         request = QNetworkRequest(QUrl(self.GetFileReportUrl))
-        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
-                          "application/x-www-form-urlencoded")
-        params = QByteArray("apikey={0}&resource={1}".format(
-            Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
-            .encode("utf-8"))
-        
+        request.setHeader(
+            QNetworkRequest.KnownHeaders.ContentTypeHeader,
+            "application/x-www-form-urlencoded",
+        )
+        params = QByteArray(
+            "apikey={0}&resource={1}".format(
+                Preferences.getWebBrowser("VirusTotalServiceKey"), scanId
+            ).encode("utf-8")
+        )
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.post(request, params)
-        reply.finished.connect(
-            lambda: self.__getFileScanReportUrlFinished(reply))
+        reply.finished.connect(lambda: self.__getFileScanReportUrlFinished(reply))
         self.__replies.append(reply)
-    
+
     def __getFileScanReportUrlFinished(self, reply):
         """
         Private slot to determine the result of the file scan report URL
         request.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
@@ -271,16 +276,16 @@
             self.fileScanReport.emit(result["permalink"])
         self.__replies.remove(reply)
         reply.deleteLater()
-    
+
     def getIpAddressReport(self, ipAddress):
         """
         Public method to retrieve a report for an IP address.
-        
+
         @param ipAddress valid IPv4 address in dotted quad notation
         @type str
         """
         self.__lastIP = ipAddress
-        
+
         queryItems = [
             ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
             ("ip", ipAddress),
@@ -290,20 +295,18 @@
         query.setQueryItems(queryItems)
         url.setQuery(query)
         request = QNetworkRequest(url)
-        
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.get(request)
-        reply.finished.connect(
-            lambda: self.__getIpAddressReportFinished(reply))
+        reply.finished.connect(lambda: self.__getIpAddressReportFinished(reply))
         self.__replies.append(reply)
-    
+
     def __getIpAddressReportFinished(self, reply):
         """
         Private slot to process the IP address report data.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
@@ -313,13 +316,17 @@
                 EricMessageBox.information(
                     None,
                     self.tr("VirusTotal IP Address Report"),
-                    self.tr("""VirusTotal does not have any information for"""
-                            """ the given IP address."""))
+                    self.tr(
+                        """VirusTotal does not have any information for"""
+                        """ the given IP address."""
+                    ),
+                )
             elif result["response_code"] == -1:
                 EricMessageBox.information(
                     None,
                     self.tr("VirusTotal IP Address Report"),
-                    self.tr("""The submitted IP address is invalid."""))
+                    self.tr("""The submitted IP address is invalid."""),
+                )
             else:
                 owner = result["as_owner"]
                 resolutions = result["resolutions"]
@@ -327,23 +334,25 @@
                     urls = result["detected_urls"]
                 except KeyError:
                     urls = []
-                
+
                 from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
+
                 self.__ipReportDlg = VirusTotalIpReportDialog(
-                    self.__lastIP, owner, resolutions, urls)
+                    self.__lastIP, owner, resolutions, urls
+                )
                 self.__ipReportDlg.show()
         self.__replies.remove(reply)
         reply.deleteLater()
-    
+
     def getDomainReport(self, domain):
         """
         Public method to retrieve a report for a domain.
-        
+
         @param domain domain name
         @type str
         """
         self.__lastDomain = domain
-        
+
         queryItems = [
             ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
             ("domain", domain),
@@ -353,19 +362,18 @@
         query.setQueryItems(queryItems)
         url.setQuery(query)
         request = QNetworkRequest(url)
-        
+
         import WebBrowser.WebBrowserWindow
-        nam = (
-            WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
-        )
+
+        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
         reply = nam.get(request)
         reply.finished.connect(lambda: self.__getDomainReportFinished(reply))
         self.__replies.append(reply)
-    
+
     def __getDomainReportFinished(self, reply):
         """
         Private slot to process the IP address report data.
-        
+
         @param reply reference to the network reply
         @type QNetworkReply
         """
@@ -375,13 +383,17 @@
                 EricMessageBox.information(
                     None,
                     self.tr("VirusTotal Domain Report"),
-                    self.tr("""VirusTotal does not have any information for"""
-                            """ the given domain."""))
+                    self.tr(
+                        """VirusTotal does not have any information for"""
+                        """ the given domain."""
+                    ),
+                )
             elif result["response_code"] == -1:
                 EricMessageBox.information(
                     None,
                     self.tr("VirusTotal Domain Report"),
-                    self.tr("""The submitted domain address is invalid."""))
+                    self.tr("""The submitted domain address is invalid."""),
+                )
             else:
                 resolutions = result["resolutions"]
                 try:
@@ -392,7 +404,7 @@
                     subdomains = result["subdomains"]
                 except KeyError:
                     subdomains = []
-                
+
                 categoriesMapping = {
                     "bitdefender": ("BitDefender category",),
                     "sophos": ("sophos category", "Sophos category"),
@@ -412,12 +424,8 @@
                     whois = result["whois"]
                 except KeyError:
                     whois = ""
-                
-                webutationData = {
-                    "adult": "--",
-                    "safety": "--",
-                    "verdict": "--"
-                }
+
+                webutationData = {"adult": "--", "safety": "--", "verdict": "--"}
                 with contextlib.suppress(KeyError):
                     webutation = result["Webutation domain info"]
                     with contextlib.suppress(KeyError):
@@ -426,23 +434,28 @@
                         webutationData["safety"] = webutation["Safety score"]
                     with contextlib.suppress(KeyError):
                         webutationData["verdict"] = webutation["Verdict"]
-                
-                from .VirusTotalDomainReportDialog import (
-                    VirusTotalDomainReportDialog
+
+                from .VirusTotalDomainReportDialog import VirusTotalDomainReportDialog
+
+                self.__domainReportDlg = VirusTotalDomainReportDialog(
+                    self.__lastDomain,
+                    resolutions,
+                    urls,
+                    subdomains,
+                    categories,
+                    webutationData,
+                    whois,
                 )
-                self.__domainReportDlg = VirusTotalDomainReportDialog(
-                    self.__lastDomain, resolutions, urls, subdomains,
-                    categories, webutationData, whois)
                 self.__domainReportDlg.show()
         self.__replies.remove(reply)
         reply.deleteLater()
-    
+
     def close(self):
         """
         Public slot to close the API.
         """
         for reply in self.__replies:
             reply.abort()
-        
+
         self.__ipReportDlg and self.__ipReportDlg.close()
         self.__domainReportDlg and self.__domainReportDlg.close()

eric ide

mercurial