eric6/Helpviewer/VirusTotal/VirusTotalApi.py

changeset 6942
2602857055c5
parent 6645
ad476851d7e0
equal deleted inserted replaced
6941:f99d60d6b59b 6942:2602857055c5
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2019 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 from __future__ import unicode_literals
12 try:
13 str = unicode
14 except NameError:
15 pass
16
17 import json
18
19 from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal
20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
21
22 from E5Gui import E5MessageBox
23
24 import Preferences
25 from Globals import qVersionTuple
26
27
class VirusTotalAPI(QObject):
    """
    Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
    API.

    @signal checkServiceKeyFinished(bool, str) emitted after the service key
        check has been performed. It gives a flag indicating validity
        (boolean) and an error message in case of a network error (string).
    @signal submitUrlError(str) emitted with the error string, if the URL scan
        submission returned an error.
    @signal urlScanReport(str) emitted with the URL of the URL scan report page
    @signal fileScanReport(str) emitted with the URL of the file scan report
        page
    """
    checkServiceKeyFinished = pyqtSignal(bool, str)
    submitUrlError = pyqtSignal(str)
    urlScanReport = pyqtSignal(str)
    fileScanReport = pyqtSignal(str)

    # resource ID sent along with the key by checkServiceKeyValidity()
    TestServiceKeyScanID = \
        "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"

    # values of the "response_code" field of the JSON answers
    ServiceResult_ItemQueued = -2
    ServiceResult_ItemNotPresent = 0
    ServiceResult_ItemPresent = 1

    # HTTP Status Codes
    # NOTE(review): the finished handlers compare these values against
    # QNetworkReply.error(), which yields a QNetworkReply.NetworkError and
    # not an HTTP status code - confirm that this is the intended check.
    ServiceCode_InvalidKey = 202
    ServiceCode_RateLimitExceeded = 204
    ServiceCode_InvalidPrivilege = 403

    # URL templates; the placeholder receives the protocol (http or https)
    GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
    ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
    GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
    GetIpAddressReportPattern = \
        "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
    GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object (QObject)
        """
        super(VirusTotalAPI, self).__init__(parent)

        # replies of requests that have not been finished yet
        self.__replies = []

        self.__loadSettings()

        self.__lastIP = ""
        self.__lastDomain = ""
        self.__ipReportDlg = None
        self.__domainReportDlg = None

    def __loadSettings(self):
        """
        Private method to load the settings.
        """
        if Preferences.getHelp("VirusTotalSecure"):
            protocol = "https"
        else:
            protocol = "http"
        self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
        self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
        self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
            protocol)
        self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)

        self.errorMessages = {
            self.ServiceCode_RateLimitExceeded:
                self.tr("Request limit has been reached."),
            self.ServiceResult_ItemNotPresent:
                self.tr("Requested item is not present."),
            self.ServiceResult_ItemQueued:
                self.tr("Requested item is still queued."),
        }

    def preferencesChanged(self):
        """
        Public slot to handle a change of preferences.
        """
        self.__loadSettings()

    ############################################################
    ## Private helpers shared by all requests
    ############################################################

    def __networkAccessManager(self):
        """
        Private method to get the network access manager to be used.

        @return object returned by HelpWindow.networkAccessManager()
        """
        # kept as a function level import like in the request methods this
        # helper was extracted from
        import Helpviewer.HelpWindow
        return Helpviewer.HelpWindow.HelpWindow.networkAccessManager()

    def __trackReply(self, reply, handler):
        """
        Private method to connect a reply to its handler and to remember it.

        @param reply reference to the network reply
        @type QNetworkReply
        @param handler method to be called with the reply once it has
            finished
        @type callable
        """
        # finished() carries no arguments, so the reply has to be handed to
        # the handler explicitly
        reply.finished.connect(lambda: handler(reply))
        self.__replies.append(reply)

    def __releaseReply(self, reply):
        """
        Private method to forget a finished reply and schedule its deletion.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        if reply in self.__replies:
            self.__replies.remove(reply)
        reply.deleteLater()

    def __postForm(self, urlStr, params, handler):
        """
        Private method to send form encoded data via a POST request.

        @param urlStr URL to send the request to
        @type str
        @param params form encoded request data
        @type QByteArray
        @param handler method to be called with the finished reply
        @type callable
        """
        request = QNetworkRequest(QUrl(urlStr))
        request.setHeader(QNetworkRequest.ContentTypeHeader,
                          "application/x-www-form-urlencoded")
        reply = self.__networkAccessManager().post(request, params)
        self.__trackReply(reply, handler)

    def __getWithQuery(self, urlStr, queryItems, handler):
        """
        Private method to send a GET request with the given query items.

        @param urlStr URL to send the request to
        @type str
        @param queryItems query items to be added to the URL
        @type list of tuple of (str, str)
        @param handler method to be called with the finished reply
        @type callable
        """
        url = QUrl(urlStr)
        if qVersionTuple() >= (5, 0, 0):
            from PyQt5.QtCore import QUrlQuery
            query = QUrlQuery()
            query.setQueryItems(queryItems)
            url.setQuery(query)
        else:
            url.setQueryItems(queryItems)
        reply = self.__networkAccessManager().get(QNetworkRequest(url))
        self.__trackReply(reply, handler)

    def __resourceParams(self, key, resource):
        """
        Private method to build the request data for a resource request.

        @param key service key
        @type str
        @param resource ID of the resource to ask for
        @type str
        @return form encoded request data
        @rtype QByteArray
        """
        return QByteArray(
            "apikey={0}&resource={1}".format(key, resource).encode("utf-8"))

    def __readJson(self, reply):
        """
        Private method to decode the JSON data of a reply.

        @param reply reference to the network reply
        @type QNetworkReply
        @return decoded JSON data
        @rtype dict
        """
        return json.loads(str(reply.readAll(), "utf-8"))

    ############################################################
    ## Service key check
    ############################################################

    def checkServiceKeyValidity(self, key, protocol=""):
        """
        Public method to check the validity of the given service key.

        @param key service key (string)
        @param protocol protocol used to access VirusTotal (string)
        """
        if protocol == "":
            urlStr = self.GetFileReportUrl
        else:
            urlStr = self.GetFileReportPattern.format(protocol)
        self.__postForm(
            urlStr,
            self.__resourceParams(key, self.TestServiceKeyScanID),
            self.__checkServiceKeyValidityFinished)

    def __checkServiceKeyValidityFinished(self, reply):
        """
        Private slot to determine the result of the service key validity check.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        res = False
        msg = ""

        try:
            error = reply.error()
            if error == QNetworkReply.NoError:
                res = True
            elif error != self.ServiceCode_InvalidKey:
                # an invalid key is reported via the flag only; every other
                # error is reported with its error string
                msg = reply.errorString()
        finally:
            self.__releaseReply(reply)

        self.checkServiceKeyFinished.emit(res, msg)

    ############################################################
    ## URL scan
    ############################################################

    def submitUrl(self, url):
        """
        Public method to submit an URL to be scanned.

        @param url url to be scanned (QUrl)
        """
        params = QByteArray("apikey={0}&url=".format(
            Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\
            .append(QUrl.toPercentEncoding(url.toString()))
        self.__postForm(self.ScanUrlUrl, params, self.__submitUrlFinished)

    def __submitUrlFinished(self, reply):
        """
        Private slot to determine the result of the URL scan submission.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            error = reply.error()
            if error == QNetworkReply.NoError:
                result = self.__readJson(reply)
                code = result["response_code"]
                if code == self.ServiceResult_ItemPresent:
                    self.urlScanReport.emit(result["permalink"])
                    self.__getUrlScanReportUrl(result["scan_id"])
                else:
                    if code in self.errorMessages:
                        msg = self.errorMessages[code]
                    else:
                        msg = result["verbose_msg"]
                    self.submitUrlError.emit(msg)
            elif error == self.ServiceCode_RateLimitExceeded:
                # no JSON data has been read in this branch, so the message
                # is looked up by the service code itself
                self.submitUrlError.emit(
                    self.errorMessages[self.ServiceCode_RateLimitExceeded])
            else:
                self.submitUrlError.emit(reply.errorString())
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    def __getUrlScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a URL scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        self.__postForm(
            self.GetUrlReportUrl,
            self.__resourceParams(
                Preferences.getHelp("VirusTotalServiceKey"), scanId),
            self.__getUrlScanReportUrlFinished)

    def __getUrlScanReportUrlFinished(self, reply):
        """
        Private slot to determine the result of the URL scan report URL
        request.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                fileScanId = result.get("filescan_id")
                if fileScanId is not None:
                    self.__getFileScanReportUrl(fileScanId)
        finally:
            self.__releaseReply(reply)

    def __getFileScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a file scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        self.__postForm(
            self.GetFileReportUrl,
            self.__resourceParams(
                Preferences.getHelp("VirusTotalServiceKey"), scanId),
            self.__getFileScanReportUrlFinished)

    def __getFileScanReportUrlFinished(self, reply):
        """
        Private slot to determine the result of the file scan report URL
        request.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                self.fileScanReport.emit(result["permalink"])
        finally:
            self.__releaseReply(reply)

    ############################################################
    ## IP address report
    ############################################################

    def getIpAddressReport(self, ipAddress):
        """
        Public method to retrieve a report for an IP address.

        @param ipAddress valid IPv4 address in dotted quad notation
        @type str
        """
        self.__lastIP = ipAddress

        queryItems = [
            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
            ("ip", ipAddress),
        ]
        self.__getWithQuery(self.GetIpAddressReportUrl, queryItems,
                            self.__getIpAddressReportFinished)

    def __getIpAddressReportFinished(self, reply):
        """
        Private slot to process the IP address report data.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                if result["response_code"] == 0:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""VirusTotal does not have any information"""
                                """ for the given IP address."""))
                elif result["response_code"] == -1:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""The submitted IP address is invalid."""))
                else:
                    owner = result["as_owner"]
                    resolutions = result["resolutions"]
                    urls = result.get("detected_urls", [])

                    from .VirusTotalIpReportDialog import \
                        VirusTotalIpReportDialog
                    self.__ipReportDlg = VirusTotalIpReportDialog(
                        self.__lastIP, owner, resolutions, urls)
                    self.__ipReportDlg.show()
        finally:
            self.__releaseReply(reply)

    ############################################################
    ## Domain report
    ############################################################

    def getDomainReport(self, domain):
        """
        Public method to retrieve a report for a domain.

        @param domain domain name
        @type str
        """
        self.__lastDomain = domain

        queryItems = [
            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
            ("domain", domain),
        ]
        self.__getWithQuery(self.GetDomainReportUrl, queryItems,
                            self.__getDomainReportFinished)

    def __getDomainReportFinished(self, reply):
        """
        Private slot to process the domain report data.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                if result["response_code"] == 0:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr("""VirusTotal does not have any information"""
                                """ for the given domain."""))
                elif result["response_code"] == -1:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr("""The submitted domain address is"""
                                """ invalid."""))
                else:
                    notAvailable = self.tr("not available")

                    resolutions = result["resolutions"]
                    # all other entries are optional
                    urls = result.get("detected_urls", [])
                    subdomains = result.get("subdomains", [])
                    bdCategory = result.get(
                        "BitDefender category", notAvailable)
                    tmCategory = result.get(
                        "TrendMicro category", notAvailable)
                    wtsCategory = result.get(
                        "Websense ThreatSeeker category", notAvailable)
                    whois = result.get("whois", "")

                    from .VirusTotalDomainReportDialog import \
                        VirusTotalDomainReportDialog
                    self.__domainReportDlg = VirusTotalDomainReportDialog(
                        self.__lastDomain, resolutions, urls, subdomains,
                        bdCategory, tmCategory, wtsCategory, whois)
                    self.__domainReportDlg.show()
        finally:
            self.__releaseReply(reply)

    def close(self):
        """
        Public slot to close the API.
        """
        # iterate over a copy because aborting a reply makes its finished
        # handler remove it from the list of pending replies
        for reply in self.__replies[:]:
            reply.abort()

        if self.__ipReportDlg is not None:
            self.__ipReportDlg.close()
        if self.__domainReportDlg is not None:
            self.__domainReportDlg.close()

eric ide

mercurial