--- a/src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py Wed Jul 13 11:16:20 2022 +0200 +++ b/src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py Wed Jul 13 14:55:47 2022 +0200 @@ -23,7 +23,7 @@ """ Class implementing the <a href="http://www.virustotal.com">VirusTotal</a> API. - + @signal checkServiceKeyFinished(bool, str) emitted after the service key check has been performed. It gives a flag indicating validity (boolean) and an error message in case of a network error (string). @@ -33,114 +33,111 @@ @signal fileScanReport(str) emitted with the URL of the file scan report page """ + checkServiceKeyFinished = pyqtSignal(bool, str) submitUrlError = pyqtSignal(str) urlScanReport = pyqtSignal(str) fileScanReport = pyqtSignal(str) - + TestServiceKeyScanID = ( "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad" ) - + ServiceResult_ItemQueued = -2 ServiceResult_ItemNotPresent = 0 ServiceResult_ItemPresent = 1 - + # HTTP Status Codes ServiceCode_InvalidKey = 202 ServiceCode_RateLimitExceeded = 204 ServiceCode_InvalidPrivilege = 403 - + GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report" ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan" GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report" - GetIpAddressReportPattern = ( - "{0}://www.virustotal.com/vtapi/v2/ip-address/report" - ) + GetIpAddressReportPattern = "{0}://www.virustotal.com/vtapi/v2/ip-address/report" GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report" - + def __init__(self, parent=None): """ Constructor - + @param parent reference to the parent object (QObject) """ super().__init__(parent) - + self.__replies = [] - + self.__loadSettings() - + self.__lastIP = "" self.__lastDomain = "" self.__ipReportDlg = None self.__domainReportDlg = None - + def __loadSettings(self): """ Private method to load the settings. """ - protocol = ( - "https" - if Preferences.getWebBrowser("VirusTotalSecure") else - "http" - ) + protocol = "https" if Preferences.getWebBrowser("VirusTotalSecure") else "http" self.GetFileReportUrl = self.GetFileReportPattern.format(protocol) self.ScanUrlUrl = self.ScanUrlPattern.format(protocol) self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol) - self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format( - protocol) + self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(protocol) self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol) - + self.errorMessages = { 204: self.tr("Request limit has been reached."), 0: self.tr("Requested item is not present."), -2: self.tr("Requested item is still queued."), } - + def preferencesChanged(self): """ Public slot to handle a change of preferences. """ self.__loadSettings() - + def checkServiceKeyValidity(self, key, protocol=""): """ Public method to check the validity of the given service key. - + @param key service key (string) @param protocol protocol used to access VirusTotal (string) """ urlStr = ( self.GetFileReportUrl - if protocol == "" else - self.GetFileReportPattern.format(protocol) + if protocol == "" + else self.GetFileReportPattern.format(protocol) ) request = QNetworkRequest(QUrl(urlStr)) - request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - key, self.TestServiceKeyScanID).encode("utf-8")) - + request.setHeader( + QNetworkRequest.KnownHeaders.ContentTypeHeader, + "application/x-www-form-urlencoded", + ) + params = QByteArray( + "apikey={0}&resource={1}".format(key, self.TestServiceKeyScanID).encode( + "utf-8" + ) + ) + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.post(request, params) - reply.finished.connect( - lambda: self.__checkServiceKeyValidityFinished(reply)) + reply.finished.connect(lambda: self.__checkServiceKeyValidityFinished(reply)) self.__replies.append(reply) - + def __checkServiceKeyValidityFinished(self, reply): """ Private slot to determine the result of the service key validity check. - + @param reply reference to the network reply @type QNetworkReply """ res = False msg = "" - + if reply.error() == QNetworkReply.NetworkError.NoError: res = True elif reply.error() == self.ServiceCode_InvalidKey: @@ -149,34 +146,37 @@ msg = reply.errorString() self.__replies.remove(reply) reply.deleteLater() - + self.checkServiceKeyFinished.emit(res, msg) - + def submitUrl(self, url): """ Public method to submit an URL to be scanned. - + @param url url to be scanned (QUrl) """ request = QNetworkRequest(QUrl(self.ScanUrlUrl)) - request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&url=".format( - Preferences.getWebBrowser("VirusTotalServiceKey")) - .encode("utf-8")).append(QUrl.toPercentEncoding(url.toString())) - + request.setHeader( + QNetworkRequest.KnownHeaders.ContentTypeHeader, + "application/x-www-form-urlencoded", + ) + params = QByteArray( + "apikey={0}&url=".format( + Preferences.getWebBrowser("VirusTotalServiceKey") + ).encode("utf-8") + ).append(QUrl.toPercentEncoding(url.toString())) + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.post(request, params) reply.finished.connect(lambda: self.__submitUrlFinished(reply)) self.__replies.append(reply) - + def __submitUrlFinished(self, reply): """ Private slot to determine the result of the URL scan submission. - + @param reply reference to the network reply @type QNetworkReply """ @@ -193,38 +193,41 @@ self.submitUrlError.emit(msg) elif reply.error() == self.ServiceCode_RateLimitExceeded: self.submitUrlError.emit( - self.errorMessages[result[self.ServiceCode_RateLimitExceeded]]) + self.errorMessages[result[self.ServiceCode_RateLimitExceeded]] + ) else: self.submitUrlError.emit(reply.errorString()) self.__replies.remove(reply) reply.deleteLater() - + def __getUrlScanReportUrl(self, scanId): """ Private method to get the report URL for a URL scan. - + @param scanId ID of the scan to get the report URL for (string) """ request = QNetworkRequest(QUrl(self.GetUrlReportUrl)) - request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - Preferences.getWebBrowser("VirusTotalServiceKey"), scanId) - .encode("utf-8")) - + request.setHeader( + QNetworkRequest.KnownHeaders.ContentTypeHeader, + "application/x-www-form-urlencoded", + ) + params = QByteArray( + "apikey={0}&resource={1}".format( + Preferences.getWebBrowser("VirusTotalServiceKey"), scanId + ).encode("utf-8") + ) + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.post(request, params) - reply.finished.connect( - lambda: self.__getUrlScanReportUrlFinished(reply)) + reply.finished.connect(lambda: self.__getUrlScanReportUrlFinished(reply)) self.__replies.append(reply) - + def __getUrlScanReportUrlFinished(self, reply): """ Private slot to determine the result of the URL scan report URL. - + @param reply reference to the network reply @type QNetworkReply request. @@ -235,34 +238,36 @@ self.__getFileScanReportUrl(result["filescan_id"]) self.__replies.remove(reply) reply.deleteLater() - + def __getFileScanReportUrl(self, scanId): """ Private method to get the report URL for a file scan. - + @param scanId ID of the scan to get the report URL for (string) """ request = QNetworkRequest(QUrl(self.GetFileReportUrl)) - request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - Preferences.getWebBrowser("VirusTotalServiceKey"), scanId) - .encode("utf-8")) - + request.setHeader( + QNetworkRequest.KnownHeaders.ContentTypeHeader, + "application/x-www-form-urlencoded", + ) + params = QByteArray( + "apikey={0}&resource={1}".format( + Preferences.getWebBrowser("VirusTotalServiceKey"), scanId + ).encode("utf-8") + ) + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.post(request, params) - reply.finished.connect( - lambda: self.__getFileScanReportUrlFinished(reply)) + reply.finished.connect(lambda: self.__getFileScanReportUrlFinished(reply)) self.__replies.append(reply) - + def __getFileScanReportUrlFinished(self, reply): """ Private slot to determine the result of the file scan report URL request. - + @param reply reference to the network reply @type QNetworkReply """ @@ -271,16 +276,16 @@ self.fileScanReport.emit(result["permalink"]) self.__replies.remove(reply) reply.deleteLater() - + def getIpAddressReport(self, ipAddress): """ Public method to retrieve a report for an IP address. - + @param ipAddress valid IPv4 address in dotted quad notation @type str """ self.__lastIP = ipAddress - + queryItems = [ ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")), ("ip", ipAddress), @@ -290,20 +295,18 @@ query.setQueryItems(queryItems) url.setQuery(query) request = QNetworkRequest(url) - + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.get(request) - reply.finished.connect( - lambda: self.__getIpAddressReportFinished(reply)) + reply.finished.connect(lambda: self.__getIpAddressReportFinished(reply)) self.__replies.append(reply) - + def __getIpAddressReportFinished(self, reply): """ Private slot to process the IP address report data. - + @param reply reference to the network reply @type QNetworkReply """ @@ -313,13 +316,17 @@ EricMessageBox.information( None, self.tr("VirusTotal IP Address Report"), - self.tr("""VirusTotal does not have any information for""" - """ the given IP address.""")) + self.tr( + """VirusTotal does not have any information for""" + """ the given IP address.""" + ), + ) elif result["response_code"] == -1: EricMessageBox.information( None, self.tr("VirusTotal IP Address Report"), - self.tr("""The submitted IP address is invalid.""")) + self.tr("""The submitted IP address is invalid."""), + ) else: owner = result["as_owner"] resolutions = result["resolutions"] @@ -327,23 +334,25 @@ urls = result["detected_urls"] except KeyError: urls = [] - + from .VirusTotalIpReportDialog import VirusTotalIpReportDialog + self.__ipReportDlg = VirusTotalIpReportDialog( - self.__lastIP, owner, resolutions, urls) + self.__lastIP, owner, resolutions, urls + ) self.__ipReportDlg.show() self.__replies.remove(reply) reply.deleteLater() - + def getDomainReport(self, domain): """ Public method to retrieve a report for a domain. - + @param domain domain name @type str """ self.__lastDomain = domain - + queryItems = [ ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")), ("domain", domain), @@ -353,19 +362,18 @@ query.setQueryItems(queryItems) url.setQuery(query) request = QNetworkRequest(url) - + import WebBrowser.WebBrowserWindow - nam = ( - WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() - ) + + nam = WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() reply = nam.get(request) reply.finished.connect(lambda: self.__getDomainReportFinished(reply)) self.__replies.append(reply) - + def __getDomainReportFinished(self, reply): """ Private slot to process the IP address report data. - + @param reply reference to the network reply @type QNetworkReply """ @@ -375,13 +383,17 @@ EricMessageBox.information( None, self.tr("VirusTotal Domain Report"), - self.tr("""VirusTotal does not have any information for""" - """ the given domain.""")) + self.tr( + """VirusTotal does not have any information for""" + """ the given domain.""" + ), + ) elif result["response_code"] == -1: EricMessageBox.information( None, self.tr("VirusTotal Domain Report"), - self.tr("""The submitted domain address is invalid.""")) + self.tr("""The submitted domain address is invalid."""), + ) else: resolutions = result["resolutions"] try: @@ -392,7 +404,7 @@ subdomains = result["subdomains"] except KeyError: subdomains = [] - + categoriesMapping = { "bitdefender": ("BitDefender category",), "sophos": ("sophos category", "Sophos category"), @@ -412,12 +424,8 @@ whois = result["whois"] except KeyError: whois = "" - - webutationData = { - "adult": "--", - "safety": "--", - "verdict": "--" - } + + webutationData = {"adult": "--", "safety": "--", "verdict": "--"} with contextlib.suppress(KeyError): webutation = result["Webutation domain info"] with contextlib.suppress(KeyError): @@ -426,23 +434,28 @@ webutationData["safety"] = webutation["Safety score"] with contextlib.suppress(KeyError): webutationData["verdict"] = webutation["Verdict"] - - from .VirusTotalDomainReportDialog import ( - VirusTotalDomainReportDialog + + from .VirusTotalDomainReportDialog import VirusTotalDomainReportDialog + + self.__domainReportDlg = VirusTotalDomainReportDialog( + self.__lastDomain, + resolutions, + urls, + subdomains, + categories, + webutationData, + whois, ) - self.__domainReportDlg = VirusTotalDomainReportDialog( - self.__lastDomain, resolutions, urls, subdomains, - categories, webutationData, whois) self.__domainReportDlg.show() self.__replies.remove(reply) reply.deleteLater() - + def close(self): """ Public slot to close the API. """ for reply in self.__replies: reply.abort() - + self.__ipReportDlg and self.__ipReportDlg.close() self.__domainReportDlg and self.__domainReportDlg.close()