src/eric7/WebBrowser/SafeBrowsing/SafeBrowsingAPIClient.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 8881
54e42bc2437a
child 9221
bf71ee032bb4
equal deleted inserted replaced
9208:3fc8dfeb6ebe 9209:b99e7fd55fd3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2017 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the low level interface for Google Safe Browsing.
8 """
9
10 import json
11 import base64
12
13 from PyQt6.QtCore import (
14 pyqtSignal, QObject, QDateTime, QUrl, QByteArray, QCoreApplication,
15 QEventLoop
16 )
17 from PyQt6.QtNetwork import QNetworkRequest, QNetworkReply
18
19 from WebBrowser.WebBrowserWindow import WebBrowserWindow
20
21 from .SafeBrowsingThreatList import ThreatList
22
23
class SafeBrowsingAPIClient(QObject):
    """
    Class implementing the low level interface for Google Safe Browsing.

    All network requests are issued through the network manager of the
    web browser window and are waited for while keeping the event loop
    running.

    @signal networkError(str) emitted to indicate a network error
    """
    ClientId = "eric7_API_client"
    ClientVersion = "2.0.0"

    GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}"

    networkError = pyqtSignal(str)

    def __init__(self, apiKey, fairUse=True, parent=None):
        """
        Constructor

        @param apiKey API key to be used
        @type str
        @param fairUse flag indicating to follow the fair use policy
        @type bool
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__apiKey = apiKey
        self.__fairUse = fairUse

        # a default constructed QDateTime means 'no wait period recorded'
        self.__nextRequestNoSoonerThan = QDateTime()
        self.__failCount = 0

        self.__lookupApiCache = {}
        # Temporary cache used by the lookup API (v4)
        # key: URL as string
        # value: dictionary with these entries:
        #   "validUntil": (QDateTime)
        #   "threatInfo": (list of ThreatList)

    def setApiKey(self, apiKey):
        """
        Public method to set the API key.

        @param apiKey API key to be set
        @type str
        """
        self.__apiKey = apiKey

    def __waitForReply(self, reply):
        """
        Private method to wait for a network reply to finish while keeping
        the event loop running.

        @param reply reference to the network reply object
        @type QNetworkReply
        """
        while reply.isRunning():
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.AllEvents, 200)
            # max. 200 ms processing

    def __postJson(self, endpoint, requestBody):
        """
        Private method to send a JSON encoded POST request to the given
        API endpoint and wait for the reply to finish.

        @param endpoint name of the API endpoint (inserted into the URL
            template)
        @type str
        @param requestBody request data to be sent JSON encoded
        @type dict
        @return reference to the finished network reply (the caller has to
            dispose of it via deleteLater())
        @rtype QNetworkReply
        """
        data = QByteArray(json.dumps(requestBody).encode("utf-8"))
        url = QUrl(self.GsbUrlTemplate.format(endpoint, self.__apiKey))
        req = QNetworkRequest(url)
        req.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader,
                      "application/json")
        reply = WebBrowserWindow.networkManager().post(req, data)

        self.__waitForReply(reply)
        return reply

    def getThreatLists(self):
        """
        Public method to retrieve all available threat lists.

        @return tuple containing list of threat lists (None in case of a
            network error) and an error message (empty if no error occurred)
        @rtype tuple of (list of dict containing 'threatType', 'platformType'
            and 'threatEntryType', str)
        """
        url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey))
        req = QNetworkRequest(url)
        reply = WebBrowserWindow.networkManager().get(req)

        self.__waitForReply(reply)

        res = None
        error = ""
        if reply.error() != QNetworkReply.NetworkError.NoError:
            error = reply.errorString()
            self.networkError.emit(error)
        else:
            result = self.__extractData(reply)
            res = result["threatLists"]

        reply.deleteLater()
        return res, error

    #######################################################################
    ## Methods below implement the 'Update API (v4)'
    #######################################################################

    def getThreatsUpdate(self, clientStates):
        """
        Public method to fetch hash prefix updates for the given threat list.

        @param clientStates dictionary of client states with keys like
            (threatType, platformType, threatEntryType)
        @type dict
        @return tuple containing the list of threat updates (None in case
            of a network error) and an error message (empty if no error
            occurred)
        @rtype tuple of (list of dict, str)
        """
        requestBody = {
            "client": {
                "clientId": self.ClientId,
                "clientVersion": self.ClientVersion,
            },
            "listUpdateRequests": [],
        }

        for (threatType, platformType, threatEntryType), currentState in (
            clientStates.items()
        ):
            requestBody["listUpdateRequests"].append(
                {
                    "threatType": threatType,
                    "platformType": platformType,
                    "threatEntryType": threatEntryType,
                    "state": currentState,
                    "constraints": {
                        "supportedCompressions": ["RAW"],
                    }
                }
            )

        reply = self.__postJson("threatListUpdates:fetch", requestBody)

        res = None
        error = ""
        if reply.error() != QNetworkReply.NetworkError.NoError:
            error = reply.errorString()
            self.networkError.emit(error)
        else:
            result = self.__extractData(reply)
            res = result["listUpdateResponses"]

        reply.deleteLater()
        return res, error

    def getFullHashes(self, prefixes, clientState):
        """
        Public method to find full hashes matching hash prefixes.

        @param prefixes list of hash prefixes to find
        @type list of bytes
        @param clientState dictionary of client states with keys like
            (threatType, platformType, threatEntryType)
        @type dict
        @return dictionary containing the list of found hashes and the
            negative cache duration (an empty list in case of a network
            error)
        @rtype dict or list
        """
        threatInfo = {
            "threatTypes": [],
            "platformTypes": [],
            "threatEntryTypes": [],
            "threatEntries": [
                {"hash": base64.b64encode(prefix).decode("ascii")}
                for prefix in prefixes
            ],
        }
        requestBody = {
            "client": {
                "clientId": self.ClientId,
                "clientVersion": self.ClientVersion,
            },
            "clientStates": [],
            "threatInfo": threatInfo,
        }

        for (threatType, platformType, threatEntryType), currentState in (
            clientState.items()
        ):
            requestBody["clientStates"].append(currentState)
            # collect each type just once while keeping the order of first
            # occurrence
            if threatType not in threatInfo["threatTypes"]:
                threatInfo["threatTypes"].append(threatType)
            if platformType not in threatInfo["platformTypes"]:
                threatInfo["platformTypes"].append(platformType)
            if threatEntryType not in threatInfo["threatEntryTypes"]:
                threatInfo["threatEntryTypes"].append(threatEntryType)

        reply = self.__postJson("fullHashes:find", requestBody)

        res = []
        if reply.error() != QNetworkReply.NetworkError.NoError:
            self.networkError.emit(reply.errorString())
        else:
            res = self.__extractData(reply)

        reply.deleteLater()
        return res

    def __extractData(self, reply):
        """
        Private method to extract the data of a network reply.

        As a side effect the fair use wait period is updated from the
        'minimumWaitDuration' entry of the reply (if present).

        @param reply reference to the network reply object
        @type QNetworkReply
        @return extracted data
        @rtype list or dict
        """
        result = json.loads(str(reply.readAll(), "utf-8"))
        self.__setWaitDuration(result.get("minimumWaitDuration"))
        return result

    def __setWaitDuration(self, minimumWaitDuration):
        """
        Private method to set the minimum wait duration.

        @param minimumWaitDuration duration to be set (number of seconds
            followed by 's', e.g. "593.440s") or None to reset the wait
            period
        @type str
        """
        if not self.__fairUse or minimumWaitDuration is None:
            self.__nextRequestNoSoonerThan = QDateTime()
        else:
            # fractions of a second are cut off
            waitDuration = int(float(minimumWaitDuration.rstrip("s")))
            self.__nextRequestNoSoonerThan = (
                QDateTime.currentDateTime().addSecs(waitDuration)
            )

    def fairUseDelayExpired(self):
        """
        Public method to check, if the fair use wait period has expired.

        @return flag indicating expiration (always True, if the fair use
            policy is not followed)
        @rtype bool
        """
        return (
            not self.__fairUse or
            QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan
        )

    def getFairUseDelayExpirationDateTime(self):
        """
        Public method to get the date and time the fair use delay will expire.

        @return fair use delay expiration date and time
        @rtype QDateTime
        """
        return self.__nextRequestNoSoonerThan

    #######################################################################
    ## Methods below implement the 'Lookup API (v4)'
    #######################################################################

    def lookupUrl(self, url, platforms):
        """
        Public method to send an URL to Google for checking.

        Results containing threats are cached locally for the cache
        duration reported by the server.

        @param url URL to be checked
        @type QUrl
        @param platforms list of platform types to check against
        @type list of str
        @return tuple containing the list of threat list info objects and
            an error message
        @rtype tuple of (list of ThreatList, str)
        """
        error = ""

        # sanitize the URL by removing user info, query data and fragment
        url = url.adjusted(
            QUrl.UrlFormattingOption.RemoveUserInfo |
            QUrl.UrlFormattingOption.RemoveQuery |
            QUrl.UrlFormattingOption.RemoveFragment
        )
        urlStr = url.toString()

        # check the local cache first
        if urlStr in self.__lookupApiCache:
            cacheEntry = self.__lookupApiCache[urlStr]
            if cacheEntry["validUntil"] > QDateTime.currentDateTime():
                # cached entry is still valid
                return cacheEntry["threatInfo"], error
            else:
                # cached entry has expired
                del self.__lookupApiCache[urlStr]

        # no valid entry found, ask the safe browsing server
        requestBody = {
            "client": {
                "clientId": self.ClientId,
                "clientVersion": self.ClientVersion,
            },
            "threatInfo": {
                "threatTypes": SafeBrowsingAPIClient.definedThreatTypes(),
                "platformTypes": platforms,
                "threatEntryTypes":
                    SafeBrowsingAPIClient.definedThreatEntryTypes(),
                "threatEntries": [
                    {"url": urlStr},
                ],
            },
        }

        reply = self.__postJson("threatMatches:find", requestBody)

        threats = []
        if reply.error() != QNetworkReply.NetworkError.NoError:
            error = reply.errorString()
            self.networkError.emit(error)
        else:
            res = json.loads(str(reply.readAll(), "utf-8"))
            if res and "matches" in res:
                # the longest cache duration of all matches is used
                cacheDuration = 0
                for match in res["matches"]:
                    threatInfo = ThreatList(
                        match["threatType"],
                        match["platformType"],
                        match["threatEntryType"],
                    )
                    threats.append(threatInfo)
                    if "cacheDuration" in match:
                        # e.g. "300.000s" => 300
                        cacheDurationSec = int(
                            match["cacheDuration"].strip().rstrip("s")
                            .split(".")[0])
                        cacheDuration = max(cacheDuration, cacheDurationSec)
                if cacheDuration > 0 and threats:
                    validUntil = QDateTime.currentDateTime().addSecs(
                        cacheDuration)
                    self.__lookupApiCache[urlStr] = {
                        "validUntil": validUntil,
                        "threatInfo": threats
                    }

        reply.deleteLater()
        return threats, error

    #######################################################################
    ## Methods below implement global (class wide) functionality
    #######################################################################

    @classmethod
    def getThreatMessage(cls, threatType):
        """
        Class method to get a warning message for the given threat type.

        @param threatType threat type to get the message for (checked
            case insensitively)
        @type str
        @return threat message
        @rtype str
        """
        threatType = threatType.lower()
        if threatType == "malware":
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Malware Warning</h3>"
                "<p>The web site you are about to visit may try to install"
                " harmful programs on your computer in order to steal or"
                " destroy your data.</p>")
        elif threatType == "social_engineering":
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Phishing Warning</h3>"
                "<p>The web site you are about to visit may try to trick you"
                " into doing something dangerous online, such as revealing"
                " passwords or personal information, usually through a fake"
                " website.</p>")
        elif threatType == "unwanted_software":
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Unwanted Software Warning</h3>"
                "<p>The software you are about to download may negatively"
                " affect your browsing or computing experience.</p>")
        elif threatType == "potentially_harmful_application":
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Potentially Harmful Application</h3>"
                "<p>The web site you are about to visit may try to trick you"
                " into installing applications, that may negatively affect"
                " your browsing experience.</p>")
        elif threatType == "malicious_binary":
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Malicious Binary Warning</h3>"
                "<p>The software you are about to download may be harmful"
                " to your computer.</p>")
        else:
            # unknown threat
            msg = QCoreApplication.translate(
                "SafeBrowsingAPI",
                "<h3>Unknown Threat Warning</h3>"
                "<p>The web site you are about to visit was found in the Safe"
                " Browsing Database but was not classified yet.</p>")

        return msg

    @classmethod
    def getThreatType(cls, threatType):
        """
        Class method to get a display string for a given threat type.

        @param threatType threat type to get display string for (checked
            case insensitively)
        @type str
        @return display string
        @rtype str
        """
        threatType = threatType.lower()
        if threatType == "malware":
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Malware")
        elif threatType == "social_engineering":
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Phishing")
        elif threatType == "unwanted_software":
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Unwanted Software")
        elif threatType == "potentially_harmful_application":
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Harmful Application")
        elif threatType == "malicious_binary":
            # same key as used by getThreatMessage()
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Malicious Binary")
        else:
            displayString = QCoreApplication.translate(
                "SafeBrowsingAPI", "Unknown Threat")

        return displayString

    @classmethod
    def definedThreatTypes(cls):
        """
        Class method to get all threat types defined in API v4.

        @return list of defined threat types
        @rtype list of str
        """
        return [
            "THREAT_TYPE_UNSPECIFIED", "MALWARE", "SOCIAL_ENGINEERING",
            "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION",
        ]

    @classmethod
    def getThreatEntryString(cls, threatEntry):
        """
        Class method to get the threat entry string.

        @param threatEntry threat entry type as defined in the v4 API
        @type str
        @return threat entry string
        @rtype str
        """
        if threatEntry == "URL":
            return "URL"
        elif threatEntry == "EXECUTABLE":
            return QCoreApplication.translate(
                "SafeBrowsingAPI", "executable program")
        else:
            return QCoreApplication.translate(
                "SafeBrowsingAPI", "unknown type")

    @classmethod
    def definedThreatEntryTypes(cls):
        """
        Class method to get all threat entry types defined in API v4.

        @return list of all defined threat entry types
        @rtype list of str
        """
        return [
            "THREAT_ENTRY_TYPE_UNSPECIFIED", "URL", "EXECUTABLE",
        ]

    @classmethod
    def getPlatformString(cls, platformType):
        """
        Class method to get the platform string for a given platform type.

        @param platformType platform type as defined in the v4 API
        @type str
        @return platform string
        @rtype str
        """
        # names of concrete platforms are not translated
        platformStrings = {
            "WINDOWS": "Windows",
            "LINUX": "Linux",
            "ANDROID": "Android",
            "OSX": "macOS",
            "IOS": "iOS",
            "CHROME": "Chrome OS",
        }
        if platformType in platformStrings:
            return platformStrings[platformType]

        if platformType == "ANY_PLATFORM":
            return QCoreApplication.translate(
                "SafeBrowsingAPI", "any defined platform")
        elif platformType == "ALL_PLATFORMS":
            return QCoreApplication.translate(
                "SafeBrowsingAPI", "all defined platforms")
        else:
            return QCoreApplication.translate(
                "SafeBrowsingAPI", "unknown platform")

    @classmethod
    def getPlatformTypes(cls, platform):
        """
        Class method to get the platform types for a given platform.

        @param platform platform string (checked case insensitively)
        @type str (one of 'linux', 'windows', 'macos')
        @return list of platform types as defined in the v4 API for the
            given platform
        @rtype list of str
        @exception ValueError raised to indicate an invalid platform string
        """
        platformMapping = {
            "linux": "LINUX",
            "windows": "WINDOWS",
            "macos": "OSX",
        }
        platform = platform.lower()

        if platform not in platformMapping:
            raise ValueError("Unsupported platform")

        return ["ANY_PLATFORM", "ALL_PLATFORMS", platformMapping[platform]]

    @classmethod
    def definedPlatformTypes(cls):
        """
        Class method to get all platform types defined in API v4.

        @return list of all defined platform types
        @rtype list of str
        """
        return [
            "PLATFORM_TYPE_UNSPECIFIED", "WINDOWS", "LINUX", "ANDROID", "OSX",
            "IOS", "ANY_PLATFORM", "ALL_PLATFORMS", "CHROME",
        ]

eric ide

mercurial