eric7/WebBrowser/VirusTotal/VirusTotalApi.py

branch
eric7
changeset 8312
800c432b34c8
parent 8260
2161475d9639
child 8318
962bce857696
equal deleted inserted replaced
8311:4e8b98454baa 8312:800c432b34c8
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 import json
12
13 from PyQt5.QtCore import pyqtSignal, QObject, QUrl, QUrlQuery, QByteArray
14 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
15
16 from E5Gui import E5MessageBox
17
18 import Preferences
19
20
21 class VirusTotalAPI(QObject):
22 """
23 Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
24 API.
25
26 @signal checkServiceKeyFinished(bool, str) emitted after the service key
27 check has been performed. It gives a flag indicating validity
28 (boolean) and an error message in case of a network error (string).
29 @signal submitUrlError(str) emitted with the error string, if the URL scan
30 submission returned an error.
31 @signal urlScanReport(str) emitted with the URL of the URL scan report page
32 @signal fileScanReport(str) emitted with the URL of the file scan report
33 page
34 """
35 checkServiceKeyFinished = pyqtSignal(bool, str)
36 submitUrlError = pyqtSignal(str)
37 urlScanReport = pyqtSignal(str)
38 fileScanReport = pyqtSignal(str)
39
40 TestServiceKeyScanID = (
41 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
42 )
43
44 ServiceResult_ItemQueued = -2
45 ServiceResult_ItemNotPresent = 0
46 ServiceResult_ItemPresent = 1
47
48 # HTTP Status Codes
49 ServiceCode_InvalidKey = 202
50 ServiceCode_RateLimitExceeded = 204
51 ServiceCode_InvalidPrivilege = 403
52
53 GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
54 ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
55 GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
56 GetIpAddressReportPattern = (
57 "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
58 )
59 GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
60
61 def __init__(self, parent=None):
62 """
63 Constructor
64
65 @param parent reference to the parent object (QObject)
66 """
67 super().__init__(parent)
68
69 self.__replies = []
70
71 self.__loadSettings()
72
73 self.__lastIP = ""
74 self.__lastDomain = ""
75 self.__ipReportDlg = None
76 self.__domainReportDlg = None
77
78 def __loadSettings(self):
79 """
80 Private method to load the settings.
81 """
82 protocol = (
83 "https"
84 if Preferences.getWebBrowser("VirusTotalSecure") else
85 "http"
86 )
87 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
88 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
89 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
90 self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
91 protocol)
92 self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
93
94 self.errorMessages = {
95 204: self.tr("Request limit has been reached."),
96 0: self.tr("Requested item is not present."),
97 -2: self.tr("Requested item is still queued."),
98 }
99
100 def preferencesChanged(self):
101 """
102 Public slot to handle a change of preferences.
103 """
104 self.__loadSettings()
105
106 def checkServiceKeyValidity(self, key, protocol=""):
107 """
108 Public method to check the validity of the given service key.
109
110 @param key service key (string)
111 @param protocol protocol used to access VirusTotal (string)
112 """
113 urlStr = (
114 self.GetFileReportUrl
115 if protocol == "" else
116 self.GetFileReportPattern.format(protocol)
117 )
118 request = QNetworkRequest(QUrl(urlStr))
119 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
120 "application/x-www-form-urlencoded")
121 params = QByteArray("apikey={0}&resource={1}".format(
122 key, self.TestServiceKeyScanID).encode("utf-8"))
123
124 import WebBrowser.WebBrowserWindow
125 nam = (
126 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
127 )
128 reply = nam.post(request, params)
129 reply.finished.connect(
130 lambda: self.__checkServiceKeyValidityFinished(reply))
131 self.__replies.append(reply)
132
133 def __checkServiceKeyValidityFinished(self, reply):
134 """
135 Private slot to determine the result of the service key validity check.
136
137 @param reply reference to the network reply
138 @type QNetworkReply
139 """
140 res = False
141 msg = ""
142
143 if reply.error() == QNetworkReply.NetworkError.NoError:
144 res = True
145 elif reply.error() == self.ServiceCode_InvalidKey:
146 res = False
147 else:
148 msg = reply.errorString()
149 self.__replies.remove(reply)
150 reply.deleteLater()
151
152 self.checkServiceKeyFinished.emit(res, msg)
153
154 def submitUrl(self, url):
155 """
156 Public method to submit an URL to be scanned.
157
158 @param url url to be scanned (QUrl)
159 """
160 request = QNetworkRequest(QUrl(self.ScanUrlUrl))
161 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
162 "application/x-www-form-urlencoded")
163 params = QByteArray("apikey={0}&url=".format(
164 Preferences.getWebBrowser("VirusTotalServiceKey"))
165 .encode("utf-8")).append(QUrl.toPercentEncoding(url.toString()))
166
167 import WebBrowser.WebBrowserWindow
168 nam = (
169 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
170 )
171 reply = nam.post(request, params)
172 reply.finished.connect(self.__submitUrlFinished)
173 self.__replies.append(reply)
174
175 def __submitUrlFinished(self, reply):
176 """
177 Private slot to determine the result of the URL scan submission.
178
179 @param reply reference to the network reply
180 @type QNetworkReply
181 """
182 if reply.error() == QNetworkReply.NetworkError.NoError:
183 result = json.loads(str(reply.readAll(), "utf-8"))
184 if result["response_code"] == self.ServiceResult_ItemPresent:
185 self.urlScanReport.emit(result["permalink"])
186 self.__getUrlScanReportUrl(result["scan_id"])
187 else:
188 if result["response_code"] in self.errorMessages:
189 msg = self.errorMessages[result["response_code"]]
190 else:
191 msg = result["verbose_msg"]
192 self.submitUrlError.emit(msg)
193 elif reply.error() == self.ServiceCode_RateLimitExceeded:
194 self.submitUrlError.emit(
195 self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
196 else:
197 self.submitUrlError.emit(reply.errorString())
198 self.__replies.remove(reply)
199 reply.deleteLater()
200
201 def __getUrlScanReportUrl(self, scanId):
202 """
203 Private method to get the report URL for a URL scan.
204
205 @param scanId ID of the scan to get the report URL for (string)
206 """
207 request = QNetworkRequest(QUrl(self.GetUrlReportUrl))
208 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
209 "application/x-www-form-urlencoded")
210 params = QByteArray("apikey={0}&resource={1}".format(
211 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
212 .encode("utf-8"))
213
214 import WebBrowser.WebBrowserWindow
215 nam = (
216 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
217 )
218 reply = nam.post(request, params)
219 reply.finished.connect(self.__getUrlScanReportUrlFinished)
220 self.__replies.append(reply)
221
222 def __getUrlScanReportUrlFinished(self, reply):
223 """
224 Private slot to determine the result of the URL scan report URL.
225
226 @param reply reference to the network reply
227 @type QNetworkReply
228 request.
229 """
230 if reply.error() == QNetworkReply.NetworkError.NoError:
231 result = json.loads(str(reply.readAll(), "utf-8"))
232 if "filescan_id" in result and result["filescan_id"] is not None:
233 self.__getFileScanReportUrl(result["filescan_id"])
234 self.__replies.remove(reply)
235 reply.deleteLater()
236
237 def __getFileScanReportUrl(self, scanId):
238 """
239 Private method to get the report URL for a file scan.
240
241 @param scanId ID of the scan to get the report URL for (string)
242 """
243 request = QNetworkRequest(QUrl(self.GetFileReportUrl))
244 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
245 "application/x-www-form-urlencoded")
246 params = QByteArray("apikey={0}&resource={1}".format(
247 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId)
248 .encode("utf-8"))
249
250 import WebBrowser.WebBrowserWindow
251 nam = (
252 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
253 )
254 reply = nam.post(request, params)
255 reply.finished.connect(self.__getFileScanReportUrlFinished)
256 self.__replies.append(reply)
257
258 def __getFileScanReportUrlFinished(self, reply):
259 """
260 Private slot to determine the result of the file scan report URL
261 request.
262
263 @param reply reference to the network reply
264 @type QNetworkReply
265 """
266 if reply.error() == QNetworkReply.NetworkError.NoError:
267 result = json.loads(str(reply.readAll(), "utf-8"))
268 self.fileScanReport.emit(result["permalink"])
269 self.__replies.remove(reply)
270 reply.deleteLater()
271
272 def getIpAddressReport(self, ipAddress):
273 """
274 Public method to retrieve a report for an IP address.
275
276 @param ipAddress valid IPv4 address in dotted quad notation
277 @type str
278 """
279 self.__lastIP = ipAddress
280
281 queryItems = [
282 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
283 ("ip", ipAddress),
284 ]
285 url = QUrl(self.GetIpAddressReportUrl)
286 query = QUrlQuery()
287 query.setQueryItems(queryItems)
288 url.setQuery(query)
289 request = QNetworkRequest(url)
290
291 import WebBrowser.WebBrowserWindow
292 nam = (
293 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
294 )
295 reply = nam.get(request)
296 reply.finished.connect(self.__getIpAddressReportFinished)
297 self.__replies.append(reply)
298
299 def __getIpAddressReportFinished(self, reply):
300 """
301 Private slot to process the IP address report data.
302
303 @param reply reference to the network reply
304 @type QNetworkReply
305 """
306 if reply.error() == QNetworkReply.NetworkError.NoError:
307 result = json.loads(str(reply.readAll(), "utf-8"))
308 if result["response_code"] == 0:
309 E5MessageBox.information(
310 None,
311 self.tr("VirusTotal IP Address Report"),
312 self.tr("""VirusTotal does not have any information for"""
313 """ the given IP address."""))
314 elif result["response_code"] == -1:
315 E5MessageBox.information(
316 None,
317 self.tr("VirusTotal IP Address Report"),
318 self.tr("""The submitted IP address is invalid."""))
319 else:
320 owner = result["as_owner"]
321 resolutions = result["resolutions"]
322 try:
323 urls = result["detected_urls"]
324 except KeyError:
325 urls = []
326
327 from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
328 self.__ipReportDlg = VirusTotalIpReportDialog(
329 self.__lastIP, owner, resolutions, urls)
330 self.__ipReportDlg.show()
331 self.__replies.remove(reply)
332 reply.deleteLater()
333
334 def getDomainReport(self, domain):
335 """
336 Public method to retrieve a report for a domain.
337
338 @param domain domain name
339 @type str
340 """
341 self.__lastDomain = domain
342
343 queryItems = [
344 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
345 ("domain", domain),
346 ]
347 url = QUrl(self.GetDomainReportUrl)
348 query = QUrlQuery()
349 query.setQueryItems(queryItems)
350 url.setQuery(query)
351 request = QNetworkRequest(url)
352
353 import WebBrowser.WebBrowserWindow
354 nam = (
355 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()
356 )
357 reply = nam.get(request)
358 reply.finished.connect(lambda: self.__getDomainReportFinished(reply))
359 self.__replies.append(reply)
360
361 def __getDomainReportFinished(self, reply):
362 """
363 Private slot to process the IP address report data.
364
365 @param reply reference to the network reply
366 @type QNetworkReply
367 """
368 if reply.error() == QNetworkReply.NetworkError.NoError:
369 result = json.loads(str(reply.readAll(), "utf-8"))
370 if result["response_code"] == 0:
371 E5MessageBox.information(
372 None,
373 self.tr("VirusTotal Domain Report"),
374 self.tr("""VirusTotal does not have any information for"""
375 """ the given domain."""))
376 elif result["response_code"] == -1:
377 E5MessageBox.information(
378 None,
379 self.tr("VirusTotal Domain Report"),
380 self.tr("""The submitted domain address is invalid."""))
381 else:
382 resolutions = result["resolutions"]
383 try:
384 urls = result["detected_urls"]
385 except KeyError:
386 urls = []
387 try:
388 subdomains = result["subdomains"]
389 except KeyError:
390 subdomains = []
391 try:
392 bdCategory = result["BitDefender category"]
393 except KeyError:
394 bdCategory = self.tr("not available")
395 try:
396 tmCategory = result["TrendMicro category"]
397 except KeyError:
398 tmCategory = self.tr("not available")
399 try:
400 wtsCategory = result["Websense ThreatSeeker category"]
401 except KeyError:
402 wtsCategory = self.tr("not available")
403 try:
404 whois = result["whois"]
405 except KeyError:
406 whois = ""
407
408 from .VirusTotalDomainReportDialog import (
409 VirusTotalDomainReportDialog
410 )
411 self.__domainReportDlg = VirusTotalDomainReportDialog(
412 self.__lastDomain, resolutions, urls, subdomains,
413 bdCategory, tmCategory, wtsCategory, whois)
414 self.__domainReportDlg.show()
415 self.__replies.remove(reply)
416 reply.deleteLater()
417
418 def close(self):
419 """
420 Public slot to close the API.
421 """
422 for reply in self.__replies:
423 reply.abort()
424
425 self.__ipReportDlg and self.__ipReportDlg.close()
426 self.__domainReportDlg and self.__domainReportDlg.close()

eric ide

mercurial