|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2017 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the low level interface for Google Safe Browsing. |
|
8 """ |
|
9 |
|
10 import json |
|
11 import base64 |
|
12 |
|
13 from PyQt6.QtCore import ( |
|
14 pyqtSignal, QObject, QDateTime, QUrl, QByteArray, QCoreApplication, |
|
15 QEventLoop |
|
16 ) |
|
17 from PyQt6.QtNetwork import QNetworkRequest, QNetworkReply |
|
18 |
|
19 from WebBrowser.WebBrowserWindow import WebBrowserWindow |
|
20 |
|
21 from .SafeBrowsingThreatList import ThreatList |
|
22 |
|
23 |
|
24 class SafeBrowsingAPIClient(QObject): |
|
25 """ |
|
26 Class implementing the low level interface for Google Safe Browsing. |
|
27 |
|
28 @signal networkError(str) emitted to indicate a network error |
|
29 """ |
|
30 ClientId = "eric7_API_client" |
|
31 ClientVersion = "2.0.0" |
|
32 |
|
33 GsbUrlTemplate = "https://safebrowsing.googleapis.com/v4/{0}?key={1}" |
|
34 |
|
35 networkError = pyqtSignal(str) |
|
36 |
|
37 def __init__(self, apiKey, fairUse=True, parent=None): |
|
38 """ |
|
39 Constructor |
|
40 |
|
41 @param apiKey API key to be used |
|
42 @type str |
|
43 @param fairUse flag indicating to follow the fair use policy |
|
44 @type bool |
|
45 @param parent reference to the parent object |
|
46 @type QObject |
|
47 """ |
|
48 super().__init__(parent) |
|
49 |
|
50 self.__apiKey = apiKey |
|
51 self.__fairUse = fairUse |
|
52 |
|
53 self.__nextRequestNoSoonerThan = QDateTime() |
|
54 self.__failCount = 0 |
|
55 |
|
56 self.__lookupApiCache = {} |
|
57 # Temporary cache used by the lookup API (v4) |
|
58 # key: URL as string |
|
59 # value: dictionary with these entries: |
|
60 # "validUntil": (QDateTime) |
|
61 # "threatInfo": (list of ThreatList) |
|
62 |
|
63 def setApiKey(self, apiKey): |
|
64 """ |
|
65 Public method to set the API key. |
|
66 |
|
67 @param apiKey API key to be set |
|
68 @type str |
|
69 """ |
|
70 self.__apiKey = apiKey |
|
71 |
|
72 def getThreatLists(self): |
|
73 """ |
|
74 Public method to retrieve all available threat lists. |
|
75 |
|
76 @return tuple containing list of threat lists and an error message |
|
77 @rtype tuple of (list of dict containing 'threatType', 'platformType' |
|
78 and 'threatEntryType', bool) |
|
79 """ |
|
80 url = QUrl(self.GsbUrlTemplate.format("threatLists", self.__apiKey)) |
|
81 req = QNetworkRequest(url) |
|
82 reply = WebBrowserWindow.networkManager().get(req) |
|
83 |
|
84 while reply.isRunning(): |
|
85 QCoreApplication.processEvents( |
|
86 QEventLoop.ProcessEventsFlag.AllEvents, 200) |
|
87 # max. 200 ms processing |
|
88 |
|
89 res = None |
|
90 error = "" |
|
91 if reply.error() != QNetworkReply.NetworkError.NoError: |
|
92 error = reply.errorString() |
|
93 self.networkError.emit(error) |
|
94 else: |
|
95 result = self.__extractData(reply) |
|
96 res = result["threatLists"] |
|
97 |
|
98 reply.deleteLater() |
|
99 return res, error |
|
100 |
|
101 ####################################################################### |
|
102 ## Methods below implement the 'Update API (v4)' |
|
103 ####################################################################### |
|
104 |
|
105 def getThreatsUpdate(self, clientStates): |
|
106 """ |
|
107 Public method to fetch hash prefix updates for the given threat list. |
|
108 |
|
109 @param clientStates dictionary of client states with keys like |
|
110 (threatType, platformType, threatEntryType) |
|
111 @type dict |
|
112 @return tuple containing the list of threat updates and an error |
|
113 message |
|
114 @rtype tuple of (list of dict, bool) |
|
115 """ |
|
116 requestBody = { |
|
117 "client": { |
|
118 "clientId": self.ClientId, |
|
119 "clientVersion": self.ClientVersion, |
|
120 }, |
|
121 "listUpdateRequests": [], |
|
122 } |
|
123 |
|
124 for (threatType, platformType, threatEntryType), currentState in ( |
|
125 clientStates.items() |
|
126 ): |
|
127 requestBody["listUpdateRequests"].append( |
|
128 { |
|
129 "threatType": threatType, |
|
130 "platformType": platformType, |
|
131 "threatEntryType": threatEntryType, |
|
132 "state": currentState, |
|
133 "constraints": { |
|
134 "supportedCompressions": ["RAW"], |
|
135 } |
|
136 } |
|
137 ) |
|
138 |
|
139 data = QByteArray(json.dumps(requestBody).encode("utf-8")) |
|
140 url = QUrl(self.GsbUrlTemplate.format("threatListUpdates:fetch", |
|
141 self.__apiKey)) |
|
142 req = QNetworkRequest(url) |
|
143 req.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
144 "application/json") |
|
145 reply = WebBrowserWindow.networkManager().post(req, data) |
|
146 |
|
147 while reply.isRunning(): |
|
148 QCoreApplication.processEvents( |
|
149 QEventLoop.ProcessEventsFlag.AllEvents, 200) |
|
150 # max. 200 ms processing |
|
151 |
|
152 res = None |
|
153 error = "" |
|
154 if reply.error() != QNetworkReply.NetworkError.NoError: |
|
155 error = reply.errorString() |
|
156 self.networkError.emit(error) |
|
157 else: |
|
158 result = self.__extractData(reply) |
|
159 res = result["listUpdateResponses"] |
|
160 |
|
161 reply.deleteLater() |
|
162 return res, error |
|
163 |
|
164 def getFullHashes(self, prefixes, clientState): |
|
165 """ |
|
166 Public method to find full hashes matching hash prefixes. |
|
167 |
|
168 @param prefixes list of hash prefixes to find |
|
169 @type list of bytes |
|
170 @param clientState dictionary of client states with keys like |
|
171 (threatType, platformType, threatEntryType) |
|
172 @type dict |
|
173 @return dictionary containing the list of found hashes and the |
|
174 negative cache duration |
|
175 @rtype dict |
|
176 """ |
|
177 requestBody = { |
|
178 "client": { |
|
179 "clientId": self.ClientId, |
|
180 "clientVersion": self.ClientVersion, |
|
181 }, |
|
182 "clientStates": [], |
|
183 "threatInfo": { |
|
184 "threatTypes": [], |
|
185 "platformTypes": [], |
|
186 "threatEntryTypes": [], |
|
187 "threatEntries": [], |
|
188 }, |
|
189 } |
|
190 |
|
191 for prefix in prefixes: |
|
192 requestBody["threatInfo"]["threatEntries"].append( |
|
193 {"hash": base64.b64encode(prefix).decode("ascii")}) |
|
194 |
|
195 for (threatType, platformType, threatEntryType), currentState in ( |
|
196 clientState.items() |
|
197 ): |
|
198 requestBody["clientStates"].append(currentState) |
|
199 if threatType not in requestBody["threatInfo"]["threatTypes"]: |
|
200 requestBody["threatInfo"]["threatTypes"].append(threatType) |
|
201 if ( |
|
202 platformType not in |
|
203 requestBody["threatInfo"]["platformTypes"] |
|
204 ): |
|
205 requestBody["threatInfo"]["platformTypes"].append( |
|
206 platformType) |
|
207 if ( |
|
208 threatEntryType not in |
|
209 requestBody["threatInfo"]["threatEntryTypes"] |
|
210 ): |
|
211 requestBody["threatInfo"]["threatEntryTypes"].append( |
|
212 threatEntryType) |
|
213 |
|
214 data = QByteArray(json.dumps(requestBody).encode("utf-8")) |
|
215 url = QUrl(self.GsbUrlTemplate.format("fullHashes:find", |
|
216 self.__apiKey)) |
|
217 req = QNetworkRequest(url) |
|
218 req.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
219 "application/json") |
|
220 reply = WebBrowserWindow.networkManager().post(req, data) |
|
221 |
|
222 while reply.isRunning(): |
|
223 QCoreApplication.processEvents( |
|
224 QEventLoop.ProcessEventsFlag.AllEvents, 200) |
|
225 # max. 200 ms processing |
|
226 |
|
227 res = [] |
|
228 if reply.error() != QNetworkReply.NetworkError.NoError: |
|
229 self.networkError.emit(reply.errorString()) |
|
230 else: |
|
231 res = self.__extractData(reply) |
|
232 |
|
233 reply.deleteLater() |
|
234 return res |
|
235 |
|
236 def __extractData(self, reply): |
|
237 """ |
|
238 Private method to extract the data of a network reply. |
|
239 |
|
240 @param reply reference to the network reply object |
|
241 @type QNetworkReply |
|
242 @return extracted data |
|
243 @rtype list or dict |
|
244 """ |
|
245 result = json.loads(str(reply.readAll(), "utf-8")) |
|
246 self.__setWaitDuration(result.get("minimumWaitDuration")) |
|
247 return result |
|
248 |
|
249 def __setWaitDuration(self, minimumWaitDuration): |
|
250 """ |
|
251 Private method to set the minimum wait duration. |
|
252 |
|
253 @param minimumWaitDuration duration to be set |
|
254 @type str |
|
255 """ |
|
256 if not self.__fairUse or minimumWaitDuration is None: |
|
257 self.__nextRequestNoSoonerThan = QDateTime() |
|
258 else: |
|
259 waitDuration = int(float(minimumWaitDuration.rstrip("s"))) |
|
260 self.__nextRequestNoSoonerThan = ( |
|
261 QDateTime.currentDateTime().addSecs(waitDuration) |
|
262 ) |
|
263 |
|
264 def fairUseDelayExpired(self): |
|
265 """ |
|
266 Public method to check, if the fair use wait period has expired. |
|
267 |
|
268 @return flag indicating expiration |
|
269 @rtype bool |
|
270 """ |
|
271 return ( |
|
272 self.__fairUse and |
|
273 QDateTime.currentDateTime() >= self.__nextRequestNoSoonerThan |
|
274 ) or not self.__fairUse |
|
275 |
|
276 def getFairUseDelayExpirationDateTime(self): |
|
277 """ |
|
278 Public method to get the date and time the fair use delay will expire. |
|
279 |
|
280 @return fair use delay expiration date and time |
|
281 @rtype QDateTime |
|
282 """ |
|
283 return self.__nextRequestNoSoonerThan |
|
284 |
|
285 ####################################################################### |
|
286 ## Methods below implement the 'Lookup API (v4)' |
|
287 ####################################################################### |
|
288 |
|
289 def lookupUrl(self, url, platforms): |
|
290 """ |
|
291 Public method to send an URL to Google for checking. |
|
292 |
|
293 @param url URL to be checked |
|
294 @type QUrl |
|
295 @param platforms list of platform types to check against |
|
296 @type list of str |
|
297 @return tuple containing the list of threat list info objects and |
|
298 an error message |
|
299 @rtype tuple of (list of ThreatList, str) |
|
300 """ |
|
301 error = "" |
|
302 |
|
303 # sanitize the URL by removing user info and query data |
|
304 url = url.adjusted( |
|
305 QUrl.UrlFormattingOption.RemoveUserInfo | |
|
306 QUrl.UrlFormattingOption.RemoveQuery | |
|
307 QUrl.UrlFormattingOption.RemoveFragment |
|
308 ) |
|
309 urlStr = url.toString() |
|
310 |
|
311 # check the local cache first |
|
312 if urlStr in self.__lookupApiCache: |
|
313 if ( |
|
314 self.__lookupApiCache[urlStr]["validUntil"] > |
|
315 QDateTime.currentDateTime() |
|
316 ): |
|
317 # cached entry is still valid |
|
318 return self.__lookupApiCache[urlStr]["threatInfo"], error |
|
319 else: |
|
320 del self.__lookupApiCache[urlStr] |
|
321 |
|
322 # no valid entry found, ask the safe browsing server |
|
323 requestBody = { |
|
324 "client": { |
|
325 "clientId": self.ClientId, |
|
326 "clientVersion": self.ClientVersion, |
|
327 }, |
|
328 "threatInfo": { |
|
329 "threatTypes": SafeBrowsingAPIClient.definedThreatTypes(), |
|
330 "platformTypes": platforms, |
|
331 "threatEntryTypes": |
|
332 SafeBrowsingAPIClient.definedThreatEntryTypes(), |
|
333 "threatEntries": [ |
|
334 {"url": urlStr}, |
|
335 ], |
|
336 }, |
|
337 } |
|
338 |
|
339 data = QByteArray(json.dumps(requestBody).encode("utf-8")) |
|
340 url = QUrl(self.GsbUrlTemplate.format("threatMatches:find", |
|
341 self.__apiKey)) |
|
342 req = QNetworkRequest(url) |
|
343 req.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, |
|
344 "application/json") |
|
345 reply = WebBrowserWindow.networkManager().post(req, data) |
|
346 |
|
347 while reply.isRunning(): |
|
348 QCoreApplication.processEvents( |
|
349 QEventLoop.ProcessEventsFlag.AllEvents, 200) |
|
350 # max. 200 ms processing |
|
351 |
|
352 threats = [] |
|
353 if reply.error() != QNetworkReply.NetworkError.NoError: |
|
354 error = reply.errorString() |
|
355 self.networkError.emit(error) |
|
356 else: |
|
357 res = json.loads(str(reply.readAll(), "utf-8")) |
|
358 if res and "matches" in res: |
|
359 cacheDuration = 0 |
|
360 for match in res["matches"]: |
|
361 threatInfo = ThreatList( |
|
362 match["threatType"], |
|
363 match["platformType"], |
|
364 match["threatEntryType"], |
|
365 ) |
|
366 threats.append(threatInfo) |
|
367 if "cacheDuration" in match: |
|
368 cacheDurationSec = int( |
|
369 match["cacheDuration"].strip().rstrip("s") |
|
370 .split(".")[0]) |
|
371 if cacheDurationSec > cacheDuration: |
|
372 cacheDuration = cacheDurationSec |
|
373 if cacheDuration > 0 and bool(threats): |
|
374 validUntil = QDateTime.currentDateTime().addSecs( |
|
375 cacheDuration) |
|
376 self.__lookupApiCache[urlStr] = { |
|
377 "validUntil": validUntil, |
|
378 "threatInfo": threats |
|
379 } |
|
380 |
|
381 reply.deleteLater() |
|
382 return threats, error |
|
383 |
|
384 ####################################################################### |
|
385 ## Methods below implement global (class wide) functionality |
|
386 ####################################################################### |
|
387 |
|
388 @classmethod |
|
389 def getThreatMessage(cls, threatType): |
|
390 """ |
|
391 Class method to get a warning message for the given threat type. |
|
392 |
|
393 @param threatType threat type to get the message for |
|
394 @type str |
|
395 @return threat message |
|
396 @rtype str |
|
397 """ |
|
398 threatType = threatType.lower() |
|
399 if threatType == "malware": |
|
400 msg = QCoreApplication.translate( |
|
401 "SafeBrowsingAPI", |
|
402 "<h3>Malware Warning</h3>" |
|
403 "<p>The web site you are about to visit may try to install" |
|
404 " harmful programs on your computer in order to steal or" |
|
405 " destroy your data.</p>") |
|
406 elif threatType == "social_engineering": |
|
407 msg = QCoreApplication.translate( |
|
408 "SafeBrowsingAPI", |
|
409 "<h3>Phishing Warning</h3>" |
|
410 "<p>The web site you are about to visit may try to trick you" |
|
411 " into doing something dangerous online, such as revealing" |
|
412 " passwords or personal information, usually through a fake" |
|
413 " website.</p>") |
|
414 elif threatType == "unwanted_software": |
|
415 msg = QCoreApplication.translate( |
|
416 "SafeBrowsingAPI", |
|
417 "<h3>Unwanted Software Warning</h3>" |
|
418 "<p>The software you are about to download may negatively" |
|
419 " affect your browsing or computing experience.</p>") |
|
420 elif threatType == "potentially_harmful_application": |
|
421 msg = QCoreApplication.translate( |
|
422 "SafeBrowsingAPI", |
|
423 "<h3>Potentially Harmful Application</h3>" |
|
424 "<p>The web site you are about to visit may try to trick you" |
|
425 " into installing applications, that may negatively affect" |
|
426 " your browsing experience.</p>") |
|
427 elif threatType == "malicious_binary": |
|
428 msg = QCoreApplication.translate( |
|
429 "SafeBrowsingAPI", |
|
430 "<h3>Malicious Binary Warning</h3>" |
|
431 "<p>The software you are about to download may be harmful" |
|
432 " to your computer.</p>") |
|
433 else: |
|
434 # unknow threat |
|
435 msg = QCoreApplication.translate( |
|
436 "SafeBrowsingAPI", |
|
437 "<h3>Unknown Threat Warning</h3>" |
|
438 "<p>The web site you are about to visit was found in the Safe" |
|
439 " Browsing Database but was not classified yet.</p>") |
|
440 |
|
441 return msg |
|
442 |
|
443 @classmethod |
|
444 def getThreatType(cls, threatType): |
|
445 """ |
|
446 Class method to get a display string for a given threat type. |
|
447 |
|
448 @param threatType threat type to get display string for |
|
449 @type str |
|
450 @return display string |
|
451 @rtype str |
|
452 """ |
|
453 threatType = threatType.lower() |
|
454 if threatType == "malware": |
|
455 displayString = QCoreApplication.translate( |
|
456 "SafeBrowsingAPI", "Malware") |
|
457 elif threatType == "social_engineering": |
|
458 displayString = QCoreApplication.translate( |
|
459 "SafeBrowsingAPI", "Phishing") |
|
460 elif threatType == "unwanted_software": |
|
461 displayString = QCoreApplication.translate( |
|
462 "SafeBrowsingAPI", "Unwanted Software") |
|
463 elif threatType == "potentially_harmful_application": |
|
464 displayString = QCoreApplication.translate( |
|
465 "SafeBrowsingAPI", "Harmful Application") |
|
466 elif threatType == "malcious_binary": |
|
467 displayString = QCoreApplication.translate( |
|
468 "SafeBrowsingAPI", "Malicious Binary") |
|
469 else: |
|
470 displayString = QCoreApplication.translate( |
|
471 "SafeBrowsingAPI", "Unknown Threat") |
|
472 |
|
473 return displayString |
|
474 |
|
475 @classmethod |
|
476 def definedThreatTypes(cls): |
|
477 """ |
|
478 Class method to get all threat types defined in API v4. |
|
479 |
|
480 @return list of defined threat types |
|
481 @rtype list of str |
|
482 """ |
|
483 return [ |
|
484 "THREAT_TYPE_UNSPECIFIED", "MALWARE", "SOCIAL_ENGINEERING", |
|
485 "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION", |
|
486 ] |
|
487 |
|
488 @classmethod |
|
489 def getThreatEntryString(cls, threatEntry): |
|
490 """ |
|
491 Class method to get the threat entry string. |
|
492 |
|
493 @param threatEntry threat entry type as defined in the v4 API |
|
494 @type str |
|
495 @return threat entry string |
|
496 @rtype str |
|
497 """ |
|
498 if threatEntry == "URL": |
|
499 return "URL" |
|
500 elif threatEntry == "EXECUTABLE": |
|
501 return QCoreApplication.translate( |
|
502 "SafeBrowsingAPI", "executable program") |
|
503 else: |
|
504 return QCoreApplication.translate( |
|
505 "SafeBrowsingAPI", "unknown type") |
|
506 |
|
507 @classmethod |
|
508 def definedThreatEntryTypes(cls): |
|
509 """ |
|
510 Class method to get all threat entry types defined in API v4. |
|
511 |
|
512 @return list of all defined threat entry types |
|
513 @rtype list of str |
|
514 """ |
|
515 return [ |
|
516 "THREAT_ENTRY_TYPE_UNSPECIFIED", "URL", "EXECUTABLE", |
|
517 ] |
|
518 |
|
519 @classmethod |
|
520 def getPlatformString(cls, platformType): |
|
521 """ |
|
522 Class method to get the platform string for a given platform type. |
|
523 |
|
524 @param platformType platform type as defined in the v4 API |
|
525 @type str |
|
526 @return platform string |
|
527 @rtype str |
|
528 """ |
|
529 platformStrings = { |
|
530 "WINDOWS": "Windows", |
|
531 "LINUX": "Linux", |
|
532 "ANDROID": "Android", |
|
533 "OSX": "macOS", |
|
534 "IOS": "iOS", |
|
535 "CHROME": "Chrome OS", |
|
536 } |
|
537 if platformType in platformStrings: |
|
538 return platformStrings[platformType] |
|
539 |
|
540 if platformType == "ANY_PLATFORM": |
|
541 return QCoreApplication.translate( |
|
542 "SafeBrowsingAPI", "any defined platform") |
|
543 elif platformType == "ALL_PLATFORMS": |
|
544 return QCoreApplication.translate( |
|
545 "SafeBrowsingAPI", "all defined platforms") |
|
546 else: |
|
547 return QCoreApplication.translate( |
|
548 "SafeBrowsingAPI", "unknown platform") |
|
549 |
|
550 @classmethod |
|
551 def getPlatformTypes(cls, platform): |
|
552 """ |
|
553 Class method to get the platform types for a given platform. |
|
554 |
|
555 @param platform platform string |
|
556 @type str (one of 'linux', 'windows', 'macos') |
|
557 @return list of platform types as defined in the v4 API for the |
|
558 given platform |
|
559 @rtype list of str |
|
560 @exception ValueError raised to indicate an invalid platform string |
|
561 """ |
|
562 platform = platform.lower() |
|
563 |
|
564 if platform not in ("linux", "windows", "macos"): |
|
565 raise ValueError("Unsupported platform") |
|
566 |
|
567 platformTypes = ["ANY_PLATFORM", "ALL_PLATFORMS"] |
|
568 if platform == "linux": |
|
569 platformTypes.append("LINUX") |
|
570 elif platform == "windows": |
|
571 platformTypes.append("WINDOWS") |
|
572 else: |
|
573 platformTypes.append("OSX") |
|
574 |
|
575 return platformTypes |
|
576 |
|
577 @classmethod |
|
578 def definedPlatformTypes(cls): |
|
579 """ |
|
580 Class method to get all platform types defined in API v4. |
|
581 |
|
582 @return list of all defined platform types |
|
583 @rtype list of str |
|
584 """ |
|
585 return [ |
|
586 "PLATFORM_TYPE_UNSPECIFIED", "WINDOWS", "LINUX", "ANDROID", "OSX", |
|
587 "IOS", "ANY_PLATFORM", "ALL_PLATFORMS", "CHROME", |
|
588 ] |