eric6/WebBrowser/VirusTotal/VirusTotalApi.py

changeset 6942
2602857055c5
parent 6645
ad476851d7e0
child 7192
a22eee00b052
equal deleted inserted replaced
6941:f99d60d6b59b 6942:2602857055c5
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2019 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 from __future__ import unicode_literals
12 try:
13 str = unicode # __IGNORE_EXCEPTION__
14 except NameError:
15 pass
16
17 import json
18
19 from PyQt5.QtCore import pyqtSignal, QObject, QUrl, QUrlQuery, QByteArray
20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
21
22 from E5Gui import E5MessageBox
23
24 import Preferences
25
26
27 class VirusTotalAPI(QObject):
28 """
29 Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
30 API.
31
32 @signal checkServiceKeyFinished(bool, str) emitted after the service key
33 check has been performed. It gives a flag indicating validity
34 (boolean) and an error message in case of a network error (string).
35 @signal submitUrlError(str) emitted with the error string, if the URL scan
36 submission returned an error.
37 @signal urlScanReport(str) emitted with the URL of the URL scan report page
38 @signal fileScanReport(str) emitted with the URL of the file scan report
39 page
40 """
41 checkServiceKeyFinished = pyqtSignal(bool, str)
42 submitUrlError = pyqtSignal(str)
43 urlScanReport = pyqtSignal(str)
44 fileScanReport = pyqtSignal(str)
45
46 TestServiceKeyScanID = \
47 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
48
49 ServiceResult_ItemQueued = -2
50 ServiceResult_ItemNotPresent = 0
51 ServiceResult_ItemPresent = 1
52
53 # HTTP Status Codes
54 ServiceCode_InvalidKey = 202
55 ServiceCode_RateLimitExceeded = 204
56 ServiceCode_InvalidPrivilege = 403
57
58 GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
59 ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
60 GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
61 GetIpAddressReportPattern = \
62 "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
63 GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
64
65 def __init__(self, parent=None):
66 """
67 Constructor
68
69 @param parent reference to the parent object (QObject)
70 """
71 super(VirusTotalAPI, self).__init__(parent)
72
73 self.__replies = []
74
75 self.__loadSettings()
76
77 self.__lastIP = ""
78 self.__lastDomain = ""
79 self.__ipReportDlg = None
80 self.__domainReportDlg = None
81
82 def __loadSettings(self):
83 """
84 Private method to load the settings.
85 """
86 if Preferences.getWebBrowser("VirusTotalSecure"):
87 protocol = "https"
88 else:
89 protocol = "http"
90 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
91 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
92 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
93 self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
94 protocol)
95 self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
96
97 self.errorMessages = {
98 204: self.tr("Request limit has been reached."),
99 0: self.tr("Requested item is not present."),
100 -2: self.tr("Requested item is still queued."),
101 }
102
103 def preferencesChanged(self):
104 """
105 Public slot to handle a change of preferences.
106 """
107 self.__loadSettings()
108
109 def checkServiceKeyValidity(self, key, protocol=""):
110 """
111 Public method to check the validity of the given service key.
112
113 @param key service key (string)
114 @param protocol protocol used to access VirusTotal (string)
115 """
116 if protocol == "":
117 urlStr = self.GetFileReportUrl
118 else:
119 urlStr = self.GetFileReportPattern.format(protocol)
120 request = QNetworkRequest(QUrl(urlStr))
121 request.setHeader(QNetworkRequest.ContentTypeHeader,
122 "application/x-www-form-urlencoded")
123 params = QByteArray("apikey={0}&resource={1}".format(
124 key, self.TestServiceKeyScanID).encode("utf-8"))
125
126 import WebBrowser.WebBrowserWindow
127 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
128 .networkManager()
129 reply = nam.post(request, params)
130 reply.finished.connect(
131 lambda: self.__checkServiceKeyValidityFinished(reply))
132 self.__replies.append(reply)
133
134 def __checkServiceKeyValidityFinished(self, reply):
135 """
136 Private slot to determine the result of the service key validity check.
137
138 @param reply reference to the network reply
139 @type QNetworkReply
140 """
141 res = False
142 msg = ""
143
144 if reply.error() == QNetworkReply.NoError:
145 res = True
146 elif reply.error() == self.ServiceCode_InvalidKey:
147 res = False
148 else:
149 msg = reply.errorString()
150 self.__replies.remove(reply)
151 reply.deleteLater()
152
153 self.checkServiceKeyFinished.emit(res, msg)
154
155 def submitUrl(self, url):
156 """
157 Public method to submit an URL to be scanned.
158
159 @param url url to be scanned (QUrl)
160 """
161 request = QNetworkRequest(QUrl(self.ScanUrlUrl))
162 request.setHeader(QNetworkRequest.ContentTypeHeader,
163 "application/x-www-form-urlencoded")
164 params = QByteArray("apikey={0}&url=".format(
165 Preferences.getWebBrowser("VirusTotalServiceKey"))
166 .encode("utf-8")).append(QUrl.toPercentEncoding(url.toString()))
167
168 import WebBrowser.WebBrowserWindow
169 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
170 .networkManager()
171 reply = nam.post(request, params)
172 reply.finished.connect(self.__submitUrlFinished)
173 self.__replies.append(reply)
174
175 def __submitUrlFinished(self, reply):
176 """
177 Private slot to determine the result of the URL scan submission.
178
179 @param reply reference to the network reply
180 @type QNetworkReply
181 """
182 if reply.error() == QNetworkReply.NoError:
183 result = json.loads(str(reply.readAll(), "utf-8"))
184 if result["response_code"] == self.ServiceResult_ItemPresent:
185 self.urlScanReport.emit(result["permalink"])
186 self.__getUrlScanReportUrl(result["scan_id"])
187 else:
188 if result["response_code"] in self.errorMessages:
189 msg = self.errorMessages[result["response_code"]]
190 else:
191 msg = result["verbose_msg"]
192 self.submitUrlError.emit(msg)
193 elif reply.error() == self.ServiceCode_RateLimitExceeded:
194 self.submitUrlError.emit(
195 self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
196 else:
197 self.submitUrlError.emit(reply.errorString())
198 self.__replies.remove(reply)
199 reply.deleteLater()
200
201 def __getUrlScanReportUrl(self, scanId):
202 """
203 Private method to get the report URL for a URL scan.
204
205 @param scanId ID of the scan to get the report URL for (string)
206 """
207 request = QNetworkRequest(QUrl(self.GetUrlReportUrl))
208 request.setHeader(QNetworkRequest.ContentTypeHeader,
209 "application/x-www-form-urlencoded")
210 params = QByteArray("apikey={0}&resource={1}".format(
211 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
212 .encode("utf-8"))
213
214 import WebBrowser.WebBrowserWindow
215 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
216 .networkManager()
217 reply = nam.post(request, params)
218 reply.finished.connect(self.__getUrlScanReportUrlFinished)
219 self.__replies.append(reply)
220
221 def __getUrlScanReportUrlFinished(self, reply):
222 """
223 Private slot to determine the result of the URL scan report URL.
224
225 @param reply reference to the network reply
226 @type QNetworkReply
227 request.
228 """
229 if reply.error() == QNetworkReply.NoError:
230 result = json.loads(str(reply.readAll(), "utf-8"))
231 if "filescan_id" in result and result["filescan_id"] is not None:
232 self.__getFileScanReportUrl(result["filescan_id"])
233 self.__replies.remove(reply)
234 reply.deleteLater()
235
236 def __getFileScanReportUrl(self, scanId):
237 """
238 Private method to get the report URL for a file scan.
239
240 @param scanId ID of the scan to get the report URL for (string)
241 """
242 request = QNetworkRequest(QUrl(self.GetFileReportUrl))
243 request.setHeader(QNetworkRequest.ContentTypeHeader,
244 "application/x-www-form-urlencoded")
245 params = QByteArray("apikey={0}&resource={1}".format(
246 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
247 .encode("utf-8"))
248
249 import WebBrowser.WebBrowserWindow
250 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
251 .networkManager()
252 reply = nam.post(request, params)
253 reply.finished.connect(self.__getFileScanReportUrlFinished)
254 self.__replies.append(reply)
255
256 def __getFileScanReportUrlFinished(self, reply):
257 """
258 Private slot to determine the result of the file scan report URL
259 request.
260
261 @param reply reference to the network reply
262 @type QNetworkReply
263 """
264 if reply.error() == QNetworkReply.NoError:
265 result = json.loads(str(reply.readAll(), "utf-8"))
266 self.fileScanReport.emit(result["permalink"])
267 self.__replies.remove(reply)
268 reply.deleteLater()
269
270 def getIpAddressReport(self, ipAddress):
271 """
272 Public method to retrieve a report for an IP address.
273
274 @param ipAddress valid IPv4 address in dotted quad notation
275 @type str
276 """
277 self.__lastIP = ipAddress
278
279 queryItems = [
280 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
281 ("ip", ipAddress),
282 ]
283 url = QUrl(self.GetIpAddressReportUrl)
284 query = QUrlQuery()
285 query.setQueryItems(queryItems)
286 url.setQuery(query)
287 request = QNetworkRequest(url)
288
289 import WebBrowser.WebBrowserWindow
290 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
291 .networkManager()
292 reply = nam.get(request)
293 reply.finished.connect(self.__getIpAddressReportFinished)
294 self.__replies.append(reply)
295
296 def __getIpAddressReportFinished(self, reply):
297 """
298 Private slot to process the IP address report data.
299
300 @param reply reference to the network reply
301 @type QNetworkReply
302 """
303 if reply.error() == QNetworkReply.NoError:
304 result = json.loads(str(reply.readAll(), "utf-8"))
305 if result["response_code"] == 0:
306 E5MessageBox.information(
307 None,
308 self.tr("VirusTotal IP Address Report"),
309 self.tr("""VirusTotal does not have any information for"""
310 """ the given IP address."""))
311 elif result["response_code"] == -1:
312 E5MessageBox.information(
313 None,
314 self.tr("VirusTotal IP Address Report"),
315 self.tr("""The submitted IP address is invalid."""))
316 else:
317 owner = result["as_owner"]
318 resolutions = result["resolutions"]
319 try:
320 urls = result["detected_urls"]
321 except KeyError:
322 urls = []
323
324 from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
325 self.__ipReportDlg = VirusTotalIpReportDialog(
326 self.__lastIP, owner, resolutions, urls)
327 self.__ipReportDlg.show()
328 self.__replies.remove(reply)
329 reply.deleteLater()
330
331 def getDomainReport(self, domain):
332 """
333 Public method to retrieve a report for a domain.
334
335 @param domain domain name
336 @type str
337 """
338 self.__lastDomain = domain
339
340 queryItems = [
341 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
342 ("domain", domain),
343 ]
344 url = QUrl(self.GetDomainReportUrl)
345 query = QUrlQuery()
346 query.setQueryItems(queryItems)
347 url.setQuery(query)
348 request = QNetworkRequest(url)
349
350 import WebBrowser.WebBrowserWindow
351 nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
352 .networkManager()
353 reply = nam.get(request)
354 reply.finished.connect(lambda: self.__getDomainReportFinished(reply))
355 self.__replies.append(reply)
356
357 def __getDomainReportFinished(self, reply):
358 """
359 Private slot to process the IP address report data.
360
361 @param reply reference to the network reply
362 @type QNetworkReply
363 """
364 if reply.error() == QNetworkReply.NoError:
365 result = json.loads(str(reply.readAll(), "utf-8"))
366 if result["response_code"] == 0:
367 E5MessageBox.information(
368 None,
369 self.tr("VirusTotal Domain Report"),
370 self.tr("""VirusTotal does not have any information for"""
371 """ the given domain."""))
372 elif result["response_code"] == -1:
373 E5MessageBox.information(
374 None,
375 self.tr("VirusTotal Domain Report"),
376 self.tr("""The submitted domain address is invalid."""))
377 else:
378 resolutions = result["resolutions"]
379 try:
380 urls = result["detected_urls"]
381 except KeyError:
382 urls = []
383 try:
384 subdomains = result["subdomains"]
385 except KeyError:
386 subdomains = []
387 try:
388 bdCategory = result["BitDefender category"]
389 except KeyError:
390 bdCategory = self.tr("not available")
391 try:
392 tmCategory = result["TrendMicro category"]
393 except KeyError:
394 tmCategory = self.tr("not available")
395 try:
396 wtsCategory = result["Websense ThreatSeeker category"]
397 except KeyError:
398 wtsCategory = self.tr("not available")
399 try:
400 whois = result["whois"]
401 except KeyError:
402 whois = ""
403
404 from .VirusTotalDomainReportDialog import \
405 VirusTotalDomainReportDialog
406 self.__domainReportDlg = VirusTotalDomainReportDialog(
407 self.__lastDomain, resolutions, urls, subdomains,
408 bdCategory, tmCategory, wtsCategory, whois)
409 self.__domainReportDlg.show()
410 self.__replies.remove(reply)
411 reply.deleteLater()
412
413 def close(self):
414 """
415 Public slot to close the API.
416 """
417 for reply in self.__replies:
418 reply.abort()
419
420 self.__ipReportDlg and self.__ipReportDlg.close()
421 self.__domainReportDlg and self.__domainReportDlg.close()

eric ide

mercurial