eric6/WebBrowser/SafeBrowsing/SafeBrowsingAPIClient.py

changeset 6942
2602857055c5
parent 6645
ad476851d7e0
child 7192
a22eee00b052
equal deleted inserted replaced
6941:f99d60d6b59b 6942:2602857055c5
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2017 - 2019 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the low level interface for Google Safe Browsing.
8 """
9
10 from __future__ import unicode_literals
11 try:
12 str = unicode # __IGNORE_EXCEPTION__
13 except NameError:
14 pass
15
16 import json
17 import base64
18
19 from PyQt5.QtCore import pyqtSignal, QObject, QDateTime, QUrl, QByteArray, \
20 QCoreApplication, QEventLoop
21 from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
22
23 from WebBrowser.WebBrowserWindow import WebBrowserWindow
24
25 from .SafeBrowsingThreatList import ThreatList
26
27
28 class SafeBrowsingAPIClient(QObject):
29 """
30 Class implementing the low level interface for Google Safe Browsing.
31
32 @signal networkError(str) emitted to indicate a network error
33 """
34 ClientId = "eric6_API_client"
35 ClientVersion = "2.0.0"
36
37 GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}"
38
39 networkError = pyqtSignal(str)
40
41 def __init__(self, apiKey, fairUse=True, parent=None):
42 """
43 Constructor
44
45 @param apiKey API key to be used
46 @type str
47 @param fairUse flag indicating to follow the fair use policy
48 @type bool
49 @param parent reference to the parent object
50 @type QObject
51 """
52 super(SafeBrowsingAPIClient, self).__init__(parent)
53
54 self.__apiKey = apiKey
55 self.__fairUse = fairUse
56
57 self.__nextRequestNoSoonerThan = QDateTime()
58 self.__failCount = 0
59
60 self.__lookupApiCache = {}
61 # Temporary cache used by the lookup API (v4)
62 # key: URL as string
63 # value: dictionary with these entries:
64 # "validUntil": (QDateTime)
65 # "threatInfo": (list of ThreatList)
66
67 def setApiKey(self, apiKey):
68 """
69 Public method to set the API key.
70
71 @param apiKey API key to be set
72 @type str
73 """
74 self.__apiKey = apiKey
75
76 def getThreatLists(self):
77 """
78 Public method to retrieve all available threat lists.
79
80 @return tuple containing list of threat lists and an error message
81 @rtype tuple of (list of dict containing 'threatType', 'platformType'
82 and 'threatEntryType', bool)
83 """
84 url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey))
85 req = QNetworkRequest(url)
86 reply = WebBrowserWindow.networkManager().get(req)
87
88 while reply.isRunning():
89 QCoreApplication.processEvents(QEventLoop.AllEvents, 200)
90 # max. 200 ms processing
91
92 res = None
93 error = ""
94 if reply.error() != QNetworkReply.NoError:
95 error = reply.errorString()
96 self.networkError.emit(error)
97 else:
98 result = self.__extractData(reply)
99 res = result["threatLists"]
100
101 reply.deleteLater()
102 return res, error
103
104 #######################################################################
105 ## Methods below implement the 'Update API (v4)'
106 #######################################################################
107
108 def getThreatsUpdate(self, clientStates):
109 """
110 Public method to fetch hash prefix updates for the given threat list.
111
112 @param clientStates dictionary of client states with keys like
113 (threatType, platformType, threatEntryType)
114 @type dict
115 @return tuple containing the list of threat updates and an error
116 message
117 @rtype tuple of (list of dict, bool)
118 """
119 requestBody = {
120 "client": {
121 "clientId": self.ClientId,
122 "clientVersion": self.ClientVersion,
123 },
124 "listUpdateRequests": [],
125 }
126
127 for (threatType, platformType, threatEntryType), currentState in \
128 clientStates.items():
129 requestBody["listUpdateRequests"].append(
130 {
131 "threatType": threatType,
132 "platformType": platformType,
133 "threatEntryType": threatEntryType,
134 "state": currentState,
135 "constraints": {
136 "supportedCompressions": ["RAW"],
137 }
138 }
139 )
140
141 data = QByteArray(json.dumps(requestBody).encode("utf-8"))
142 url = QUrl(self.GsbUrlTemplate.format("threatListUpdates:fetch",
143 self.__apiKey))
144 req = QNetworkRequest(url)
145 req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
146 reply = WebBrowserWindow.networkManager().post(req, data)
147
148 while reply.isRunning():
149 QCoreApplication.processEvents(QEventLoop.AllEvents, 200)
150 # max. 200 ms processing
151
152 res = None
153 error = ""
154 if reply.error() != QNetworkReply.NoError:
155 error = reply.errorString()
156 self.networkError.emit(error)
157 else:
158 result = self.__extractData(reply)
159 res = result["listUpdateResponses"]
160
161 reply.deleteLater()
162 return res, error
163
164 def getFullHashes(self, prefixes, clientState):
165 """
166 Public method to find full hashes matching hash prefixes.
167
168 @param prefixes list of hash prefixes to find
169 @type list of str (Python 2) or list of bytes (Python 3)
170 @param clientState dictionary of client states with keys like
171 (threatType, platformType, threatEntryType)
172 @type dict
173 @return dictionary containing the list of found hashes and the
174 negative cache duration
175 @rtype dict
176 """
177 requestBody = {
178 "client": {
179 "clientId": self.ClientId,
180 "clientVersion": self.ClientVersion,
181 },
182 "clientStates": [],
183 "threatInfo": {
184 "threatTypes": [],
185 "platformTypes": [],
186 "threatEntryTypes": [],
187 "threatEntries": [],
188 },
189 }
190
191 for prefix in prefixes:
192 requestBody["threatInfo"]["threatEntries"].append(
193 {"hash": base64.b64encode(prefix).decode("ascii")})
194
195 for (threatType, platformType, threatEntryType), currentState in \
196 clientState.items():
197 requestBody["clientStates"].append(currentState)
198 if threatType not in requestBody["threatInfo"]["threatTypes"]:
199 requestBody["threatInfo"]["threatTypes"].append(threatType)
200 if platformType not in \
201 requestBody["threatInfo"]["platformTypes"]:
202 requestBody["threatInfo"]["platformTypes"].append(
203 platformType)
204 if threatEntryType not in \
205 requestBody["threatInfo"]["threatEntryTypes"]:
206 requestBody["threatInfo"]["threatEntryTypes"].append(
207 threatEntryType)
208
209 data = QByteArray(json.dumps(requestBody).encode("utf-8"))
210 url = QUrl(self.GsbUrlTemplate.format("fullHashes:find",
211 self.__apiKey))
212 req = QNetworkRequest(url)
213 req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
214 reply = WebBrowserWindow.networkManager().post(req, data)
215
216 while reply.isRunning():
217 QCoreApplication.processEvents(QEventLoop.AllEvents, 200)
218 # max. 200 ms processing
219
220 res = []
221 if reply.error() != QNetworkReply.NoError:
222 self.networkError.emit(reply.errorString())
223 else:
224 res = self.__extractData(reply)
225
226 reply.deleteLater()
227 return res
228
229 def __extractData(self, reply):
230 """
231 Private method to extract the data of a network reply.
232
233 @param reply reference to the network reply object
234 @type QNetworkReply
235 @return extracted data
236 @rtype list or dict
237 """
238 result = json.loads(str(reply.readAll(), "utf-8"))
239 self.__setWaitDuration(result.get("minimumWaitDuration"))
240 return result
241
242 def __setWaitDuration(self, minimumWaitDuration):
243 """
244 Private method to set the minimum wait duration.
245
246 @param minimumWaitDuration duration to be set
247 @type str
248 """
249 if not self.__fairUse or minimumWaitDuration is None:
250 self.__nextRequestNoSoonerThan = QDateTime()
251 else:
252 waitDuration = int(float(minimumWaitDuration.rstrip("s")))
253 self.__nextRequestNoSoonerThan = \
254 QDateTime.currentDateTime().addSecs(waitDuration)
255
256 def fairUseDelayExpired(self):
257 """
258 Public method to check, if the fair use wait period has expired.
259
260 @return flag indicating expiration
261 @rtype bool
262 """
263 return (
264 self.__fairUse and
265 QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan
266 ) or not self.__fairUse
267
268 def getFairUseDelayExpirationDateTime(self):
269 """
270 Public method to get the date and time the fair use delay will expire.
271
272 @return fair use delay expiration date and time
273 @rtype QDateTime
274 """
275 return self.__nextRequestNoSoonerThan
276
277 #######################################################################
278 ## Methods below implement the 'Lookup API (v4)'
279 #######################################################################
280
281 def lookupUrl(self, url, platforms):
282 """
283 Public method to send an URL to Google for checking.
284
285 @param url URL to be checked
286 @type QUrl
287 @param platforms list of platform types to check against
288 @type list of str
289 @return tuple containing the list of threat list info objects and
290 an error message
291 @rtype tuple of (list of ThreatList, str)
292 """
293 error = ""
294
295 # sanitize the URL by removing user info and query data
296 url = url.adjusted(
297 QUrl.RemoveUserInfo | QUrl.RemoveQuery | QUrl.RemoveFragment
298 )
299 urlStr = url.toString()
300
301 # check the local cache first
302 if urlStr in self.__lookupApiCache:
303 if self.__lookupApiCache[urlStr]["validUntil"] > \
304 QDateTime.currentDateTime():
305 # cached entry is still valid
306 return self.__lookupApiCache[urlStr]["threatInfo"], error
307 else:
308 del self.__lookupApiCache[urlStr]
309
310 # no valid entry found, ask the safe browsing server
311 requestBody = {
312 "client": {
313 "clientId": self.ClientId,
314 "clientVersion": self.ClientVersion,
315 },
316 "threatInfo": {
317 "threatTypes": SafeBrowsingAPIClient.definedThreatTypes(),
318 "platformTypes": platforms,
319 "threatEntryTypes":
320 SafeBrowsingAPIClient.definedThreatEntryTypes(),
321 "threatEntries": [
322 {"url": urlStr},
323 ],
324 },
325 }
326
327 data = QByteArray(json.dumps(requestBody).encode("utf-8"))
328 url = QUrl(self.GsbUrlTemplate.format("threatMatches:find",
329 self.__apiKey))
330 req = QNetworkRequest(url)
331 req.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
332 reply = WebBrowserWindow.networkManager().post(req, data)
333
334 while reply.isRunning():
335 QCoreApplication.processEvents(QEventLoop.AllEvents, 200)
336 # max. 200 ms processing
337
338 threats = []
339 if reply.error() != QNetworkReply.NoError:
340 error = reply.errorString()
341 self.networkError.emit(error)
342 else:
343 res = json.loads(str(reply.readAll(), "utf-8"))
344 if res and "matches" in res:
345 cacheDuration = 0
346 for match in res["matches"]:
347 threatInfo = ThreatList(
348 match["threatType"],
349 match["platformType"],
350 match["threatEntryType"],
351 )
352 threats.append(threatInfo)
353 if "cacheDuration" in match:
354 cacheDurationSec = int(
355 match["cacheDuration"].strip().rstrip("s")
356 .split(".")[0])
357 if cacheDurationSec > cacheDuration:
358 cacheDuration = cacheDurationSec
359 if cacheDuration > 0 and bool(threats):
360 validUntil = QDateTime.currentDateTime().addSecs(
361 cacheDuration)
362 self.__lookupApiCache[urlStr] = {
363 "validUntil": validUntil,
364 "threatInfo": threats
365 }
366
367 reply.deleteLater()
368 return threats, error
369
370 #######################################################################
371 ## Methods below implement global (class wide) functionality
372 #######################################################################
373
374 @classmethod
375 def getThreatMessage(cls, threatType):
376 """
377 Class method to get a warning message for the given threat type.
378
379 @param threatType threat type to get the message for
380 @type str
381 @return threat message
382 @rtype str
383 """
384 threatType = threatType.lower()
385 if threatType == "malware":
386 msg = QCoreApplication.translate(
387 "SafeBrowsingAPI",
388 "<h3>Malware Warning</h3>"
389 "<p>The web site you are about to visit may try to install"
390 " harmful programs on your computer in order to steal or"
391 " destroy your data.</p>")
392 elif threatType == "social_engineering":
393 msg = QCoreApplication.translate(
394 "SafeBrowsingAPI",
395 "<h3>Phishing Warning</h3>"
396 "<p>The web site you are about to visit may try to trick you"
397 " into doing something dangerous online, such as revealing"
398 " passwords or personal information, usually through a fake"
399 " website.</p>")
400 elif threatType == "unwanted_software":
401 msg = QCoreApplication.translate(
402 "SafeBrowsingAPI",
403 "<h3>Unwanted Software Warning</h3>"
404 "<p>The software you are about to download may negatively"
405 " affect your browsing or computing experience.</p>")
406 elif threatType == "potentially_harmful_application":
407 msg = QCoreApplication.translate(
408 "SafeBrowsingAPI",
409 "<h3>Potentially Harmful Application</h3>"
410 "<p>The web site you are about to visit may try to trick you"
411 " into installing applications, that may negatively affect"
412 " your browsing experience.</p>")
413 elif threatType == "malicious_binary":
414 msg = QCoreApplication.translate(
415 "SafeBrowsingAPI",
416 "<h3>Malicious Binary Warning</h3>"
417 "<p>The software you are about to download may be harmful"
418 " to your computer.</p>")
419 else:
420 # unknow threat
421 msg = QCoreApplication.translate(
422 "SafeBrowsingAPI",
423 "<h3>Unknown Threat Warning</h3>"
424 "<p>The web site you are about to visit was found in the Safe"
425 " Browsing Database but was not classified yet.</p>")
426
427 return msg
428
429 @classmethod
430 def getThreatType(cls, threatType):
431 """
432 Class method to get a display string for a given threat type.
433
434 @param threatType threat type to get display string for
435 @type str
436 @return display string
437 @rtype str
438 """
439 threatType = threatType.lower()
440 if threatType == "malware":
441 displayString = QCoreApplication.translate(
442 "SafeBrowsingAPI", "Malware")
443 elif threatType == "social_engineering":
444 displayString = QCoreApplication.translate(
445 "SafeBrowsingAPI", "Phishing")
446 elif threatType == "unwanted_software":
447 displayString = QCoreApplication.translate(
448 "SafeBrowsingAPI", "Unwanted Software")
449 elif threatType == "potentially_harmful_application":
450 displayString = QCoreApplication.translate(
451 "SafeBrowsingAPI", "Harmful Application")
452 elif threatType == "malcious_binary":
453 displayString = QCoreApplication.translate(
454 "SafeBrowsingAPI", "Malicious Binary")
455 else:
456 displayString = QCoreApplication.translate(
457 "SafeBrowsingAPI", "Unknown Threat")
458
459 return displayString
460
461 @classmethod
462 def definedThreatTypes(cls):
463 """
464 Class method to get all threat types defined in API v4.
465
466 @return list of defined threat types
467 @rtype list of str
468 """
469 return [
470 "THREAT_TYPE_UNSPECIFIED", "MALWARE", "SOCIAL_ENGINEERING",
471 "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION",
472 ]
473
474 @classmethod
475 def getThreatEntryString(cls, threatEntry):
476 """
477 Class method to get the threat entry string.
478
479 @param threatEntry threat entry type as defined in the v4 API
480 @type str
481 @return threat entry string
482 @rtype str
483 """
484 if threatEntry == "URL":
485 return "URL"
486 elif threatEntry == "EXECUTABLE":
487 return QCoreApplication.translate(
488 "SafeBrowsingAPI", "executable program")
489 else:
490 return QCoreApplication.translate(
491 "SafeBrowsingAPI", "unknown type")
492
493 @classmethod
494 def definedThreatEntryTypes(cls):
495 """
496 Class method to get all threat entry types defined in API v4.
497
498 @return list of all defined threat entry types
499 @rtype list of str
500 """
501 return [
502 "THREAT_ENTRY_TYPE_UNSPECIFIED", "URL", "EXECUTABLE",
503 ]
504
505 @classmethod
506 def getPlatformString(cls, platformType):
507 """
508 Class method to get the platform string for a given platform type.
509
510 @param platformType platform type as defined in the v4 API
511 @type str
512 @return platform string
513 @rtype str
514 """
515 platformStrings = {
516 "WINDOWS": "Windows",
517 "LINUX": "Linux",
518 "ANDROID": "Android",
519 "OSX": "macOS",
520 "IOS": "iOS",
521 "CHROME": "Chrome OS",
522 }
523 if platformType in platformStrings:
524 return platformStrings[platformType]
525
526 if platformType == "ANY_PLATFORM":
527 return QCoreApplication.translate(
528 "SafeBrowsingAPI", "any defined platform")
529 elif platformType == "ALL_PLATFORMS":
530 return QCoreApplication.translate(
531 "SafeBrowsingAPI", "all defined platforms")
532 else:
533 return QCoreApplication.translate(
534 "SafeBrowsingAPI", "unknown platform")
535
536 @classmethod
537 def getPlatformTypes(cls, platform):
538 """
539 Class method to get the platform types for a given platform.
540
541 @param platform platform string
542 @type str (one of 'linux', 'windows', 'macos')
543 @return list of platform types as defined in the v4 API for the
544 given platform
545 @rtype list of str
546 @exception ValueError raised to indicate an invalid platform string
547 """
548 platform = platform.lower()
549
550 platformTypes = ["ANY_PLATFORM", "ALL_PLATFORMS"]
551 if platform == "linux":
552 platformTypes.append("LINUX")
553 elif platform == "windows":
554 platformTypes.append("WINDOWS")
555 elif platform == "macos":
556 platformTypes.append("OSX")
557 else:
558 raise ValueError("Unsupported platform")
559
560 return platformTypes
561
562 @classmethod
563 def definedPlatformTypes(cls):
564 """
565 Class method to get all platform types defined in API v4.
566
567 @return list of all defined platform types
568 @rtype list of str
569 """
570 return [
571 "PLATFORM_TYPE_UNSPECIFIED", "WINDOWS", "LINUX", "ANDROID", "OSX",
572 "IOS", "ANY_PLATFORM", "ALL_PLATFORMS", "CHROME",
573 ]

eric ide

mercurial