--- a/Helpviewer/VirusTotalApi.py Sat Jul 25 20:00:25 2015 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,410 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2011 - 2015 Detlev Offenbach <detlev@die-offenbachs.de> -# - -""" -Module implementing the <a href="http://www.virustotal.com">VirusTotal</a> -API class. -""" - -from __future__ import unicode_literals -try: - str = unicode -except NameError: - pass - -import json - -from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion -from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply - -from E5Gui import E5MessageBox - -import Preferences - - -class VirusTotalAPI(QObject): - """ - Class implementing the <a href="http://www.virustotal.com">VirusTotal</a> - API. - - @signal checkServiceKeyFinished(bool, str) emitted after the service key - check has been performed. It gives a flag indicating validity - (boolean) and an error message in case of a network error (string). - @signal submitUrlError(str) emitted with the error string, if the URL scan - submission returned an error. - @signal urlScanReport(str) emitted with the URL of the URL scan report page - @signal fileScanReport(str) emitted with the URL of the file scan report - page - """ - checkServiceKeyFinished = pyqtSignal(bool, str) - submitUrlError = pyqtSignal(str) - urlScanReport = pyqtSignal(str) - fileScanReport = pyqtSignal(str) - - TestServiceKeyScanID = \ - "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad" - - ServiceResult_ItemQueued = -2 - ServiceResult_ItemNotPresent = 0 - ServiceResult_ItemPresent = 1 - - # HTTP Status Codes - ServiceCode_InvalidKey = 202 - ServiceCode_RateLimitExceeded = 204 - ServiceCode_InvalidPrivilege = 403 - - GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report" - ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan" - GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report" - GetIpAddressReportPattern = \ - "{0}://www.virustotal.com/vtapi/v2/ip-address/report" - GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report" - - def __init__(self, parent=None): - """ - Constructor - - @param parent reference to the parent object (QObject) - """ - super(VirusTotalAPI, self).__init__(parent) - - self.__replies = [] - - self.__loadSettings() - - self.__lastIP = "" - self.__lastDomain = "" - self.__ipReportDlg = None - self.__domainReportDlg = None - - def __loadSettings(self): - """ - Private method to load the settings. - """ - if Preferences.getHelp("VirusTotalSecure"): - protocol = "https" - else: - protocol = "http" - self.GetFileReportUrl = self.GetFileReportPattern.format(protocol) - self.ScanUrlUrl = self.ScanUrlPattern.format(protocol) - self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol) - self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format( - protocol) - self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol) - - self.errorMessages = { - 204: self.tr("Request limit has been reached."), - 0: self.tr("Requested item is not present."), - -2: self.tr("Requested item is still queued."), - } - - def preferencesChanged(self): - """ - Public slot to handle a change of preferences. - """ - self.__loadSettings() - - def checkServiceKeyValidity(self, key, protocol=""): - """ - Public method to check the validity of the given service key. - - @param key service key (string) - @param protocol protocol used to access VirusTotal (string) - """ - if protocol == "": - urlStr = self.GetFileReportUrl - else: - urlStr = self.GetFileReportPattern.format(protocol) - request = QNetworkRequest(QUrl(urlStr)) - request.setHeader(QNetworkRequest.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - key, self.TestServiceKeyScanID).encode("utf-8")) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.post(request, params) - reply.finished.connect(self.__checkServiceKeyValidityFinished) - self.__replies.append(reply) - - def __checkServiceKeyValidityFinished(self): - """ - Private slot to determine the result of the service key validity check. - """ - res = False - msg = "" - - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - res = True - elif reply.error() == self.ServiceCode_InvalidKey: - res = False - else: - msg = reply.errorString() - self.__replies.remove(reply) - reply.deleteLater() - - self.checkServiceKeyFinished.emit(res, msg) - - def submitUrl(self, url): - """ - Public method to submit an URL to be scanned. - - @param url url to be scanned (QUrl) - """ - request = QNetworkRequest(QUrl(self.ScanUrlUrl)) - request.setHeader(QNetworkRequest.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&url=".format( - Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\ - .append(QUrl.toPercentEncoding(url.toString())) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.post(request, params) - reply.finished.connect(self.__submitUrlFinished) - self.__replies.append(reply) - - def __submitUrlFinished(self): - """ - Private slot to determine the result of the URL scan submission. - """ - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - if result["response_code"] == self.ServiceResult_ItemPresent: - self.urlScanReport.emit(result["permalink"]) - self.__getUrlScanReportUrl(result["scan_id"]) - else: - if result["response_code"] in self.errorMessages: - msg = self.errorMessages[result["response_code"]] - else: - msg = result["verbose_msg"] - self.submitUrlError.emit(msg) - elif reply.error() == self.ServiceCode_RateLimitExceeded: - self.submitUrlError.emit( - self.errorMessages[result[self.ServiceCode_RateLimitExceeded]]) - else: - self.submitUrlError.emit(reply.errorString()) - self.__replies.remove(reply) - reply.deleteLater() - - def __getUrlScanReportUrl(self, scanId): - """ - Private method to get the report URL for a URL scan. - - @param scanId ID of the scan to get the report URL for (string) - """ - request = QNetworkRequest(QUrl(self.GetUrlReportUrl)) - request.setHeader(QNetworkRequest.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - Preferences.getHelp("VirusTotalServiceKey"), scanId) - .encode("utf-8")) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.post(request, params) - reply.finished.connect(self.__getUrlScanReportUrlFinished) - self.__replies.append(reply) - - def __getUrlScanReportUrlFinished(self): - """ - Private slot to determine the result of the URL scan report URL - request. - """ - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - if "filescan_id" in result and result["filescan_id"] is not None: - self.__getFileScanReportUrl(result["filescan_id"]) - self.__replies.remove(reply) - reply.deleteLater() - - def __getFileScanReportUrl(self, scanId): - """ - Private method to get the report URL for a file scan. - - @param scanId ID of the scan to get the report URL for (string) - """ - request = QNetworkRequest(QUrl(self.GetFileReportUrl)) - request.setHeader(QNetworkRequest.ContentTypeHeader, - "application/x-www-form-urlencoded") - params = QByteArray("apikey={0}&resource={1}".format( - Preferences.getHelp("VirusTotalServiceKey"), scanId) - .encode("utf-8")) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.post(request, params) - reply.finished.connect(self.__getFileScanReportUrlFinished) - self.__replies.append(reply) - - def __getFileScanReportUrlFinished(self): - """ - Private slot to determine the result of the file scan report URL - request. - """ - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - self.fileScanReport.emit(result["permalink"]) - self.__replies.remove(reply) - reply.deleteLater() - - def getIpAddressReport(self, ipAddress): - """ - Public method to retrieve a report for an IP address. - - @param ipAddress valid IPv4 address in dotted quad notation - @type str - """ - self.__lastIP = ipAddress - - queryItems = [ - ("apikey", Preferences.getHelp("VirusTotalServiceKey")), - ("ip", ipAddress), - ] - url = QUrl(self.GetIpAddressReportUrl) - if qVersion() >= "5.0.0": - from PyQt5.QtCore import QUrlQuery - query = QUrlQuery() - query.setQueryItems(queryItems) - url.setQuery(query) - else: - url.setQueryItems(queryItems) - request = QNetworkRequest(url) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.get(request) - reply.finished.connect(self.__getIpAddressReportFinished) - self.__replies.append(reply) - - def __getIpAddressReportFinished(self): - """ - Private slot to process the IP address report data. - """ - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - if result["response_code"] == 0: - E5MessageBox.information( - None, - self.tr("VirusTotal IP Address Report"), - self.tr("""VirusTotal does not have any information for""" - """ the given IP address.""")) - elif result["response_code"] == -1: - E5MessageBox.information( - None, - self.tr("VirusTotal IP Address Report"), - self.tr("""The submitted IP address is invalid.""")) - else: - owner = result["as_owner"] - resolutions = result["resolutions"] - try: - urls = result["detected_urls"] - except KeyError: - urls = [] - - from .VirusTotalIpReportDialog import VirusTotalIpReportDialog - self.__ipReportDlg = VirusTotalIpReportDialog( - self.__lastIP, owner, resolutions, urls) - self.__ipReportDlg.show() - self.__replies.remove(reply) - reply.deleteLater() - - def getDomainReport(self, domain): - """ - Public method to retrieve a report for a domain. - - @param domain domain name - @type str - """ - self.__lastDomain = domain - - queryItems = [ - ("apikey", Preferences.getHelp("VirusTotalServiceKey")), - ("domain", domain), - ] - url = QUrl(self.GetDomainReportUrl) - if qVersion() >= "5.0.0": - from PyQt5.QtCore import QUrlQuery - query = QUrlQuery() - query.setQueryItems(queryItems) - url.setQuery(query) - else: - url.setQueryItems(queryItems) - request = QNetworkRequest(url) - - import Helpviewer.HelpWindow - nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() - reply = nam.get(request) - reply.finished.connect(self.__getDomainReportFinished) - self.__replies.append(reply) - - def __getDomainReportFinished(self): - """ - Private slot to process the IP address report data. - """ - reply = self.sender() - if reply.error() == QNetworkReply.NoError: - result = json.loads(str(reply.readAll(), "utf-8")) - if result["response_code"] == 0: - E5MessageBox.information( - None, - self.tr("VirusTotal Domain Report"), - self.tr("""VirusTotal does not have any information for""" - """ the given domain.""")) - elif result["response_code"] == -1: - E5MessageBox.information( - None, - self.tr("VirusTotal Domain Report"), - self.tr("""The submitted domain address is invalid.""")) - else: - resolutions = result["resolutions"] - try: - urls = result["detected_urls"] - except KeyError: - urls = [] - try: - subdomains = result["subdomains"] - except KeyError: - subdomains = [] - try: - bdCategory = result["BitDefender category"] - except KeyError: - bdCategory = self.tr("not available") - try: - tmCategory = result["TrendMicro category"] - except KeyError: - tmCategory = self.tr("not available") - try: - wtsCategory = result["Websense ThreatSeeker category"] - except KeyError: - wtsCategory = self.tr("not available") - try: - categories = result["categories"] - except KeyError: - categories = [] - - from .VirusTotalDomainReportDialog import \ - VirusTotalDomainReportDialog - self.__domainReportDlg = VirusTotalDomainReportDialog( - self.__lastDomain, resolutions, urls, subdomains, - bdCategory, tmCategory, wtsCategory, categories) - self.__domainReportDlg.show() - self.__replies.remove(reply) - reply.deleteLater() - - def close(self): - """ - Public slot to close the API. - """ - for reply in self.__replies: - reply.abort() - - self.__ipReportDlg and self.__ipReportDlg.close() - self.__domainReportDlg and self.__domainReportDlg.close()