|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a Python package vulnerability checker. |
|
8 |
|
9 The vulnerability data is provided by the open Python vulnerability database |
|
10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>. |
|
11 """ |
|
12 |
|
13 import collections |
|
14 import contextlib |
|
15 import enum |
|
16 import json |
|
17 import os |
|
18 import time |
|
19 from dataclasses import dataclass |
|
20 |
|
21 from packaging.specifiers import SpecifierSet |
|
22 |
|
23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl |
|
24 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest |
|
25 |
|
26 from EricWidgets import EricMessageBox |
|
27 |
|
28 import Globals |
|
29 import Preferences |
|
30 |
|
31 |
|
@dataclass
class Package:
    """
    Class describing a package by its name and its version.
    """
    # name of the package
    name: str
    # version of the package
    version: str
|
39 |
|
40 |
|
@dataclass
class Vulnerability:
    """
    Class containing the vulnerability data.
    """
    name: str               # package name
    # The specifier is the plain version specifier string (e.g. "<1.2.3")
    # that matched the package version; it is not a dictionary.
    spec: str               # matching version specifier
    version: str            # package version
    cve: str                # CVE ID
    advisory: str           # CVE advisory text
    vulnerabilityId: str    # vulnerability ID
|
52 |
|
53 |
|
class VulnerabilityCheckError(enum.Enum):
    """
    Class enumerating the possible result states of a vulnerability check.
    """
    # the check was performed without an error
    OK = 0
    # the summary vulnerability database could not be obtained
    SummaryDbUnavailable = 1
    # presumably signals that the full vulnerability database could not be
    # obtained - verify against the callers
    FullDbUnavailable = 2
|
61 |
|
62 |
|
class PipVulnerabilityChecker(QObject):
    """
    Class implementing a Python package vulnerability checker.

    The vulnerability databases are downloaded from the configured mirror
    and are cached in a JSON file below the configuration directory.
    """
    FullDbFile = "insecure_full.json"
    SummaryDbFile = "insecure.json"

    def __init__(self, pip, parent=None):
        """
        Constructor

        @param pip reference to the global pip interface
        @type Pip
        @param parent reference to the parent object (defaults to None)
        @type QObject (optional)
        """
        super().__init__(parent)

        self.__pip = pip

        securityDir = os.path.join(Globals.getConfigDir(), "security")
        os.makedirs(securityDir, mode=0o700, exist_ok=True)
        self.__cacheFile = os.path.join(securityDir,
                                        "vulnerability_cache.json")
        if not os.path.exists(self.__cacheFile):
            self.__createCacheFile()

    def __createCacheFile(self):
        """
        Private method to create the cache file.

        The cache file has the following structure.
        {
          "insecure.json": {
            "cachedAt": 12345678,
            "db": {}
          },
          "insecure_full.json": {
            "cachedAt": 12345678,
            "db": {}
          }
        }
        """
        # "cachedAt": 0 marks both entries as expired right from the start
        structure = {
            dbName: {
                "cachedAt": 0,
                "db": {},
            }
            for dbName in (
                PipVulnerabilityChecker.SummaryDbFile,
                PipVulnerabilityChecker.FullDbFile,
            )
        }
        # json.dump() emits pure ASCII by default, so an explicit UTF-8
        # encoding stays compatible with previously written cache files
        with open(self.__cacheFile, "w", encoding="utf-8") as f:
            json.dump(structure, f, indent=2)

    def __getDataFromCache(self, dbName):
        """
        Private method to get the vulnerability database from the cache.

        A missing, unreadable or malformed cache file as well as an expired
        cache entry result in an empty dictionary.

        @param dbName name of the vulnerability database
        @type str
        @return dictionary containing the requested vulnerability data
        @rtype dict
        """
        try:
            with open(self.__cacheFile, "r", encoding="utf-8") as f:
                cachedData = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # OSError covers a missing or unreadable file
            return {}

        entry = (
            cachedData.get(dbName) if isinstance(cachedData, dict) else None
        )
        if not isinstance(entry, dict) or "cachedAt" not in entry:
            return {}

        cacheValidPeriod = Preferences.getPip("VulnerabilityDbCacheValidity")
        if entry["cachedAt"] + cacheValidPeriod > time.time():
            return entry.get("db") or {}

        return {}

    def __writeDataToCache(self, dbName, data):
        """
        Private method to write the vulnerability data for a database to the
        cache.

        @param dbName name of the vulnerability database
        @type str
        @param data dictionary containing the vulnerability data
        @type dict
        """
        if not os.path.exists(self.__cacheFile):
            self.__createCacheFile()

        try:
            with open(self.__cacheFile, "r", encoding="utf-8") as f:
                cache = json.load(f)
        except (OSError, ValueError):
            cache = {}
        if not isinstance(cache, dict):
            # valid JSON but not the expected object; start over
            cache = {}

        cache[dbName] = {
            "cachedAt": time.time(),
            "db": data,
        }
        with open(self.__cacheFile, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2)

    def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
        """
        Private method to get the data of the vulnerability database.

        If the cached data is still valid, this data will be used.
        Otherwise a copy of the requested database will be downloaded
        and cached. If the download fails, an error message is shown and
        an empty dictionary is returned.

        @param full flag indicating to get the database containing the full
            data set (defaults to False)
        @type bool (optional)
        @param forceUpdate flag indicating an update of the cache is required
            (defaults to False)
        @type bool (optional)
        @return dictionary containing the vulnerability data (full data set or
            just package name and version specifier)
        @rtype dict
        """
        dbName = (
            PipVulnerabilityChecker.FullDbFile
            if full else
            PipVulnerabilityChecker.SummaryDbFile
        )

        if not forceUpdate:
            cachedData = self.__getDataFromCache(dbName)
            if cachedData:
                return cachedData

        mirror = Preferences.getPip("VulnerabilityDbMirror")
        request = QNetworkRequest(QUrl(mirror + dbName))
        reply = self.__pip.getNetworkAccessManager().get(request)
        # wait for the download to finish while keeping the event loop alive
        while not reply.isFinished():
            QCoreApplication.processEvents()
            QThread.msleep(100)

        reply.deleteLater()
        if reply.error() == QNetworkReply.NetworkError.NoError:
            rawData = str(reply.readAll(),
                          Preferences.getSystem("IOEncoding"),
                          'replace')
            try:
                data = json.loads(rawData)
            except json.JSONDecodeError:
                data = None
            # only a JSON object is a usable vulnerability database
            if isinstance(data, dict):
                self.__writeDataToCache(dbName, data)
                return data

        EricMessageBox.critical(
            None,
            self.tr("Fetching Vulnerability Database"),
            self.tr(
                """<p>The vulnerability database <b>{0}</b> could not"""
                """ be loaded from <b>{1}</b>.</p><p>The vulnerability"""
                """ check is not available.</p>"""
            ).format(dbName, mirror)
        )
        return {}

    def __getVulnerabilities(self, package, specifier, db):
        """
        Private method to get the vulnerabilities for a package.

        Packages unknown to the given database and entries without a
        "specs" list yield nothing.

        @param package name of the package
        @type str
        @param specifier package specifier
        @type str
        @param db vulnerability data
        @type dict
        @yield dictionary containing the vulnerability data for the package
        @ytype dict
        """
        for entry in db.get(package, []):
            if specifier in entry.get("specs", []):
                yield entry

    def check(self, packages):
        """
        Public method to check the given packages for vulnerabilities.

        @param packages list of packages
        @type list of Package
        @return tuple containing an error status and a dictionary containing
            the detected vulnerabilities keyed by package name (an empty
            list, if one of the databases is not available)
        @rtype tuple of (VulnerabilityCheckError,
            dict of {str: list of Vulnerability})
        """
        db = self.__fetchVulnerabilityDatabase()
        if not db:
            return VulnerabilityCheckError.SummaryDbUnavailable, []

        fullDb = None
        vulnerabilities = collections.defaultdict(list)

        for package in packages:
            # normalize the package name, the safety-db is converting
            # underscores to dashes and uses lowercase
            name = package.name.replace("_", "-").lower()
            if name not in db:
                continue

            # we have a candidate here, check each specifier
            for specifier in db[name]:
                specifierSet = SpecifierSet(specifiers=specifier)
                if not specifierSet.contains(package.version):
                    continue

                # the full database is loaded lazily on the first hit
                if fullDb is None:
                    fullDb = self.__fetchVulnerabilityDatabase(full=True)
                    if not fullDb:
                        # without the full data set no details can be
                        # reported; tell the caller instead of crashing
                        return VulnerabilityCheckError.FullDbUnavailable, []

                for data in self.__getVulnerabilities(
                    package=name, specifier=specifier, db=fullDb
                ):
                    # "id" and "cve" may be missing or null in the database
                    vulnerabilityId = (
                        (data.get("id") or "").replace("pyup.io-", "")
                    )
                    cveId = data.get("cve") or ""
                    if cveId:
                        # report just the first of several CVE IDs
                        cveId = cveId.split(",", 1)[0].strip()
                    vulnerabilities[package.name].append(Vulnerability(
                        name=name,
                        spec=specifier,
                        version=package.version,
                        cve=cveId,
                        advisory=data.get("advisory", ""),
                        vulnerabilityId=vulnerabilityId
                    ))

        return VulnerabilityCheckError.OK, vulnerabilities

    def updateVulnerabilityDb(self):
        """
        Public method to update the cache of the vulnerability databases.
        """
        self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True)
        self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True)
|
302 self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True) |