Helpviewer/VirusTotalApi.py

changeset 4335
a25c157625c4
parent 4332
64034d85c709
equal deleted inserted replaced
4334:423ddcc8c815 4335:a25c157625c4
14 except NameError: 14 except NameError:
15 pass 15 pass
16 16
17 import json 17 import json
18 18
19 from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal 19 from PyQt5.QtCore import QObject, QUrl, QByteArray, pyqtSignal, qVersion
20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, \ 20 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
21 QNetworkAccessManager 21
22 from E5Gui import E5MessageBox
22 23
23 import Preferences 24 import Preferences
24 25
25 26
26 class VirusTotalAPI(QObject): 27 class VirusTotalAPI(QObject):
43 fileScanReport = pyqtSignal(str) 44 fileScanReport = pyqtSignal(str)
44 45
45 TestServiceKeyScanID = \ 46 TestServiceKeyScanID = \
46 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad" 47 "4feed2c2e352f105f6188efd1d5a558f24aee6971bdf96d5fdb19c197d6d3fad"
47 48
48 ServiceResult_RequestLimitReached = -2 49 ServiceResult_ItemQueued = -2
49 ServiceResult_InvalidServiceKey = -1
50 ServiceResult_ItemNotPresent = 0 50 ServiceResult_ItemNotPresent = 0
51 ServiceResult_ItemPresent = 1 51 ServiceResult_ItemPresent = 1
52 52
53 GetFileReportPattern = "{0}://www.virustotal.com/api/get_file_report.json" 53 # HTTP Status Codes
54 ScanUrlPattern = "{0}://www.virustotal.com/api/scan_url.json" 54 ServiceCode_InvalidKey = 202
55 GetUrlReportPattern = "{0}://www.virustotal.com/api/get_url_report.json" 55 ServiceCode_RateLimitExceeded = 204
56 56 ServiceCode_InvalidPrivilege = 403
57 ReportUrlScanPagePattern = \ 57
58 "http://www.virustotal.com/url-scan/report.html?id={0}" 58 GetFileReportPattern = "{0}://www.virustotal.com/vtapi/v2/file/report"
59 ReportFileScanPagePattern = \ 59 ScanUrlPattern = "{0}://www.virustotal.com/vtapi/v2/url/scan"
60 "http://www.virustotal.com/file-scan/report.html?id={0}" 60 GetUrlReportPattern = "{0}://www.virustotal.com/vtapi/v2/url/report"
61 61 GetIpAddressReportPattern = \
62 SearchUrl = "http://www.virustotal.com/search.html" 62 "{0}://www.virustotal.com/vtapi/v2/ip-address/report"
63 GetDomainReportPattern = "{0}://www.virustotal.com/vtapi/v2/domain/report"
63 64
64 def __init__(self, parent=None): 65 def __init__(self, parent=None):
65 """ 66 """
66 Constructor 67 Constructor
67 68
70 super(VirusTotalAPI, self).__init__(parent) 71 super(VirusTotalAPI, self).__init__(parent)
71 72
72 self.__replies = [] 73 self.__replies = []
73 74
74 self.__loadSettings() 75 self.__loadSettings()
76
77 self.__lastIP = ""
78 self.__lastDomain = ""
79 self.__ipReportDlg = None
80 self.__domainReportDlg = None
75 81
76 def __loadSettings(self): 82 def __loadSettings(self):
77 """ 83 """
78 Private method to load the settings. 84 Private method to load the settings.
79 """ 85 """
82 else: 88 else:
83 protocol = "http" 89 protocol = "http"
84 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol) 90 self.GetFileReportUrl = self.GetFileReportPattern.format(protocol)
85 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol) 91 self.ScanUrlUrl = self.ScanUrlPattern.format(protocol)
86 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol) 92 self.GetUrlReportUrl = self.GetUrlReportPattern.format(protocol)
93 self.GetIpAddressReportUrl = self.GetIpAddressReportPattern.format(
94 protocol)
95 self.GetDomainReportUrl = self.GetDomainReportPattern.format(protocol)
87 96
88 self.errorMessages = { 97 self.errorMessages = {
89 -2: self.tr("Request limit has been reached."), 98 204: self.tr("Request limit has been reached."),
90 -1: self.tr("Invalid key given."), 99 0: self.tr("Requested item is not present."),
91 0: self.tr("Requested item is not present.") 100 -2: self.tr("Requested item is still queued."),
92 } 101 }
93 102
94 def preferencesChanged(self): 103 def preferencesChanged(self):
95 """ 104 """
96 Public slot to handle a change of preferences. 105 Public slot to handle a change of preferences.
109 else: 118 else:
110 urlStr = self.GetFileReportPattern.format(protocol) 119 urlStr = self.GetFileReportPattern.format(protocol)
111 request = QNetworkRequest(QUrl(urlStr)) 120 request = QNetworkRequest(QUrl(urlStr))
112 request.setHeader(QNetworkRequest.ContentTypeHeader, 121 request.setHeader(QNetworkRequest.ContentTypeHeader,
113 "application/x-www-form-urlencoded") 122 "application/x-www-form-urlencoded")
114 params = QByteArray("key={0}&resource={1}".format( 123 params = QByteArray("apikey={0}&resource={1}".format(
115 key, self.TestServiceKeyScanID).encode("utf-8")) 124 key, self.TestServiceKeyScanID).encode("utf-8"))
116 125
117 import Helpviewer.HelpWindow 126 import Helpviewer.HelpWindow
118 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() 127 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
119 reply = nam.post(request, params) 128 reply = nam.post(request, params)
127 res = False 136 res = False
128 msg = "" 137 msg = ""
129 138
130 reply = self.sender() 139 reply = self.sender()
131 if reply.error() == QNetworkReply.NoError: 140 if reply.error() == QNetworkReply.NoError:
132 result = json.loads(str(reply.readAll(), "utf-8")) 141 res = True
133 if result["result"] != self.ServiceResult_InvalidServiceKey: 142 elif reply.error() == self.ServiceCode_InvalidKey:
134 res = True 143 res = False
135 else: 144 else:
136 msg = reply.errorString() 145 msg = reply.errorString()
137 self.__replies.remove(reply) 146 self.__replies.remove(reply)
147 reply.deleteLater()
138 148
139 self.checkServiceKeyFinished.emit(res, msg) 149 self.checkServiceKeyFinished.emit(res, msg)
140 150
141 def submitUrl(self, url): 151 def submitUrl(self, url):
142 """ 152 """
145 @param url url to be scanned (QUrl) 155 @param url url to be scanned (QUrl)
146 """ 156 """
147 request = QNetworkRequest(QUrl(self.ScanUrlUrl)) 157 request = QNetworkRequest(QUrl(self.ScanUrlUrl))
148 request.setHeader(QNetworkRequest.ContentTypeHeader, 158 request.setHeader(QNetworkRequest.ContentTypeHeader,
149 "application/x-www-form-urlencoded") 159 "application/x-www-form-urlencoded")
150 params = QByteArray("key={0}&url=".format( 160 params = QByteArray("apikey={0}&url=".format(
151 Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\ 161 Preferences.getHelp("VirusTotalServiceKey")).encode("utf-8"))\
152 .append(QUrl.toPercentEncoding(url.toString())) 162 .append(QUrl.toPercentEncoding(url.toString()))
153 163
154 import Helpviewer.HelpWindow 164 import Helpviewer.HelpWindow
155 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager() 165 nam = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
162 Private slot to determine the result of the URL scan submission. 172 Private slot to determine the result of the URL scan submission.
163 """ 173 """
164 reply = self.sender() 174 reply = self.sender()
165 if reply.error() == QNetworkReply.NoError: 175 if reply.error() == QNetworkReply.NoError:
166 result = json.loads(str(reply.readAll(), "utf-8")) 176 result = json.loads(str(reply.readAll(), "utf-8"))
167 if result["result"] == self.ServiceResult_ItemPresent: 177 if result["response_code"] == self.ServiceResult_ItemPresent:
168 self.urlScanReport.emit( 178 self.urlScanReport.emit(result["permalink"])
169 self.ReportUrlScanPagePattern.format(result["scan_id"])) 179 self.__getUrlScanReportUrl(result["scan_id"])
170 self.__getFileScanReportUrl(result["scan_id"])
171 else: 180 else:
172 self.submitUrlError.emit(self.errorMessages[result["result"]]) 181 if result["response_code"] in self.errorMessages:
182 msg = self.errorMessages[result["response_code"]]
183 else:
184 msg = result["verbose_msg"]
185 self.submitUrlError.emit(msg)
186 elif reply.error() == self.ServiceCode_RateLimitExceeded:
187 self.submitUrlError.emit(
188 self.errorMessages[result[self.ServiceCode_RateLimitExceeded]])
173 else: 189 else:
174 self.submitUrlError.emit(reply.errorString()) 190 self.submitUrlError.emit(reply.errorString())
175 self.__replies.remove(reply) 191 self.__replies.remove(reply)
176 192 reply.deleteLater()
177 def __getFileScanReportUrl(self, scanId): 193
def __getUrlScanReportUrl(self, scanId):
    """
    Private method to request the report for a URL scan.

    The reply is handled by __getUrlScanReportUrlFinished.

    @param scanId ID of the scan to get the report URL for (string)
    """
    reportRequest = QNetworkRequest(QUrl(self.GetUrlReportUrl))
    reportRequest.setHeader(QNetworkRequest.ContentTypeHeader,
                            "application/x-www-form-urlencoded")

    # the service key and the scan ID are sent as form encoded POST data
    serviceKey = Preferences.getHelp("VirusTotalServiceKey")
    formData = "apikey={0}&resource={1}".format(serviceKey, scanId)
    postData = QByteArray(formData.encode("utf-8"))

    import Helpviewer.HelpWindow
    manager = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
    pending = manager.post(reportRequest, postData)
    pending.finished.connect(self.__getUrlScanReportUrlFinished)
    # remember the reply until its finished slot has run
    self.__replies.append(pending)
212
def __getUrlScanReportUrlFinished(self):
    """
    Private slot to handle the reply of the URL scan report request.

    If the decoded reply contains a non-empty 'filescan_id' entry, the
    report for that file scan is requested next.
    """
    finishedReply = self.sender()
    if finishedReply.error() == QNetworkReply.NoError:
        payload = str(finishedReply.readAll(), "utf-8")
        report = json.loads(payload)
        if "filescan_id" in report:
            fileScanId = report["filescan_id"]
        else:
            fileScanId = None
        if fileScanId is not None:
            self.__getFileScanReportUrl(fileScanId)

    # the reply is done regardless of its outcome
    self.__replies.remove(finishedReply)
    finishedReply.deleteLater()
225
def __getFileScanReportUrl(self, scanId):
    """
    Private method to request the report for a file scan.

    The reply is handled by __getFileScanReportUrlFinished.

    @param scanId ID of the scan to get the report URL for (string)
    """
    reportRequest = QNetworkRequest(QUrl(self.GetFileReportUrl))
    reportRequest.setHeader(QNetworkRequest.ContentTypeHeader,
                            "application/x-www-form-urlencoded")

    # the service key and the scan ID are sent as form encoded POST data
    serviceKey = Preferences.getHelp("VirusTotalServiceKey")
    formData = "apikey={0}&resource={1}".format(serviceKey, scanId)
    postData = QByteArray(formData.encode("utf-8"))

    import Helpviewer.HelpWindow
    manager = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
    pending = manager.post(reportRequest, postData)
    pending.finished.connect(self.__getFileScanReportUrlFinished)
    # remember the reply until its finished slot has run
    self.__replies.append(pending)
195 244
def __getFileScanReportUrlFinished(self):
    """
    Private slot to determine the result of the file scan report URL
    request.

    The fileScanReport signal is emitted with the permalink of the report,
    provided the reply contains one.
    """
    reply = self.sender()
    if reply.error() == QNetworkReply.NoError:
        result = json.loads(str(reply.readAll(), "utf-8"))
        # Guard the lookup: a reply without a permalink would otherwise
        # raise KeyError and skip the cleanup of the reply below.
        # NOTE(review): presumably replies for unknown or still queued
        # items carry no permalink -- confirm against the service docs.
        if "permalink" in result:
            self.fileScanReport.emit(result["permalink"])
    self.__replies.remove(reply)
    reply.deleteLater()
207 result["file-report"])) 256
def getIpAddressReport(self, ipAddress):
    """
    Public method to request the report for an IP address.

    The reply is handled by __getIpAddressReportFinished.

    @param ipAddress valid IPv4 address in dotted quad notation
    @type str
    """
    # remembered for the report dialog shown once the reply has arrived
    self.__lastIP = ipAddress

    parameters = [
        ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
        ("ip", ipAddress),
    ]
    reportUrl = QUrl(self.GetIpAddressReportUrl)
    if qVersion() < "5.0.0":
        reportUrl.setQueryItems(parameters)
    else:
        # for Qt 5 and newer the query is attached via a QUrlQuery object
        from PyQt5.QtCore import QUrlQuery
        urlQuery = QUrlQuery()
        urlQuery.setQueryItems(parameters)
        reportUrl.setQuery(urlQuery)
    reportRequest = QNetworkRequest(reportUrl)

    import Helpviewer.HelpWindow
    manager = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
    pending = manager.get(reportRequest)
    pending.finished.connect(self.__getIpAddressReportFinished)
    # remember the reply until its finished slot has run
    self.__replies.append(pending)
285
def __getIpAddressReportFinished(self):
    """
    Private slot to process the IP address report data.

    A response code of 0 or -1 is reported via a message box; any other
    response code opens the IP address report dialog.
    """
    finishedReply = self.sender()
    if finishedReply.error() == QNetworkReply.NoError:
        report = json.loads(str(finishedReply.readAll(), "utf-8"))
        responseCode = report["response_code"]
        if responseCode == 0:
            E5MessageBox.information(
                None,
                self.tr("VirusTotal IP Address Report"),
                self.tr("""VirusTotal does not have any information for"""
                        """ the given IP address."""))
        elif responseCode == -1:
            E5MessageBox.information(
                None,
                self.tr("VirusTotal IP Address Report"),
                self.tr("""The submitted IP address is invalid."""))
        else:
            owner = report["as_owner"]
            resolutions = report["resolutions"]
            # the list of detected URLs is optional
            if "detected_urls" in report:
                urls = report["detected_urls"]
            else:
                urls = []

            from .VirusTotalIpReportDialog import VirusTotalIpReportDialog
            # keep a reference to the dialog so that close() can close it
            self.__ipReportDlg = VirusTotalIpReportDialog(
                self.__lastIP, owner, resolutions, urls)
            self.__ipReportDlg.show()

    # the reply is done regardless of its outcome
    self.__replies.remove(finishedReply)
    finishedReply.deleteLater()
318
def getDomainReport(self, domain):
    """
    Public method to request the report for a domain.

    The reply is handled by __getDomainReportFinished.

    @param domain domain name
    @type str
    """
    # remembered for the report dialog shown once the reply has arrived
    self.__lastDomain = domain

    parameters = [
        ("apikey", Preferences.getHelp("VirusTotalServiceKey")),
        ("domain", domain),
    ]
    reportUrl = QUrl(self.GetDomainReportUrl)
    if qVersion() < "5.0.0":
        reportUrl.setQueryItems(parameters)
    else:
        # for Qt 5 and newer the query is attached via a QUrlQuery object
        from PyQt5.QtCore import QUrlQuery
        urlQuery = QUrlQuery()
        urlQuery.setQueryItems(parameters)
        reportUrl.setQuery(urlQuery)
    reportRequest = QNetworkRequest(reportUrl)

    import Helpviewer.HelpWindow
    manager = Helpviewer.HelpWindow.HelpWindow.networkAccessManager()
    pending = manager.get(reportRequest)
    pending.finished.connect(self.__getDomainReportFinished)
    # remember the reply until its finished slot has run
    self.__replies.append(pending)
347
def __getDomainReportFinished(self):
    """
    Private slot to process the domain report data.

    A response code of 0 or -1 is reported via a message box; any other
    response code opens the domain report dialog. Report entries missing
    from the reply are replaced by an empty list or a 'not available'
    text respectively.
    """
    reply = self.sender()
    if reply.error() == QNetworkReply.NoError:
        result = json.loads(str(reply.readAll(), "utf-8"))
        if result["response_code"] == 0:
            E5MessageBox.information(
                None,
                self.tr("VirusTotal Domain Report"),
                self.tr("""VirusTotal does not have any information for"""
                        """ the given domain."""))
        elif result["response_code"] == -1:
            E5MessageBox.information(
                None,
                self.tr("VirusTotal Domain Report"),
                self.tr("""The submitted domain address is invalid."""))
        else:
            notAvailable = self.tr("not available")

            # All entries are treated as optional, so that a sparse report
            # cannot raise KeyError and skip the reply cleanup below.
            resolutions = result.get("resolutions", [])
            urls = result.get("detected_urls", [])
            subdomains = result.get("subdomains", [])
            bdCategory = result.get("BitDefender category", notAvailable)
            tmCategory = result.get("TrendMicro category", notAvailable)
            wtsCategory = result.get(
                "Websense ThreatSeeker category", notAvailable)
            categories = result.get("categories", [])

            from .VirusTotalDomainReportDialog import \
                VirusTotalDomainReportDialog
            # keep a reference to the dialog so that close() can close it
            self.__domainReportDlg = VirusTotalDomainReportDialog(
                self.__lastDomain, resolutions, urls, subdomains,
                bdCategory, tmCategory, wtsCategory, categories)
            self.__domainReportDlg.show()
    self.__replies.remove(reply)
    reply.deleteLater()
401
def close(self):
    """
    Public slot to close the API.

    All pending network replies are aborted and the report dialogs that
    have been opened are closed.
    """
    # Iterate over a snapshot: the finished slots of this class remove
    # their reply from self.__replies, and abort() triggers them while
    # this loop is still running. Looping over the live list would skip
    # replies.
    for reply in self.__replies[:]:
        reply.abort()

    if self.__ipReportDlg is not None:
        self.__ipReportDlg.close()
    if self.__domainReportDlg is not None:
        self.__domainReportDlg.close()

eric ide

mercurial