Helpviewer/VirusTotal/VirusTotalApi.py

changeset 4336
473bf2a8676f
parent 4335
a25c157625c4
child 4631
5c1a96925da4
equal deleted inserted replaced
4335:a25c157625c4 4336:473bf2a8676f
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2015 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 from __future__ import unicode_literals
12 try:
13 str = unicode
14 except NameError:
15 pass
16
17 import json
18
19 from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion
20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
21
22 from E5Gui import E5MessageBox
23
24 import Preferences
25
26
27 class VirusTotalAPI(QObject):
28 """
29 Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
30 API.
31
32 @signal checkServiceKeyFinished(bool, str) emitted after the service key
33 check has been performed. It gives a flag indicating validity
34 (boolean) and an error message in case of a network error (string).
35 @signal submitUrlError(str) emitted with the error string, if the URL scan
36 submission returned an error.
37 @signal urlScanReport(str) emitted with the URL of the URL scan report page
38 @signal fileScanReport(str) emitted with the URL of the file scan report
39 page
40 """
41 checkServiceKeyFinished = pyqtSignal(bool, str)
42 submitUrlError = pyqtSignal(str)
43 urlScanReport = pyqtSignal(str)
44 fileScanReport = pyqtSignal(str)
45
46 TestServiceKeyScanID = \
47 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
48
49 ServiceResult_ItemQueued = -2
50 ServiceResult_ItemNotPresent = 0
51 ServiceResult_ItemPresent = 1
52
53 # HTTP Status Codes
54 ServiceCode_InvalidKey = 202
55 ServiceCode_RateLimitExceeded = 204
56 ServiceCode_InvalidPrivilege = 403
57
58 GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
59 ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
60 GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
61 GetIpAddressReportPattern = \
62 "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
63 GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
64
65 def __init__(self, parent=None):
66 """
67 Constructor
68
69 @param parent reference to the parent object (QObject)
70 """
71 super(VirusTotalAPI, self).__init__(parent)
72
73 self.__replies = []
74
75 self.__loadSettings()
76
77 self.__lastIP = ""
78 self.__lastDomain = ""
79 self.__ipReportDlg = None
80 self.__domainReportDlg = None
81
82 def __loadSettings(self):
83 """
84 Private method to load the settings.
85 """
86 if Preferences.getHelp("VirusTotalSecure"):
87 protocol = "https"
88 else:
89 protocol = "http"
90 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
91 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
92 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
93 self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
94 protocol)
95 self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
96
97 self.errorMessages = {
98 204: self.tr("Request limit has been reached."),
99 0: self.tr("Requested item is not present."),
100 -2: self.tr("Requested item is still queued."),
101 }
102
103 def preferencesChanged(self):
104 """
105 Public slot to handle a change of preferences.
106 """
107 self.__loadSettings()
108
109 def checkServiceKeyValidity(self, key, protocol=""):
110 """
111 Public method to check the validity of the given service key.
112
113 @param key service key (string)
114 @param protocol protocol used to access VirusTotal (string)
115 """
116 if protocol == "":
117 urlStr = self.GetFileReportUrl
118 else:
119 urlStr = self.GetFileReportPattern.format(protocol)
120 request = QNetworkRequest(QUrl(urlStr))
121 request.setHeader(QNetworkRequest.ContentTypeHeader,
122 "application/x-www-form-urlencoded")
123 params = QByteArray("apikey={0}&resource={1}".format(
124 key, self.TestServiceKeyScanID).encode("utf-8"))
125
126 import Helpviewer.HelpWindow
127 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
128 reply = nam.post(request, params)
129 reply.finished.connect(self.__checkServiceKeyValidityFinished)
130 self.__replies.append(reply)
131
132 def __checkServiceKeyValidityFinished(self):
133 """
134 Private slot to determine the result of the service key validity check.
135 """
136 res = False
137 msg = ""
138
139 reply = self.sender()
140 if reply.error() == QNetworkReply.NoError:
141 res = True
142 elif reply.error() == self.ServiceCode_InvalidKey:
143 res = False
144 else:
145 msg = reply.errorString()
146 self.__replies.remove(reply)
147 reply.deleteLater()
148
149 self.checkServiceKeyFinished.emit(res, msg)
150
151 def submitUrl(self, url):
152 """
153 Public method to submit an URL to be scanned.
154
155 @param url url to be scanned (QUrl)
156 """
157 request = QNetworkRequest(QUrl(self.ScanUrlUrl))
158 request.setHeader(QNetworkRequest.ContentTypeHeader,
159 "application/x-www-form-urlencoded")
160 params = QByteArray("apikey={0}&url=".format(
161 Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\
162 .append(QUrl.toPercentEncoding(url.toString()))
163
164 import Helpviewer.HelpWindow
165 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
166 reply = nam.post(request, params)
167 reply.finished.connect(self.__submitUrlFinished)
168 self.__replies.append(reply)
169
170 def __submitUrlFinished(self):
171 """
172 Private slot to determine the result of the URL scan submission.
173 """
174 reply = self.sender()
175 if reply.error() == QNetworkReply.NoError:
176 result = json.loads(str(reply.readAll(), "utf-8"))
177 if result["response_code"] == self.ServiceResult_ItemPresent:
178 self.urlScanReport.emit(result["permalink"])
179 self.__getUrlScanReportUrl(result["scan_id"])
180 else:
181 if result["response_code"] in self.errorMessages:
182 msg = self.errorMessages[result["response_code"]]
183 else:
184 msg = result["verbose_msg"]
185 self.submitUrlError.emit(msg)
186 elif reply.error() == self.ServiceCode_RateLimitExceeded:
187 self.submitUrlError.emit(
188 self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
189 else:
190 self.submitUrlError.emit(reply.errorString())
191 self.__replies.remove(reply)
192 reply.deleteLater()
193
194 def __getUrlScanReportUrl(self, scanId):
195 """
196 Private method to get the report URL for a URL scan.
197
198 @param scanId ID of the scan to get the report URL for (string)
199 """
200 request = QNetworkRequest(QUrl(self.GetUrlReportUrl))
201 request.setHeader(QNetworkRequest.ContentTypeHeader,
202 "application/x-www-form-urlencoded")
203 params = QByteArray("apikey={0}&resource={1}".format(
204 Preferences.getHelp("VirusTotalServiceKey"), scanId)
205 .encode("utf-8"))
206
207 import Helpviewer.HelpWindow
208 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
209 reply = nam.post(request, params)
210 reply.finished.connect(self.__getUrlScanReportUrlFinished)
211 self.__replies.append(reply)
212
213 def __getUrlScanReportUrlFinished(self):
214 """
215 Private slot to determine the result of the URL scan report URL
216 request.
217 """
218 reply = self.sender()
219 if reply.error() == QNetworkReply.NoError:
220 result = json.loads(str(reply.readAll(), "utf-8"))
221 if "filescan_id" in result and result["filescan_id"] is not None:
222 self.__getFileScanReportUrl(result["filescan_id"])
223 self.__replies.remove(reply)
224 reply.deleteLater()
225
226 def __getFileScanReportUrl(self, scanId):
227 """
228 Private method to get the report URL for a file scan.
229
230 @param scanId ID of the scan to get the report URL for (string)
231 """
232 request = QNetworkRequest(QUrl(self.GetFileReportUrl))
233 request.setHeader(QNetworkRequest.ContentTypeHeader,
234 "application/x-www-form-urlencoded")
235 params = QByteArray("apikey={0}&resource={1}".format(
236 Preferences.getHelp("VirusTotalServiceKey"), scanId)
237 .encode("utf-8"))
238
239 import Helpviewer.HelpWindow
240 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
241 reply = nam.post(request, params)
242 reply.finished.connect(self.__getFileScanReportUrlFinished)
243 self.__replies.append(reply)
244
245 def __getFileScanReportUrlFinished(self):
246 """
247 Private slot to determine the result of the file scan report URL
248 request.
249 """
250 reply = self.sender()
251 if reply.error() == QNetworkReply.NoError:
252 result = json.loads(str(reply.readAll(), "utf-8"))
253 self.fileScanReport.emit(result["permalink"])
254 self.__replies.remove(reply)
255 reply.deleteLater()
256
257 def getIpAddressReport(self, ipAddress):
258 """
259 Public method to retrieve a report for an IP address.
260
261 @param ipAddress valid IPv4 address in dotted quad notation
262 @type str
263 """
264 self.__lastIP = ipAddress
265
266 queryItems = [
267 ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
268 ("ip", ipAddress),
269 ]
270 url = QUrl(self.GetIpAddressReportUrl)
271 if qVersion() >= "5.0.0":
272 from PyQt5.QtCore import QUrlQuery
273 query = QUrlQuery()
274 query.setQueryItems(queryItems)
275 url.setQuery(query)
276 else:
277 url.setQueryItems(queryItems)
278 request = QNetworkRequest(url)
279
280 import Helpviewer.HelpWindow
281 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
282 reply = nam.get(request)
283 reply.finished.connect(self.__getIpAddressReportFinished)
284 self.__replies.append(reply)
285
286 def __getIpAddressReportFinished(self):
287 """
288 Private slot to process the IP address report data.
289 """
290 reply = self.sender()
291 if reply.error() == QNetworkReply.NoError:
292 result = json.loads(str(reply.readAll(), "utf-8"))
293 if result["response_code"] == 0:
294 E5MessageBox.information(
295 None,
296 self.tr("VirusTotal IP Address Report"),
297 self.tr("""VirusTotal does not have any information for"""
298 """ the given IP address."""))
299 elif result["response_code"] == -1:
300 E5MessageBox.information(
301 None,
302 self.tr("VirusTotal IP Address Report"),
303 self.tr("""The submitted IP address is invalid."""))
304 else:
305 owner = result["as_owner"]
306 resolutions = result["resolutions"]
307 try:
308 urls = result["detected_urls"]
309 except KeyError:
310 urls = []
311
312 from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
313 self.__ipReportDlg = VirusTotalIpReportDialog(
314 self.__lastIP, owner, resolutions, urls)
315 self.__ipReportDlg.show()
316 self.__replies.remove(reply)
317 reply.deleteLater()
318
319 def getDomainReport(self, domain):
320 """
321 Public method to retrieve a report for a domain.
322
323 @param domain domain name
324 @type str
325 """
326 self.__lastDomain = domain
327
328 queryItems = [
329 ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
330 ("domain", domain),
331 ]
332 url = QUrl(self.GetDomainReportUrl)
333 if qVersion() >= "5.0.0":
334 from PyQt5.QtCore import QUrlQuery
335 query = QUrlQuery()
336 query.setQueryItems(queryItems)
337 url.setQuery(query)
338 else:
339 url.setQueryItems(queryItems)
340 request = QNetworkRequest(url)
341
342 import Helpviewer.HelpWindow
343 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
344 reply = nam.get(request)
345 reply.finished.connect(self.__getDomainReportFinished)
346 self.__replies.append(reply)
347
348 def __getDomainReportFinished(self):
349 """
350 Private slot to process the IP address report data.
351 """
352 reply = self.sender()
353 if reply.error() == QNetworkReply.NoError:
354 result = json.loads(str(reply.readAll(), "utf-8"))
355 if result["response_code"] == 0:
356 E5MessageBox.information(
357 None,
358 self.tr("VirusTotal Domain Report"),
359 self.tr("""VirusTotal does not have any information for"""
360 """ the given domain."""))
361 elif result["response_code"] == -1:
362 E5MessageBox.information(
363 None,
364 self.tr("VirusTotal Domain Report"),
365 self.tr("""The submitted domain address is invalid."""))
366 else:
367 resolutions = result["resolutions"]
368 try:
369 urls = result["detected_urls"]
370 except KeyError:
371 urls = []
372 try:
373 subdomains = result["subdomains"]
374 except KeyError:
375 subdomains = []
376 try:
377 bdCategory = result["BitDefender category"]
378 except KeyError:
379 bdCategory = self.tr("not available")
380 try:
381 tmCategory = result["TrendMicro category"]
382 except KeyError:
383 tmCategory = self.tr("not available")
384 try:
385 wtsCategory = result["Websense ThreatSeeker category"]
386 except KeyError:
387 wtsCategory = self.tr("not available")
388 try:
389 whois = result["whois"]
390 except KeyError:
391 whois = ""
392
393 from .VirusTotalDomainReportDialog import \
394 VirusTotalDomainReportDialog
395 self.__domainReportDlg = VirusTotalDomainReportDialog(
396 self.__lastDomain, resolutions, urls, subdomains,
397 bdCategory, tmCategory, wtsCategory, whois)
398 self.__domainReportDlg.show()
399 self.__replies.remove(reply)
400 reply.deleteLater()
401
402 def close(self):
403 """
404 Public slot to close the API.
405 """
406 for reply in self.__replies:
407 reply.abort()
408
409 self.__ipReportDlg and self.__ipReportDlg.close()
410 self.__domainReportDlg and self.__domainReportDlg.close()

eric ide

mercurial