WebBrowser/VirusTotal/VirusTotalApi.py

branch
QtWebEngine
changeset 4753
8d2ea02ed785
parent 4631
5c1a96925da4
child 4768
57da9217196b
equal deleted inserted replaced
4752:a3bcc42a82a9 4753:8d2ea02ed785
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2016 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 from __future__ import unicode_literals
12 try:
13 str = unicode # __IGNORE_EXCEPTION__
14 except NameError:
15 pass
16
17 import json
18
19 from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion
20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
21
22 from E5Gui import E5MessageBox
23
24 import Preferences
25
26
class VirusTotalAPI(QObject):
    """
    Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
    API.

    @signal checkServiceKeyFinished(bool, str) emitted after the service key
        check has been performed. It gives a flag indicating validity
        (boolean) and an error message in case of a network error (string).
    @signal submitUrlError(str) emitted with the error string, if the URL scan
        submission returned an error.
    @signal urlScanReport(str) emitted with the URL of the URL scan report page
    @signal fileScanReport(str) emitted with the URL of the file scan report
        page
    """
    checkServiceKeyFinished = pyqtSignal(bool, str)
    submitUrlError = pyqtSignal(str)
    urlScanReport = pyqtSignal(str)
    fileScanReport = pyqtSignal(str)

    # resource ID sent with the file report request that is used to check
    # a service key
    TestServiceKeyScanID = \
        "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"

    # values of the "response_code" entry of a service answer
    ServiceResult_ItemQueued = -2
    ServiceResult_ItemNotPresent = 0
    ServiceResult_ItemPresent = 1

    # HTTP Status Codes
    ServiceCode_InvalidKey = 202
    ServiceCode_RateLimitExceeded = 204
    ServiceCode_InvalidPrivilege = 403

    # URL patterns; the placeholder receives the protocol (http or https)
    GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
    ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
    GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
    GetIpAddressReportPattern = \
        "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
    GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object (QObject)
        """
        super(VirusTotalAPI, self).__init__(parent)

        # network replies that have not been finished yet
        self.__replies = []

        self.__loadSettings()

        self.__lastIP = ""
        self.__lastDomain = ""
        self.__ipReportDlg = None
        self.__domainReportDlg = None

    def __loadSettings(self):
        """
        Private method to load the settings.

        It derives the service URLs from the configured protocol and
        (re-)creates the dictionary of translated error messages.
        """
        if Preferences.getHelp("VirusTotalSecure"):
            protocol = "https"
        else:
            protocol = "http"
        self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
        self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
        self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
            protocol)
        self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)

        # keyed by the numeric codes declared as class constants above
        self.errorMessages = {
            self.ServiceCode_RateLimitExceeded:
                self.tr("Request limit has been reached."),
            self.ServiceResult_ItemNotPresent:
                self.tr("Requested item is not present."),
            self.ServiceResult_ItemQueued:
                self.tr("Requested item is still queued."),
        }

    def preferencesChanged(self):
        """
        Public slot to handle a change of preferences.
        """
        self.__loadSettings()

    ###################################################################
    ## Private helpers shared by the request methods and reply slots
    ###################################################################

    def __sendRequest(self, request, finishedSlot, params=None):
        """
        Private method to send a request and keep track of its reply.

        @param request request to be sent (QNetworkRequest)
        @param finishedSlot slot to be connected to the finished signal of
            the reply (callable)
        @param params data to be posted; if it is None, the request is sent
            as a GET request (QByteArray)
        """
        # imported at call time like in the original code
        # NOTE(review): presumably done to avoid a circular import - confirm
        import WebBrowser.WebBrowserWindow
        nam = WebBrowser.WebBrowserWindow.WebBrowserWindow\
            .networkManager()
        if params is None:
            reply = nam.get(request)
        else:
            reply = nam.post(request, params)
        reply.finished.connect(finishedSlot)
        self.__replies.append(reply)

    def __sendFormPost(self, urlStr, params, finishedSlot):
        """
        Private method to post URL encoded form data to the service.

        @param urlStr URL the data is posted to (string)
        @param params URL encoded form data (QByteArray)
        @param finishedSlot slot to be connected to the finished signal of
            the reply (callable)
        """
        request = QNetworkRequest(QUrl(urlStr))
        request.setHeader(QNetworkRequest.ContentTypeHeader,
                          "application/x-www-form-urlencoded")
        self.__sendRequest(request, finishedSlot, params)

    def __sendQuery(self, urlStr, queryItems, finishedSlot):
        """
        Private method to send a GET request with the given query items.

        @param urlStr URL of the service function (string)
        @param queryItems list of (key, value) tuples to be sent as the
            query part of the URL (list of tuples of two strings)
        @param finishedSlot slot to be connected to the finished signal of
            the reply (callable)
        """
        # This module imports from PyQt5, which implies Qt 5. QUrlQuery is
        # therefore used unconditionally instead of guarding it with a
        # lexicographic comparison of version strings.
        from PyQt5.QtCore import QUrlQuery
        query = QUrlQuery()
        query.setQueryItems(queryItems)
        url = QUrl(urlStr)
        url.setQuery(query)
        self.__sendRequest(QNetworkRequest(url), finishedSlot)

    def __readJson(self, reply):
        """
        Private method to decode the JSON data of a reply.

        @param reply reply to read the data from (QNetworkReply)
        @return decoded JSON data
        """
        return json.loads(str(reply.readAll(), "utf-8"))

    def __releaseReply(self, reply):
        """
        Private method to stop tracking a reply and schedule its deletion.

        @param reply reply to be released (QNetworkReply)
        """
        if reply in self.__replies:
            self.__replies.remove(reply)
        reply.deleteLater()

    ###################################################################
    ## Service key validation
    ###################################################################

    def checkServiceKeyValidity(self, key, protocol=""):
        """
        Public method to check the validity of the given service key.

        The result is reported via the checkServiceKeyFinished signal.

        @param key service key (string)
        @param protocol protocol used to access VirusTotal (string); the
            configured protocol is used, if it is empty
        """
        if protocol == "":
            urlStr = self.GetFileReportUrl
        else:
            urlStr = self.GetFileReportPattern.format(protocol)
        params = QByteArray("apikey={0}&resource={1}".format(
            key, self.TestServiceKeyScanID).encode("utf-8"))
        self.__sendFormPost(
            urlStr, params, self.__checkServiceKeyValidityFinished)

    def __checkServiceKeyValidityFinished(self):
        """
        Private slot to determine the result of the service key validity check.
        """
        res = False
        msg = ""

        reply = self.sender()
        error = reply.error()
        if error == QNetworkReply.NoError:
            res = True
        elif error == self.ServiceCode_InvalidKey:
            # NOTE(review): the constant is declared as an HTTP status code
            # but is compared to the network error of the reply - confirm
            # that this is intended
            res = False
        else:
            msg = reply.errorString()
        self.__releaseReply(reply)

        self.checkServiceKeyFinished.emit(res, msg)

    ###################################################################
    ## URL scan submission and scan report retrieval
    ###################################################################

    def submitUrl(self, url):
        """
        Public method to submit an URL to be scanned.

        @param url url to be scanned (QUrl)
        """
        params = QByteArray("apikey={0}&url=".format(
            Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\
            .append(QUrl.toPercentEncoding(url.toString()))
        self.__sendFormPost(
            self.ScanUrlUrl, params, self.__submitUrlFinished)

    def __submitUrlFinished(self):
        """
        Private slot to determine the result of the URL scan submission.

        On success the urlScanReport signal is emitted and the scan report
        is requested; otherwise the submitUrlError signal is emitted.
        """
        reply = self.sender()
        try:
            error = reply.error()
            if error == QNetworkReply.NoError:
                result = self.__readJson(reply)
                code = result["response_code"]
                if code == self.ServiceResult_ItemPresent:
                    self.urlScanReport.emit(result["permalink"])
                    self.__getUrlScanReportUrl(result["scan_id"])
                else:
                    if code in self.errorMessages:
                        msg = self.errorMessages[code]
                    else:
                        msg = result["verbose_msg"]
                    self.submitUrlError.emit(msg)
            elif error == self.ServiceCode_RateLimitExceeded:
                # No JSON result exists in this branch; the message is
                # looked up directly by the rate limit code.
                # NOTE(review): the constant is declared as an HTTP status
                # code but is compared to the network error of the reply -
                # confirm that this is intended
                self.submitUrlError.emit(
                    self.errorMessages[self.ServiceCode_RateLimitExceeded])
            else:
                self.submitUrlError.emit(reply.errorString())
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    def __getUrlScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a URL scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        params = QByteArray("apikey={0}&resource={1}".format(
            Preferences.getHelp("VirusTotalServiceKey"), scanId)
            .encode("utf-8"))
        self.__sendFormPost(
            self.GetUrlReportUrl, params,
            self.__getUrlScanReportUrlFinished)

    def __getUrlScanReportUrlFinished(self):
        """
        Private slot to determine the result of the URL scan report URL
        request.

        If the answer contains a file scan ID, the file scan report is
        requested as well.
        """
        reply = self.sender()
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                fileScanId = result.get("filescan_id")
                if fileScanId is not None:
                    self.__getFileScanReportUrl(fileScanId)
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    def __getFileScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a file scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        params = QByteArray("apikey={0}&resource={1}".format(
            Preferences.getHelp("VirusTotalServiceKey"), scanId)
            .encode("utf-8"))
        self.__sendFormPost(
            self.GetFileReportUrl, params,
            self.__getFileScanReportUrlFinished)

    def __getFileScanReportUrlFinished(self):
        """
        Private slot to determine the result of the file scan report URL
        request.
        """
        reply = self.sender()
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                self.fileScanReport.emit(result["permalink"])
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    ###################################################################
    ## IP address report
    ###################################################################

    def getIpAddressReport(self, ipAddress):
        """
        Public method to retrieve a report for an IP address.

        @param ipAddress valid IPv4 address in dotted quad notation
        @type str
        """
        self.__lastIP = ipAddress

        queryItems = [
            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
            ("ip", ipAddress),
        ]
        self.__sendQuery(
            self.GetIpAddressReportUrl, queryItems,
            self.__getIpAddressReportFinished)

    def __getIpAddressReportFinished(self):
        """
        Private slot to process the IP address report data.

        Depending on the response code either an information message or
        the IP address report dialog is shown.
        """
        reply = self.sender()
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                if result["response_code"] == 0:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""VirusTotal does not have any information for"""
                                """ the given IP address."""))
                elif result["response_code"] == -1:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""The submitted IP address is invalid."""))
                else:
                    owner = result["as_owner"]
                    resolutions = result["resolutions"]
                    # the list of detected URLs may be missing
                    urls = result.get("detected_urls", [])

                    from .VirusTotalIpReportDialog import \
                        VirusTotalIpReportDialog
                    self.__ipReportDlg = VirusTotalIpReportDialog(
                        self.__lastIP, owner, resolutions, urls)
                    self.__ipReportDlg.show()
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    ###################################################################
    ## Domain report
    ###################################################################

    def getDomainReport(self, domain):
        """
        Public method to retrieve a report for a domain.

        @param domain domain name
        @type str
        """
        self.__lastDomain = domain

        queryItems = [
            ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
            ("domain", domain),
        ]
        self.__sendQuery(
            self.GetDomainReportUrl, queryItems,
            self.__getDomainReportFinished)

    def __getDomainReportFinished(self):
        """
        Private slot to process the domain report data.

        Depending on the response code either an information message or
        the domain report dialog is shown.
        """
        reply = self.sender()
        try:
            if reply.error() == QNetworkReply.NoError:
                result = self.__readJson(reply)
                if result["response_code"] == 0:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr("""VirusTotal does not have any information for"""
                                """ the given domain."""))
                elif result["response_code"] == -1:
                    E5MessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr("""The submitted domain address is invalid."""))
                else:
                    resolutions = result["resolutions"]
                    # all entries below are optional parts of the answer
                    notAvailable = self.tr("not available")
                    urls = result.get("detected_urls", [])
                    subdomains = result.get("subdomains", [])
                    bdCategory = result.get(
                        "BitDefender category", notAvailable)
                    tmCategory = result.get(
                        "TrendMicro category", notAvailable)
                    wtsCategory = result.get(
                        "Websense ThreatSeeker category", notAvailable)
                    whois = result.get("whois", "")

                    from .VirusTotalDomainReportDialog import \
                        VirusTotalDomainReportDialog
                    self.__domainReportDlg = VirusTotalDomainReportDialog(
                        self.__lastDomain, resolutions, urls, subdomains,
                        bdCategory, tmCategory, wtsCategory, whois)
                    self.__domainReportDlg.show()
        finally:
            # release the reply even if the answer could not be processed
            self.__releaseReply(reply)

    def close(self):
        """
        Public slot to close the API.

        All pending requests are aborted and the report dialogs are closed.
        """
        # Iterate over a snapshot: aborting a reply emits its finished
        # signal and the connected slots remove the reply from the list.
        for reply in list(self.__replies):
            reply.abort()

        if self.__ipReportDlg:
            self.__ipReportDlg.close()
        if self.__domainReportDlg:
            self.__domainReportDlg.close()

eric ide

mercurial