src/eric7/WebBrowser/VirusTotal/VirusTotalApi.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 8881
54e42bc2437a
child 9221
bf71ee032bb4
equal deleted inserted replaced
9208:3fc8dfeb6ebe 9209:b99e7fd55fd3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2011 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a>
8 API class.
9 """
10
11 import contextlib
12 import json
13
14 from PyQt6.QtCore import pyqtSignal, QObject, QUrl, QUrlQuery, QByteArray
15 from PyQt6.QtNetwork import QNetworkRequest, QNetworkReply
16
17 from EricWidgets import EricMessageBox
18
19 import Preferences
20
21
class VirusTotalAPI(QObject):
    """
    Class implementing the <a href="http://www.virustotal.com">VirusTotal</a>
    API.

    All requests are sent asynchronously through the network access manager
    of the web browser window. Replies are kept in an internal list until
    their 'finished' signal has been processed.

    @signal checkServiceKeyFinished(bool, str) emitted after the service key
        check has been performed. It gives a flag indicating validity
        (boolean) and an error message in case of a network error (string).
    @signal submitUrlError(str) emitted with the error string, if the URL scan
        submission returned an error.
    @signal urlScanReport(str) emitted with the URL of the URL scan report page
    @signal fileScanReport(str) emitted with the URL of the file scan report
        page
    """

    checkServiceKeyFinished = pyqtSignal(bool, str)
    submitUrlError = pyqtSignal(str)
    urlScanReport = pyqtSignal(str)
    fileScanReport = pyqtSignal(str)

    # resource ID sent with the service key check request
    TestServiceKeyScanID = (
        "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
    )

    # values of the 'response_code' entry of a decoded service reply
    ServiceResult_ItemQueued = -2
    ServiceResult_ItemNotPresent = 0
    ServiceResult_ItemPresent = 1

    # HTTP Status Codes
    ServiceCode_InvalidKey = 202
    ServiceCode_RateLimitExceeded = 204
    ServiceCode_InvalidPrivilege = 403

    # URL patterns; the placeholder receives the protocol (http or https)
    GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
    ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
    GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
    GetIpAddressReportPattern = (
        "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
    )
    GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object (QObject)
        """
        super().__init__(parent)

        # replies that have been sent but not yet processed
        self.__replies = []

        self.__loadSettings()

        self.__lastIP = ""
        self.__lastDomain = ""
        self.__ipReportDlg = None
        self.__domainReportDlg = None

    def __loadSettings(self):
        """
        Private method to load the settings.

        It derives the service URLs from the configured protocol and
        (re-)creates the dictionary of translated error messages.
        """
        secure = Preferences.getWebBrowser("VirusTotalSecure")
        protocol = "https" if secure else "http"

        self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
        self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
        self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
        self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
            protocol
        )
        self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)

        # keyed by HTTP status code (rate limit) or service response code
        self.errorMessages = {
            self.ServiceCode_RateLimitExceeded: self.tr(
                "Request limit has been reached."
            ),
            self.ServiceResult_ItemNotPresent: self.tr(
                "Requested item is not present."
            ),
            self.ServiceResult_ItemQueued: self.tr(
                "Requested item is still queued."
            ),
        }

    def preferencesChanged(self):
        """
        Public slot to handle a change of preferences.
        """
        self.__loadSettings()

    ############################################################
    ## private helpers shared by all requests
    ############################################################

    def __networkManager(self):
        """
        Private method to get the network manager of the web browser window.

        @return network access manager used to send the requests
        """
        # function level import as in the rest of this module; presumably
        # needed to avoid a circular import - TODO confirm
        import WebBrowser.WebBrowserWindow

        return WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager()

    def __trackReply(self, reply, handler):
        """
        Private method to register a pending reply.

        @param reply reference to the network reply
        @type QNetworkReply
        @param handler method to be called with the reply once it finished
        @type function
        """
        reply.finished.connect(lambda: handler(reply))
        self.__replies.append(reply)

    def __releaseReply(self, reply):
        """
        Private method to forget a processed reply and schedule its deletion.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        self.__replies.remove(reply)
        reply.deleteLater()

    def __postForm(self, urlStr, params, handler):
        """
        Private method to send URL encoded form data via a POST request.

        @param urlStr URL to send the request to
        @type str
        @param params encoded form data
        @type QByteArray
        @param handler method to be called with the reply once it finished
        @type function
        """
        request = QNetworkRequest(QUrl(urlStr))
        request.setHeader(
            QNetworkRequest.KnownHeaders.ContentTypeHeader,
            "application/x-www-form-urlencoded",
        )
        reply = self.__networkManager().post(request, params)
        self.__trackReply(reply, handler)

    def __getReport(self, urlStr, itemName, itemValue, handler):
        """
        Private method to send a GET request for a report.

        The configured service key and the given item are sent as query
        items of the URL.

        @param urlStr URL to send the request to
        @type str
        @param itemName name of the query item identifying the subject
        @type str
        @param itemValue value of the query item identifying the subject
        @type str
        @param handler method to be called with the reply once it finished
        @type function
        """
        query = QUrlQuery()
        query.setQueryItems([
            ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")),
            (itemName, itemValue),
        ])
        url = QUrl(urlStr)
        url.setQuery(query)

        reply = self.__networkManager().get(QNetworkRequest(url))
        self.__trackReply(reply, handler)

    def __resourceParams(self, key, resource):
        """
        Private method to build the form data of a resource request.

        @param key service key
        @type str
        @param resource ID of the resource
        @type str
        @return encoded form data
        @rtype QByteArray
        """
        return QByteArray(
            "apikey={0}&resource={1}".format(key, resource).encode("utf-8")
        )

    def __httpStatus(self, reply):
        """
        Private method to get the HTTP status code of a reply.

        @param reply reference to the network reply
        @type QNetworkReply
        @return value of the HTTP status code attribute of the reply (None,
            if the attribute is not set)
        """
        return reply.attribute(
            QNetworkRequest.Attribute.HttpStatusCodeAttribute
        )

    def __isRateLimited(self, reply):
        """
        Private method to check, if a reply signals an exceeded rate limit.

        This must be checked before the reply data is decoded. The rate limit
        status is a 2xx status, for which the reply reports no network
        error; presumably it arrives without a JSON body - TODO confirm.

        @param reply reference to the network reply
        @type QNetworkReply
        @return flag indicating an exceeded request rate limit
        @rtype bool
        """
        return self.__httpStatus(reply) == self.ServiceCode_RateLimitExceeded

    def __readJson(self, reply):
        """
        Private method to decode the JSON data of a reply.

        @param reply reference to the network reply
        @type QNetworkReply
        @return decoded reply data
        @exception ValueError raised by the json module, if the data is not
            valid JSON
        """
        return json.loads(str(reply.readAll(), "utf-8"))

    ############################################################
    ## service key check
    ############################################################

    def checkServiceKeyValidity(self, key, protocol=""):
        """
        Public method to check the validity of the given service key.

        The result is reported via the 'checkServiceKeyFinished' signal.

        @param key service key (string)
        @param protocol protocol used to access VirusTotal (string); the
            configured protocol is used, if it is empty
        """
        urlStr = (
            self.GetFileReportUrl
            if protocol == "" else
            self.GetFileReportPattern.format(protocol)
        )
        self.__postForm(
            urlStr,
            self.__resourceParams(key, self.TestServiceKeyScanID),
            self.__checkServiceKeyValidityFinished,
        )

    def __checkServiceKeyValidityFinished(self, reply):
        """
        Private slot to determine the result of the service key validity check.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        res = False
        msg = ""

        try:
            # The service codes are HTTP status codes, so they are compared
            # with the HTTP status of the reply. The value of reply.error()
            # is an enum member, which never equals a plain integer.
            # NOTE(review): the service may reject keys with status 403
            # (ServiceCode_InvalidPrivilege) instead - confirm against the
            # VirusTotal API documentation.
            if self.__httpStatus(reply) == self.ServiceCode_InvalidKey:
                res = False
            elif reply.error() == QNetworkReply.NetworkError.NoError:
                res = True
            else:
                msg = reply.errorString()
        finally:
            self.__releaseReply(reply)

        self.checkServiceKeyFinished.emit(res, msg)

    ############################################################
    ## URL scan submission and scan reports
    ############################################################

    def submitUrl(self, url):
        """
        Public method to submit an URL to be scanned.

        @param url url to be scanned (QUrl)
        """
        params = QByteArray(
            "apikey={0}&url=".format(
                Preferences.getWebBrowser("VirusTotalServiceKey")
            ).encode("utf-8")
        ).append(QUrl.toPercentEncoding(url.toString()))

        self.__postForm(self.ScanUrlUrl, params, self.__submitUrlFinished)

    def __submitUrlFinished(self, reply):
        """
        Private slot to determine the result of the URL scan submission.

        On success the 'urlScanReport' signal is emitted and the scan report
        is requested. Otherwise 'submitUrlError' is emitted.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if self.__isRateLimited(reply):
                self.submitUrlError.emit(
                    self.errorMessages[self.ServiceCode_RateLimitExceeded])
            elif reply.error() == QNetworkReply.NetworkError.NoError:
                result = self.__readJson(reply)
                responseCode = result["response_code"]
                if responseCode == self.ServiceResult_ItemPresent:
                    self.urlScanReport.emit(result["permalink"])
                    self.__getUrlScanReportUrl(result["scan_id"])
                else:
                    # prefer the translated message over the service text
                    msg = (
                        self.errorMessages[responseCode]
                        if responseCode in self.errorMessages else
                        result["verbose_msg"]
                    )
                    self.submitUrlError.emit(msg)
            else:
                self.submitUrlError.emit(reply.errorString())
        finally:
            self.__releaseReply(reply)

    def __getUrlScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a URL scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        self.__postForm(
            self.GetUrlReportUrl,
            self.__resourceParams(
                Preferences.getWebBrowser("VirusTotalServiceKey"), scanId
            ),
            self.__getUrlScanReportUrlFinished,
        )

    def __getUrlScanReportUrlFinished(self, reply):
        """
        Private slot to determine the result of the URL scan report URL
        request.

        If the report refers to a file scan, the report URL of that file scan
        is requested. Errors are ignored.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if (
                not self.__isRateLimited(reply) and
                reply.error() == QNetworkReply.NetworkError.NoError
            ):
                result = self.__readJson(reply)
                if (
                    "filescan_id" in result and
                    result["filescan_id"] is not None
                ):
                    self.__getFileScanReportUrl(result["filescan_id"])
        finally:
            self.__releaseReply(reply)

    def __getFileScanReportUrl(self, scanId):
        """
        Private method to get the report URL for a file scan.

        @param scanId ID of the scan to get the report URL for (string)
        """
        self.__postForm(
            self.GetFileReportUrl,
            self.__resourceParams(
                Preferences.getWebBrowser("VirusTotalServiceKey"), scanId
            ),
            self.__getFileScanReportUrlFinished,
        )

    def __getFileScanReportUrlFinished(self, reply):
        """
        Private slot to determine the result of the file scan report URL
        request.

        On success the 'fileScanReport' signal is emitted. Errors are ignored.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if (
                not self.__isRateLimited(reply) and
                reply.error() == QNetworkReply.NetworkError.NoError
            ):
                result = self.__readJson(reply)
                self.fileScanReport.emit(result["permalink"])
        finally:
            self.__releaseReply(reply)

    ############################################################
    ## IP address report
    ############################################################

    def getIpAddressReport(self, ipAddress):
        """
        Public method to retrieve a report for an IP address.

        @param ipAddress valid IPv4 address in dotted quad notation
        @type str
        """
        self.__lastIP = ipAddress

        self.__getReport(
            self.GetIpAddressReportUrl, "ip", ipAddress,
            self.__getIpAddressReportFinished,
        )

    def __getIpAddressReportFinished(self, reply):
        """
        Private slot to process the IP address report data.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if self.__isRateLimited(reply):
                EricMessageBox.information(
                    None,
                    self.tr("VirusTotal IP Address Report"),
                    self.errorMessages[self.ServiceCode_RateLimitExceeded])
            elif reply.error() == QNetworkReply.NetworkError.NoError:
                result = self.__readJson(reply)
                responseCode = result["response_code"]
                if responseCode == self.ServiceResult_ItemNotPresent:
                    EricMessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""VirusTotal does not have any information"""
                                """ for the given IP address."""))
                elif responseCode == -1:
                    EricMessageBox.information(
                        None,
                        self.tr("VirusTotal IP Address Report"),
                        self.tr("""The submitted IP address is invalid."""))
                else:
                    owner = result["as_owner"]
                    resolutions = result["resolutions"]
                    urls = result.get("detected_urls", [])

                    from .VirusTotalIpReportDialog import (
                        VirusTotalIpReportDialog
                    )
                    # keep a reference so the dialog is not garbage collected
                    self.__ipReportDlg = VirusTotalIpReportDialog(
                        self.__lastIP, owner, resolutions, urls)
                    self.__ipReportDlg.show()
        finally:
            self.__releaseReply(reply)

    ############################################################
    ## domain report
    ############################################################

    def getDomainReport(self, domain):
        """
        Public method to retrieve a report for a domain.

        @param domain domain name
        @type str
        """
        self.__lastDomain = domain

        self.__getReport(
            self.GetDomainReportUrl, "domain", domain,
            self.__getDomainReportFinished,
        )

    def __getDomainReportFinished(self, reply):
        """
        Private slot to process the domain report data.

        @param reply reference to the network reply
        @type QNetworkReply
        """
        try:
            if self.__isRateLimited(reply):
                EricMessageBox.information(
                    None,
                    self.tr("VirusTotal Domain Report"),
                    self.errorMessages[self.ServiceCode_RateLimitExceeded])
            elif reply.error() == QNetworkReply.NetworkError.NoError:
                result = self.__readJson(reply)
                responseCode = result["response_code"]
                if responseCode == self.ServiceResult_ItemNotPresent:
                    EricMessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr("""VirusTotal does not have any information"""
                                """ for the given domain."""))
                elif responseCode == -1:
                    EricMessageBox.information(
                        None,
                        self.tr("VirusTotal Domain Report"),
                        self.tr(
                            """The submitted domain address is invalid."""))
                else:
                    self.__showDomainReport(result)
        finally:
            self.__releaseReply(reply)

    def __showDomainReport(self, result):
        """
        Private method to show the dialog for a decoded domain report.

        Entries missing in the report (except 'resolutions') are replaced by
        empty values or the placeholder "--".

        @param result decoded domain report
        @type dict
        """
        resolutions = result["resolutions"]
        urls = result.get("detected_urls", [])
        subdomains = result.get("subdomains", [])
        whois = result.get("whois", "")

        # dialog category -> report entries to try, first one present wins
        categoriesMapping = {
            "bitdefender": ("BitDefender category",),
            "sophos": ("sophos category", "Sophos category"),
            "valkyrie": ("Comodo Valkyrie Verdict category",),
            "alpha": ("alphaMountain.ai category",),
            "forcepoint": ("Forcepoint ThreatSeeker category",),
        }
        categories = {
            key: next(
                (result[name] for name in vtCategories if name in result),
                "--",
            )
            for key, vtCategories in categoriesMapping.items()
        }

        webutationData = {
            "adult": "--",
            "safety": "--",
            "verdict": "--"
        }
        webutationMapping = (
            ("adult", "Adult content"),
            ("safety", "Safety score"),
            ("verdict", "Verdict"),
        )
        with contextlib.suppress(KeyError):
            webutation = result["Webutation domain info"]
            for key, vtName in webutationMapping:
                with contextlib.suppress(KeyError):
                    webutationData[key] = webutation[vtName]

        from .VirusTotalDomainReportDialog import (
            VirusTotalDomainReportDialog
        )
        # keep a reference so the dialog is not garbage collected
        self.__domainReportDlg = VirusTotalDomainReportDialog(
            self.__lastDomain, resolutions, urls, subdomains,
            categories, webutationData, whois)
        self.__domainReportDlg.show()

    ############################################################
    ## shutdown
    ############################################################

    def close(self):
        """
        Public slot to close the API.

        It aborts all pending requests and closes the report dialogs.
        """
        # abort() emits finished(), whose handlers remove the reply from
        # self.__replies; iterate over a copy so no pending reply is skipped
        for reply in list(self.__replies):
            reply.abort()

        if self.__ipReportDlg:
            self.__ipReportDlg.close()
        if self.__domainReportDlg:
            self.__domainReportDlg.close()

eric ide

mercurial