--- a/Helpviewer/VirusTotalApi.py Sat Jul 25 18:22:34 2015 +0200 +++ b/Helpviewer/VirusTotalApi.py Sat Jul 25 20:00:25 2015 +0200 @@ -16,9 +16,10 @@ import json -from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal -from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, \ - QNetworkAccessManager +from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion +from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply + +from E5Gui import E5MessageBox import Preferences @@ -45,21 +46,21 @@ TestServiceKeyScanID = \ "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad" - ServiceResult_RequestLimitReached = -2 - ServiceResult_InvalidServiceKey = -1 + ServiceResult_ItemQueued = -2 ServiceResult_ItemNotPresent = 0 ServiceResult_ItemPresent = 1 - GetFileReportPattern = "{0}://www.virustotal.com/api/get_file_report.json" - ScanUrlPattern = "{0}://www.virustotal.com/api/scan_url.json" - GetUrlReportPattern = "{0}://www.virustotal.com/api/get_url_report.json" + # HTTP Status Codes + ServiceCode_InvalidKey = 202 + ServiceCode_RateLimitExceeded = 204 + ServiceCode_InvalidPrivilege = 403 - ReportUrlScanPagePattern = \ - "http://www.virustotal.com/url-scan/report.html?id={0}" - ReportFileScanPagePattern = \ - "http://www.virustotal.com/file-scan/report.html?id={0}" - - SearchUrl = "http://www.virustotal.com/search.html" + GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report" + ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan" + GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report" + GetIpAddressReportPattern = \ + "{0}://www.virustotal.com/vtapi/v2/ip-address/report" + GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report" def __init__(self, parent=None): """ @@ -72,6 +73,11 @@ self.__replies = [] self.__loadSettings() + + self.__lastIP = "" + self.__lastDomain = "" + self.__ipReportDlg = None + self.__domainReportDlg = None def __loadSettings(self): """ @@ -84,11 +90,14 @@ self.GetFileReportUrl = self.GetFileReportPattern.format(protocol) self.ScanUrlUrl = self.ScanUrlPattern.format(protocol) self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol) + self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format( + protocol) + self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol) self.errorMessages = { - -2: self.tr("Request limit has been reached."), - -1: self.tr("Invalid key given."), - 0: self.tr("Requested item is not present.") + 204: self.tr("Request limit has been reached."), + 0: self.tr("Requested item is not present."), + -2: self.tr("Requested item is still queued."), } def preferencesChanged(self): @@ -111,7 +120,7 @@ request = QNetworkRequest(QUrl(urlStr)) request.setHeader(QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded") - params = QByteArray("key={0}&resource={1}".format( + params = QByteArray("apikey={0}&resource={1}".format( key, self.TestServiceKeyScanID).encode("utf-8")) import Helpviewer.HelpWindow @@ -129,12 +138,13 @@ reply = self.sender() if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - if result["result"] != self.ServiceResult_InvalidServiceKey: - res = True + res = True + elif reply.error() == self.ServiceCode_InvalidKey: + res = False else: msg = reply.errorString() self.__replies.remove(reply) + reply.deleteLater() self.checkServiceKeyFinished.emit(res, msg) @@ -147,7 +157,7 @@ request = QNetworkRequest(QUrl(self.ScanUrlUrl)) request.setHeader(QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded") - params = QByteArray("key={0}&url=".format( + params = QByteArray("apikey={0}&url=".format( Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\ .append(QUrl.toPercentEncoding(url.toString())) @@ -164,15 +174,54 @@ reply = self.sender() if reply.error() == QNetworkReply.NoError: result = json.loads(str(reply.readAll(), "utf-8")) - if result["result"] == self.ServiceResult_ItemPresent: - self.urlScanReport.emit( - self.ReportUrlScanPagePattern.format(result["scan_id"])) - self.__getFileScanReportUrl(result["scan_id"]) + if result["response_code"] == self.ServiceResult_ItemPresent: + self.urlScanReport.emit(result["permalink"]) + self.__getUrlScanReportUrl(result["scan_id"]) else: - self.submitUrlError.emit(self.errorMessages[result["result"]]) + if result["response_code"] in self.errorMessages: + msg = self.errorMessages[result["response_code"]] + else: + msg = result["verbose_msg"] + self.submitUrlError.emit(msg) + elif reply.error() == self.ServiceCode_RateLimitExceeded: + self.submitUrlError.emit( + self.errorMessages[result[self.ServiceCode_RateLimitExceeded]]) else: self.submitUrlError.emit(reply.errorString()) self.__replies.remove(reply) + reply.deleteLater() + + def __getUrlScanReportUrl(self, scanId): + """ + Private method to get the report URL for a URL scan. + + @param scanId ID of the scan to get the report URL for (string) + """ + request = QNetworkRequest(QUrl(self.GetUrlReportUrl)) + request.setHeader(QNetworkRequest.ContentTypeHeader, + "application/x-www-form-urlencoded") + params = QByteArray("apikey={0}&resource={1}".format( + Preferences.getHelp("VirusTotalServiceKey"), scanId) + .encode("utf-8")) + + import Helpviewer.HelpWindow + nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() + reply = nam.post(request, params) + reply.finished.connect(self.__getUrlScanReportUrlFinished) + self.__replies.append(reply) + + def __getUrlScanReportUrlFinished(self): + """ + Private slot to determine the result of the URL scan report URL + request. + """ + reply = self.sender() + if reply.error() == QNetworkReply.NoError: + result = json.loads(str(reply.readAll(), "utf-8")) + if "filescan_id" in result and result["filescan_id"] is not None: + self.__getFileScanReportUrl(result["filescan_id"]) + self.__replies.remove(reply) + reply.deleteLater() def __getFileScanReportUrl(self, scanId): """ @@ -180,10 +229,10 @@ @param scanId ID of the scan to get the report URL for (string) """ - request = QNetworkRequest(QUrl(self.GetUrlReportUrl)) + request = QNetworkRequest(QUrl(self.GetFileReportUrl)) request.setHeader(QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded") - params = QByteArray("key={0}&resource={1}".format( + params = QByteArray("apikey={0}&resource={1}".format( Preferences.getHelp("VirusTotalServiceKey"), scanId) .encode("utf-8")) @@ -201,25 +250,161 @@ reply = self.sender() if reply.error() == QNetworkReply.NoError: result = json.loads(str(reply.readAll(), "utf-8")) - if "file-report" in result: - self.fileScanReport.emit( - self.ReportFileScanPagePattern.format( - result["file-report"])) + self.fileScanReport.emit(result["permalink"]) self.__replies.remove(reply) + reply.deleteLater() - @classmethod - def getSearchRequestData(cls, term): + def getIpAddressReport(self, ipAddress): + """ + Public method to retrieve a report for an IP address. + + @param ipAddress valid IPv4 address in dotted quad notation + @type str + """ + self.__lastIP = ipAddress + + queryItems = [ + ("apikey", Preferences.getHelp("VirusTotalServiceKey")), + ("ip", ipAddress), + ] + url = QUrl(self.GetIpAddressReportUrl) + if qVersion() >= "5.0.0": + from PyQt5.QtCore import QUrlQuery + query = QUrlQuery() + query.setQueryItems(queryItems) + url.setQuery(query) + else: + url.setQueryItems(queryItems) + request = QNetworkRequest(url) + + import Helpviewer.HelpWindow + nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() + reply = nam.get(request) + reply.finished.connect(self.__getIpAddressReportFinished) + self.__replies.append(reply) + + def __getIpAddressReportFinished(self): + """ + Private slot to process the IP address report data. """ - Class method to assemble the search request data structure. + reply = self.sender() + if reply.error() == QNetworkReply.NoError: + result = json.loads(str(reply.readAll(), "utf-8")) + if result["response_code"] == 0: + E5MessageBox.information( + None, + self.tr("VirusTotal IP Address Report"), + self.tr("""VirusTotal does not have any information for""" + """ the given IP address.""")) + elif result["response_code"] == -1: + E5MessageBox.information( + None, + self.tr("VirusTotal IP Address Report"), + self.tr("""The submitted IP address is invalid.""")) + else: + owner = result["as_owner"] + resolutions = result["resolutions"] + try: + urls = result["detected_urls"] + except KeyError: + urls = [] + + from .VirusTotalIpReportDialog import VirusTotalIpReportDialog + self.__ipReportDlg = VirusTotalIpReportDialog( + self.__lastIP, owner, resolutions, urls) + self.__ipReportDlg.show() + self.__replies.remove(reply) + reply.deleteLater() + + def getDomainReport(self, domain): + """ + Public method to retrieve a report for a domain. + + @param domain domain name + @type str + """ + self.__lastDomain = domain - @param term search term (string) - @return tuple of network request object, operation and parameters - (QNetworkRequest, QNetworkAccessManager.Operation, QByteArray) + queryItems = [ + ("apikey", Preferences.getHelp("VirusTotalServiceKey")), + ("domain", domain), + ] + url = QUrl(self.GetDomainReportUrl) + if qVersion() >= "5.0.0": + from PyQt5.QtCore import QUrlQuery + query = QUrlQuery() + query.setQueryItems(queryItems) + url.setQuery(query) + else: + url.setQueryItems(queryItems) + request = QNetworkRequest(url) + + import Helpviewer.HelpWindow + nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() + reply = nam.get(request) + reply.finished.connect(self.__getDomainReportFinished) + self.__replies.append(reply) + + def __getDomainReportFinished(self): + """ + Private slot to process the IP address report data. """ - request = QNetworkRequest(QUrl(cls.SearchUrl)) - request.setHeader(QNetworkRequest.ContentTypeHeader, - "application/x-www-form-urlencoded") - op = QNetworkAccessManager.PostOperation - params = QByteArray(b"chain=").append(QUrl.toPercentEncoding(term)) + reply = self.sender() + if reply.error() == QNetworkReply.NoError: + result = json.loads(str(reply.readAll(), "utf-8")) + if result["response_code"] == 0: + E5MessageBox.information( + None, + self.tr("VirusTotal Domain Report"), + self.tr("""VirusTotal does not have any information for""" + """ the given domain.""")) + elif result["response_code"] == -1: + E5MessageBox.information( + None, + self.tr("VirusTotal Domain Report"), + self.tr("""The submitted domain address is invalid.""")) + else: + resolutions = result["resolutions"] + try: + urls = result["detected_urls"] + except KeyError: + urls = [] + try: + subdomains = result["subdomains"] + except KeyError: + subdomains = [] + try: + bdCategory = result["BitDefender category"] + except KeyError: + bdCategory = self.tr("not available") + try: + tmCategory = result["TrendMicro category"] + except KeyError: + tmCategory = self.tr("not available") + try: + wtsCategory = result["Websense ThreatSeeker category"] + except KeyError: + wtsCategory = self.tr("not available") + try: + categories = result["categories"] + except KeyError: + categories = [] + + from .VirusTotalDomainReportDialog import \ + VirusTotalDomainReportDialog + self.__domainReportDlg = VirusTotalDomainReportDialog( + self.__lastDomain, resolutions, urls, subdomains, + bdCategory, tmCategory, wtsCategory, categories) + self.__domainReportDlg.show() + self.__replies.remove(reply) + reply.deleteLater() + + def close(self): + """ + Public slot to close the API. + """ + for reply in self.__replies: + reply.abort() - return (request, op, params) + self.__ipReportDlg and self.__ipReportDlg.close() + self.__domainReportDlg and self.__domainReportDlg.close()