|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2011 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the <a href="http://www.virustotal.com">VirusTotal</a> |
|
8 API class. |
|
9 """ |
|
10 |
|
11 import contextlib |
|
12 import json |
|
13 |
|
14 from PyQt6.QtCore import pyqtSignal, QObject, QUrl, QUrlQuery, QByteArray |
|
15 from PyQt6.QtNetwork import QNetworkRequest, QNetworkReply |
|
16 |
|
17 from EricWidgets import EricMessageBox |
|
18 |
|
19 import Preferences |
|
20 |
|
21 |
|
22 class VirusTotalAPI(QObject): |
|
23 """ |
|
24 Class implementing the <a href="http://www.virustotal.com">VirusTotal</a> |
|
25 API. |
|
26 |
|
27 @signal checkServiceKeyFinished(bool, str) emitted after the service key |
|
28 check has been performed. It gives a flag indicating validity |
|
29 (boolean) and an error message in case of a network error (string). |
|
30 @signal submitUrlError(str) emitted with the error string, if the URL scan |
|
31 submission returned an error. |
|
32 @signal urlScanReport(str) emitted with the URL of the URL scan report page |
|
33 @signal fileScanReport(str) emitted with the URL of the file scan report |
|
34 page |
|
35 """ |
|
36 checkServiceKeyFinished = pyqtSignal(bool, str) |
|
37 submitUrlError = pyqtSignal(str) |
|
38 urlScanReport = pyqtSignal(str) |
|
39 fileScanReport = pyqtSignal(str) |
|
40 |
|
41 TestServiceKeyScanID = ( |
|
42 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad" |
|
43 ) |
|
44 |
|
45 ServiceResult_ItemQueued = -2 |
|
46 ServiceResult_ItemNotPresent = 0 |
|
47 ServiceResult_ItemPresent = 1 |
|
48 |
|
49 # HTTP Status Codes |
|
50 ServiceCode_InvalidKey = 202 |
|
51 ServiceCode_RateLimitExceeded = 204 |
|
52 ServiceCode_InvalidPrivilege = 403 |
|
53 |
|
54 GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report" |
|
55 ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan" |
|
56 GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report" |
|
57 GetIpAddressReportPattern = ( |
|
58 "{0}://www.virustotal.com/vtapi/v2/ip-address/report" |
|
59 ) |
|
60 GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report" |
|
61 |
|
62 def __init__(self, parent=None): |
|
63 """ |
|
64 Constructor |
|
65 |
|
66 @param parent reference to the parent object (QObject) |
|
67 """ |
|
68 super().__init__(parent) |
|
69 |
|
70 self.__replies = [] |
|
71 |
|
72 self.__loadSettings() |
|
73 |
|
74 self.__lastIP = "" |
|
75 self.__lastDomain = "" |
|
76 self.__ipReportDlg = None |
|
77 self.__domainReportDlg = None |
|
78 |
|
79 def __loadSettings(self): |
|
80 """ |
|
81 Private method to load the settings. |
|
82 """ |
|
83 protocol = ( |
|
84 "https" |
|
85 if Preferences.getWebBrowser("VirusTotalSecure") else |
|
86 "http" |
|
87 ) |
|
88 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol) |
|
89 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol) |
|
90 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol) |
|
91 self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format( |
|
92 protocol) |
|
93 self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol) |
|
94 |
|
95 self.errorMessages = { |
|
96 204: self.tr("Request limit has been reached."), |
|
97 0: self.tr("Requested item is not present."), |
|
98 -2: self.tr("Requested item is still queued."), |
|
99 } |
|
100 |
|
101 def preferencesChanged(self): |
|
102 """ |
|
103 Public slot to handle a change of preferences. |
|
104 """ |
|
105 self.__loadSettings() |
|
106 |
|
107 def checkServiceKeyValidity(self, key, protocol=""): |
|
108 """ |
|
109 Public method to check the validity of the given service key. |
|
110 |
|
111 @param key service key (string) |
|
112 @param protocol protocol used to access VirusTotal (string) |
|
113 """ |
|
114 urlStr = ( |
|
115 self.GetFileReportUrl |
|
116 if protocol == "" else |
|
117 self.GetFileReportPattern.format(protocol) |
|
118 ) |
|
119 request = QNetworkRequest(QUrl(urlStr)) |
|
120 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
121 "application/x-www-form-urlencoded") |
|
122 params = QByteArray("apikey={0}&resource={1}".format( |
|
123 key, self.TestServiceKeyScanID).encode("utf-8")) |
|
124 |
|
125 import WebBrowser.WebBrowserWindow |
|
126 nam = ( |
|
127 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
128 ) |
|
129 reply = nam.post(request, params) |
|
130 reply.finished.connect( |
|
131 lambda: self.__checkServiceKeyValidityFinished(reply)) |
|
132 self.__replies.append(reply) |
|
133 |
|
134 def __checkServiceKeyValidityFinished(self, reply): |
|
135 """ |
|
136 Private slot to determine the result of the service key validity check. |
|
137 |
|
138 @param reply reference to the network reply |
|
139 @type QNetworkReply |
|
140 """ |
|
141 res = False |
|
142 msg = "" |
|
143 |
|
144 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
145 res = True |
|
146 elif reply.error() == self.ServiceCode_InvalidKey: |
|
147 res = False |
|
148 else: |
|
149 msg = reply.errorString() |
|
150 self.__replies.remove(reply) |
|
151 reply.deleteLater() |
|
152 |
|
153 self.checkServiceKeyFinished.emit(res, msg) |
|
154 |
|
155 def submitUrl(self, url): |
|
156 """ |
|
157 Public method to submit an URL to be scanned. |
|
158 |
|
159 @param url url to be scanned (QUrl) |
|
160 """ |
|
161 request = QNetworkRequest(QUrl(self.ScanUrlUrl)) |
|
162 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
163 "application/x-www-form-urlencoded") |
|
164 params = QByteArray("apikey={0}&url=".format( |
|
165 Preferences.getWebBrowser("VirusTotalServiceKey")) |
|
166 .encode("utf-8")).append(QUrl.toPercentEncoding(url.toString())) |
|
167 |
|
168 import WebBrowser.WebBrowserWindow |
|
169 nam = ( |
|
170 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
171 ) |
|
172 reply = nam.post(request, params) |
|
173 reply.finished.connect(lambda: self.__submitUrlFinished(reply)) |
|
174 self.__replies.append(reply) |
|
175 |
|
176 def __submitUrlFinished(self, reply): |
|
177 """ |
|
178 Private slot to determine the result of the URL scan submission. |
|
179 |
|
180 @param reply reference to the network reply |
|
181 @type QNetworkReply |
|
182 """ |
|
183 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
184 result = json.loads(str(reply.readAll(), "utf-8")) |
|
185 if result["response_code"] == self.ServiceResult_ItemPresent: |
|
186 self.urlScanReport.emit(result["permalink"]) |
|
187 self.__getUrlScanReportUrl(result["scan_id"]) |
|
188 else: |
|
189 if result["response_code"] in self.errorMessages: |
|
190 msg = self.errorMessages[result["response_code"]] |
|
191 else: |
|
192 msg = result["verbose_msg"] |
|
193 self.submitUrlError.emit(msg) |
|
194 elif reply.error() == self.ServiceCode_RateLimitExceeded: |
|
195 self.submitUrlError.emit( |
|
196 self.errorMessages[result[self.ServiceCode_RateLimitExceeded]]) |
|
197 else: |
|
198 self.submitUrlError.emit(reply.errorString()) |
|
199 self.__replies.remove(reply) |
|
200 reply.deleteLater() |
|
201 |
|
202 def __getUrlScanReportUrl(self, scanId): |
|
203 """ |
|
204 Private method to get the report URL for a URL scan. |
|
205 |
|
206 @param scanId ID of the scan to get the report URL for (string) |
|
207 """ |
|
208 request = QNetworkRequest(QUrl(self.GetUrlReportUrl)) |
|
209 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
210 "application/x-www-form-urlencoded") |
|
211 params = QByteArray("apikey={0}&resource={1}".format( |
|
212 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId) |
|
213 .encode("utf-8")) |
|
214 |
|
215 import WebBrowser.WebBrowserWindow |
|
216 nam = ( |
|
217 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
218 ) |
|
219 reply = nam.post(request, params) |
|
220 reply.finished.connect( |
|
221 lambda: self.__getUrlScanReportUrlFinished(reply)) |
|
222 self.__replies.append(reply) |
|
223 |
|
224 def __getUrlScanReportUrlFinished(self, reply): |
|
225 """ |
|
226 Private slot to determine the result of the URL scan report URL. |
|
227 |
|
228 @param reply reference to the network reply |
|
229 @type QNetworkReply |
|
230 request. |
|
231 """ |
|
232 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
233 result = json.loads(str(reply.readAll(), "utf-8")) |
|
234 if "filescan_id" in result and result["filescan_id"] is not None: |
|
235 self.__getFileScanReportUrl(result["filescan_id"]) |
|
236 self.__replies.remove(reply) |
|
237 reply.deleteLater() |
|
238 |
|
239 def __getFileScanReportUrl(self, scanId): |
|
240 """ |
|
241 Private method to get the report URL for a file scan. |
|
242 |
|
243 @param scanId ID of the scan to get the report URL for (string) |
|
244 """ |
|
245 request = QNetworkRequest(QUrl(self.GetFileReportUrl)) |
|
246 request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
247 "application/x-www-form-urlencoded") |
|
248 params = QByteArray("apikey={0}&resource={1}".format( |
|
249 Preferences.getWebBrowser("VirusTotalServiceKey"), scanId) |
|
250 .encode("utf-8")) |
|
251 |
|
252 import WebBrowser.WebBrowserWindow |
|
253 nam = ( |
|
254 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
255 ) |
|
256 reply = nam.post(request, params) |
|
257 reply.finished.connect( |
|
258 lambda: self.__getFileScanReportUrlFinished(reply)) |
|
259 self.__replies.append(reply) |
|
260 |
|
261 def __getFileScanReportUrlFinished(self, reply): |
|
262 """ |
|
263 Private slot to determine the result of the file scan report URL |
|
264 request. |
|
265 |
|
266 @param reply reference to the network reply |
|
267 @type QNetworkReply |
|
268 """ |
|
269 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
270 result = json.loads(str(reply.readAll(), "utf-8")) |
|
271 self.fileScanReport.emit(result["permalink"]) |
|
272 self.__replies.remove(reply) |
|
273 reply.deleteLater() |
|
274 |
|
275 def getIpAddressReport(self, ipAddress): |
|
276 """ |
|
277 Public method to retrieve a report for an IP address. |
|
278 |
|
279 @param ipAddress valid IPv4 address in dotted quad notation |
|
280 @type str |
|
281 """ |
|
282 self.__lastIP = ipAddress |
|
283 |
|
284 queryItems = [ |
|
285 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")), |
|
286 ("ip", ipAddress), |
|
287 ] |
|
288 url = QUrl(self.GetIpAddressReportUrl) |
|
289 query = QUrlQuery() |
|
290 query.setQueryItems(queryItems) |
|
291 url.setQuery(query) |
|
292 request = QNetworkRequest(url) |
|
293 |
|
294 import WebBrowser.WebBrowserWindow |
|
295 nam = ( |
|
296 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
297 ) |
|
298 reply = nam.get(request) |
|
299 reply.finished.connect( |
|
300 lambda: self.__getIpAddressReportFinished(reply)) |
|
301 self.__replies.append(reply) |
|
302 |
|
303 def __getIpAddressReportFinished(self, reply): |
|
304 """ |
|
305 Private slot to process the IP address report data. |
|
306 |
|
307 @param reply reference to the network reply |
|
308 @type QNetworkReply |
|
309 """ |
|
310 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
311 result = json.loads(str(reply.readAll(), "utf-8")) |
|
312 if result["response_code"] == 0: |
|
313 EricMessageBox.information( |
|
314 None, |
|
315 self.tr("VirusTotal IP Address Report"), |
|
316 self.tr("""VirusTotal does not have any information for""" |
|
317 """ the given IP address.""")) |
|
318 elif result["response_code"] == -1: |
|
319 EricMessageBox.information( |
|
320 None, |
|
321 self.tr("VirusTotal IP Address Report"), |
|
322 self.tr("""The submitted IP address is invalid.""")) |
|
323 else: |
|
324 owner = result["as_owner"] |
|
325 resolutions = result["resolutions"] |
|
326 try: |
|
327 urls = result["detected_urls"] |
|
328 except KeyError: |
|
329 urls = [] |
|
330 |
|
331 from .VirusTotalIpReportDialog import VirusTotalIpReportDialog |
|
332 self.__ipReportDlg = VirusTotalIpReportDialog( |
|
333 self.__lastIP, owner, resolutions, urls) |
|
334 self.__ipReportDlg.show() |
|
335 self.__replies.remove(reply) |
|
336 reply.deleteLater() |
|
337 |
|
338 def getDomainReport(self, domain): |
|
339 """ |
|
340 Public method to retrieve a report for a domain. |
|
341 |
|
342 @param domain domain name |
|
343 @type str |
|
344 """ |
|
345 self.__lastDomain = domain |
|
346 |
|
347 queryItems = [ |
|
348 ("apikey", Preferences.getWebBrowser("VirusTotalServiceKey")), |
|
349 ("domain", domain), |
|
350 ] |
|
351 url = QUrl(self.GetDomainReportUrl) |
|
352 query = QUrlQuery() |
|
353 query.setQueryItems(queryItems) |
|
354 url.setQuery(query) |
|
355 request = QNetworkRequest(url) |
|
356 |
|
357 import WebBrowser.WebBrowserWindow |
|
358 nam = ( |
|
359 WebBrowser.WebBrowserWindow.WebBrowserWindow.networkManager() |
|
360 ) |
|
361 reply = nam.get(request) |
|
362 reply.finished.connect(lambda: self.__getDomainReportFinished(reply)) |
|
363 self.__replies.append(reply) |
|
364 |
|
365 def __getDomainReportFinished(self, reply): |
|
366 """ |
|
367 Private slot to process the IP address report data. |
|
368 |
|
369 @param reply reference to the network reply |
|
370 @type QNetworkReply |
|
371 """ |
|
372 if reply.error() == QNetworkReply.NetworkError.NoError: |
|
373 result = json.loads(str(reply.readAll(), "utf-8")) |
|
374 if result["response_code"] == 0: |
|
375 EricMessageBox.information( |
|
376 None, |
|
377 self.tr("VirusTotal Domain Report"), |
|
378 self.tr("""VirusTotal does not have any information for""" |
|
379 """ the given domain.""")) |
|
380 elif result["response_code"] == -1: |
|
381 EricMessageBox.information( |
|
382 None, |
|
383 self.tr("VirusTotal Domain Report"), |
|
384 self.tr("""The submitted domain address is invalid.""")) |
|
385 else: |
|
386 resolutions = result["resolutions"] |
|
387 try: |
|
388 urls = result["detected_urls"] |
|
389 except KeyError: |
|
390 urls = [] |
|
391 try: |
|
392 subdomains = result["subdomains"] |
|
393 except KeyError: |
|
394 subdomains = [] |
|
395 |
|
396 categoriesMapping = { |
|
397 "bitdefender": ("BitDefender category",), |
|
398 "sophos": ("sophos category", "Sophos category"), |
|
399 "valkyrie": ("Comodo Valkyrie Verdict category",), |
|
400 "alpha": ("alphaMountain.ai category",), |
|
401 "forcepoint": ("Forcepoint ThreatSeeker category",), |
|
402 } |
|
403 categories = {} |
|
404 for key, vtCategories in categoriesMapping.items(): |
|
405 for vtCategory in vtCategories: |
|
406 with contextlib.suppress(KeyError): |
|
407 categories[key] = result[vtCategory] |
|
408 break |
|
409 else: |
|
410 categories[key] = "--" |
|
411 try: |
|
412 whois = result["whois"] |
|
413 except KeyError: |
|
414 whois = "" |
|
415 |
|
416 webutationData = { |
|
417 "adult": "--", |
|
418 "safety": "--", |
|
419 "verdict": "--" |
|
420 } |
|
421 with contextlib.suppress(KeyError): |
|
422 webutation = result["Webutation domain info"] |
|
423 with contextlib.suppress(KeyError): |
|
424 webutationData["adult"] = webutation["Adult content"] |
|
425 with contextlib.suppress(KeyError): |
|
426 webutationData["safety"] = webutation["Safety score"] |
|
427 with contextlib.suppress(KeyError): |
|
428 webutationData["verdict"] = webutation["Verdict"] |
|
429 |
|
430 from .VirusTotalDomainReportDialog import ( |
|
431 VirusTotalDomainReportDialog |
|
432 ) |
|
433 self.__domainReportDlg = VirusTotalDomainReportDialog( |
|
434 self.__lastDomain, resolutions, urls, subdomains, |
|
435 categories, webutationData, whois) |
|
436 self.__domainReportDlg.show() |
|
437 self.__replies.remove(reply) |
|
438 reply.deleteLater() |
|
439 |
|
440 def close(self): |
|
441 """ |
|
442 Public slot to close the API. |
|
443 """ |
|
444 for reply in self.__replies: |
|
445 reply.abort() |
|
446 |
|
447 self.__ipReportDlg and self.__ipReportDlg.close() |
|
448 self.__domainReportDlg and self.__domainReportDlg.close() |