8 |
8 |
9 The vulnerability data is provided by the open Python vulnerability database |
9 The vulnerability data is provided by the open Python vulnerability database |
10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>. |
10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>. |
11 """ |
11 """ |
12 |
12 |
|
13 import collections |
13 import contextlib |
14 import contextlib |
14 import enum |
15 import enum |
15 import json |
16 import json |
16 import os |
17 import os |
17 import time |
18 import time |
18 from collections import namedtuple |
|
19 from dataclasses import dataclass |
19 from dataclasses import dataclass |
20 |
20 |
21 from packaging.specifiers import SpecifierSet |
21 from packaging.specifiers import SpecifierSet |
22 |
22 |
23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl |
23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl |
55 |
61 |
56 class PipVulnerabilityChecker(QObject): |
62 class PipVulnerabilityChecker(QObject): |
57 """ |
63 """ |
58 Class implementing a Python package vulnerability checker. |
64 Class implementing a Python package vulnerability checker. |
59 """ |
65 """ |
|
66 FullDbFile = "insecure_full.json" |
|
67 SummaryDbFile = "insecure.json" |
|
68 |
60 def __init__(self, pip, parent=None): |
69 def __init__(self, pip, parent=None): |
61 """ |
70 """ |
62 Constructor |
71 Constructor |
63 |
72 |
64 @param pip reference to the global pip interface |
73 @param pip reference to the global pip interface |
157 "db": data, |
166 "db": data, |
158 } |
167 } |
159 with open(self.__cacheFile, "w") as f: |
168 with open(self.__cacheFile, "w") as f: |
160 json.dump(cache, f, indent=2) |
169 json.dump(cache, f, indent=2) |
161 |
170 |
162 def __fetchVulnerabilityDatabase(self, full=False): |
171 def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False): |
163 """ |
172 """ |
164 Private method to get the data of the vulnerability database. |
173 Private method to get the data of the vulnerability database. |
165 |
174 |
166 If the cached data is still valid, this data will be used. |
175 If the cached data is still valid, this data will be used. |
167 Otherwise a copy of the requested database will be downloaded |
176 Otherwise a copy of the requested database will be downloaded |
168 and cached. |
177 and cached. |
169 |
178 |
170 @param full flag indicating to get the database containing the full |
179 @param full flag indicating to get the database containing the full |
171 data set (defaults to False) |
180 data set (defaults to False) |
172 @type bool (optional) |
181 @type bool (optional) |
|
182 @param forceUpdate flag indicating an update of the cache is required |
|
183 (defaults to False) |
|
184 @type bool (optional) |
173 @return dictionary containing the vulnerability data (full data set or |
185 @return dictionary containing the vulnerability data (full data set or |
174 just package name and version specifier) |
186 just package name and version specifier) |
175 """ |
187 """ |
176 dbName = "insecure_full.json" if full else "insecure.json" |
188 dbName = ( |
177 |
189 PipVulnerabilityChecker.FullDbFile |
178 cachedData = self.__getDataFromCache(dbName) |
190 if full else |
179 if cachedData: |
191 PipVulnerabilityChecker.SummaryDbFile |
180 return cachedData |
192 ) |
|
193 |
|
194 if not forceUpdate: |
|
195 cachedData = self.__getDataFromCache(dbName) |
|
196 if cachedData: |
|
197 return cachedData |
181 |
198 |
182 url = Preferences.getPip("VulnerabilityDbMirror") + dbName |
199 url = Preferences.getPip("VulnerabilityDbMirror") + dbName |
183 request = QNetworkRequest(QUrl(url)) |
200 request = QNetworkRequest(QUrl(url)) |
184 reply = self.__pip.getNetworkAccessManager().get(request) |
201 reply = self.__pip.getNetworkAccessManager().get(request) |
185 while not reply.isFinished(): |
202 while not reply.isFinished(): |
227 """ |
244 """ |
228 Public method to check the given packages for vulnerabilities. |
245 Public method to check the given packages for vulnerabilities. |
229 |
246 |
230 @param packages list of packages |
247 @param packages list of packages |
231 @type Package |
248 @type list of Package |
232 @return tuple containing an error status and the list of vulnerable |
249 @return tuple containing an error status and a dictionary containing |
233 packages detected |
250 detected vulnerable packages keyed by package name |
234 @rtype tuple of (VulnerabilityCheckError, list of Vulnerability) |
251 @rtype tuple of (VulnerabilityCheckError, dict of {str: list of Vulnerability}) |
235 """ |
252 """ |
236 db = self.__fetchVulnerabilityDatabase() |
253 db = self.__fetchVulnerabilityDatabase() |
237 if not db: |
254 if not db: |
238 return VulnerabilityCheckError.SummaryDbUnavailable, [] |
255 return VulnerabilityCheckError.SummaryDbUnavailable, [] |
239 |
256 |
240 fullDb = None |
257 fullDb = None |
241 vulnerablePackages = frozenset(db.keys()) |
258 vulnerablePackages = frozenset(db.keys()) |
242 vulnerabilities = [] # TODO: fill this list |
259 vulnerabilities = collections.defaultdict(list) |
243 |
260 |
244 for package in packages: |
261 for package in packages: |
245 # normalize the package name, the safety-db is converting |
262 # normalize the package name, the safety-db is converting |
246 # underscores to dashes and uses lowercase |
263 # underscores to dashes and uses lowercase |
247 name = package.name.replace("_", "-").lower() |
264 name = package.name.replace("_", "-").lower() |
258 package=name, specifier=specifier, db=fullDb |
275 package=name, specifier=specifier, db=fullDb |
259 ): |
276 ): |
260 vulnarabilityId = ( |
277 vulnarabilityId = ( |
261 data.get("id").replace("pyup.io-", "") |
278 data.get("id").replace("pyup.io-", "") |
262 ) |
279 ) |
263 cveId = data.get("cve") |
280 cveId = data.get("cve", "") |
264 if cveId: |
281 if cveId: |
265 cveId = cveId.split(",", 1)[0].strip() |
282 cveId = cveId.split(",", 1)[0].strip() |
|
283 vulnerabilities[package.name].append(Vulnerability( |
|
284 name=name, |
|
285 spec=specifier, |
|
286 version=package.version, |
|
287 cve=cveId, |
|
288 advisory=data.get("advisory", ""), |
|
289 vulnerabilityId=vulnarabilityId |
|
290 )) |
266 |
291 |
267 return VulnerabilityCheckError.OK, vulnerabilities |
292 return VulnerabilityCheckError.OK, vulnerabilities |
|
293 |
|
294 def updateVulnerabilityDb(self): |
|
295 """ |
|
296 Public method to update the cache of the vulnerability databases. |
|
297 """ |
|
298 self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True) |
|
299 self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True) |