eric7/PipInterface/PipVulnerabilityChecker.py

branch
eric7
changeset 8978
38c3ddf21537
parent 8977
663521af48b2
child 9001
a00cd6b55728
equal deleted inserted replaced
8977:663521af48b2 8978:38c3ddf21537
8 8
9 The vulnerability data is provided by the open Python vulnerability database 9 The vulnerability data is provided by the open Python vulnerability database
10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>. 10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>.
11 """ 11 """
12 12
13 import collections
13 import contextlib 14 import contextlib
14 import enum 15 import enum
15 import json 16 import json
16 import os 17 import os
17 import time 18 import time
18 from collections import namedtuple
19 from dataclasses import dataclass 19 from dataclasses import dataclass
20 20
21 from packaging.specifiers import SpecifierSet 21 from packaging.specifiers import SpecifierSet
22 22
23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl 23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl
26 from EricWidgets import EricMessageBox 26 from EricWidgets import EricMessageBox
27 27
28 import Globals 28 import Globals
29 import Preferences 29 import Preferences
30 30
31 Package = namedtuple("Package", ["name", "version"]) 31 @dataclass
32 class Package:
33 """
34 Class containing the package data.
35 """
36 name: str # package name
37 version: str # version
32 38
33 39
34 @dataclass 40 @dataclass
35 class Vulnerability: 41 class Vulnerability:
36 """ 42 """
55 61
56 class PipVulnerabilityChecker(QObject): 62 class PipVulnerabilityChecker(QObject):
57 """ 63 """
58 Class implementing a Python package vulnerability checker. 64 Class implementing a Python package vulnerability checker.
59 """ 65 """
66 FullDbFile = "insecure_full.json"
67 SummaryDbFile = "insecure.json"
68
60 def __init__(self, pip, parent=None): 69 def __init__(self, pip, parent=None):
61 """ 70 """
62 Constructor 71 Constructor
63 72
64 @param pip reference to the global pip interface 73 @param pip reference to the global pip interface
157 "db": data, 166 "db": data,
158 } 167 }
159 with open(self.__cacheFile, "w") as f: 168 with open(self.__cacheFile, "w") as f:
160 json.dump(cache, f, indent=2) 169 json.dump(cache, f, indent=2)
161 170
162 def __fetchVulnerabilityDatabase(self, full=False): 171 def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
163 """ 172 """
164 Private method to get the data of the vulnerability database. 173 Private method to get the data of the vulnerability database.
165 174
166 If the cached data is still valid, this data will be used. 175 If the cached data is still valid, this data will be used.
167 Otherwise a copy of the requested database will be downloaded 176 Otherwise a copy of the requested database will be downloaded
168 and cached. 177 and cached.
169 178
170 @param full flag indicating to get the database containing the full 179 @param full flag indicating to get the database containing the full
171 data set (defaults to False) 180 data set (defaults to False)
172 @type bool (optional) 181 @type bool (optional)
182 @param forceUpdate flag indicating an update of the cache is required
183 (defaults to False)
184 @type bool (optional)
173 @return dictionary containing the vulnerability data (full data set or 185 @return dictionary containing the vulnerability data (full data set or
174 just package name and version specifier) 186 just package name and version specifier)
175 """ 187 """
176 dbName = "insecure_full.json" if full else "insecure.json" 188 dbName = (
177 189 PipVulnerabilityChecker.FullDbFile
178 cachedData = self.__getDataFromCache(dbName) 190 if full else
179 if cachedData: 191 PipVulnerabilityChecker.SummaryDbFile
180 return cachedData 192 )
193
194 if not forceUpdate:
195 cachedData = self.__getDataFromCache(dbName)
196 if cachedData:
197 return cachedData
181 198
182 url = Preferences.getPip("VulnerabilityDbMirror") + dbName 199 url = Preferences.getPip("VulnerabilityDbMirror") + dbName
183 request = QNetworkRequest(QUrl(url)) 200 request = QNetworkRequest(QUrl(url))
184 reply = self.__pip.getNetworkAccessManager().get(request) 201 reply = self.__pip.getNetworkAccessManager().get(request)
185 while not reply.isFinished(): 202 while not reply.isFinished():
227 """ 244 """
228 Public method to check the given packages for vulnerabilities. 245 Public method to check the given packages for vulnerabilities.
229 246
230 @param packages list of packages 247 @param packages list of packages
231 @type list of Package 248 @type list of Package
232 @return tuple containing an error status and the list of vulnerable 249 @return tuple containing an error status and a dictionary containing
233 packages detected 250 detected vulnerable packages keyed by package name
234 @rtype tuple of (VulnerabilityCheckError, list of Vulnerability) 251 @rtype tuple of (VulnerabilityCheckError, dict of {str: list of Vulnerability})
235 """ 252 """
236 db = self.__fetchVulnerabilityDatabase() 253 db = self.__fetchVulnerabilityDatabase()
237 if not db: 254 if not db:
238 return VulnerabilityCheckError.SummaryDbUnavailable, [] 255 return VulnerabilityCheckError.SummaryDbUnavailable, []
239 256
240 fullDb = None 257 fullDb = None
241 vulnerablePackages = frozenset(db.keys()) 258 vulnerablePackages = frozenset(db.keys())
242 vulnerabilities = [] # TODO: fill this list 259 vulnerabilities = collections.defaultdict(list)
243 260
244 for package in packages: 261 for package in packages:
245 # normalize the package name, the safety-db is converting 262 # normalize the package name, the safety-db is converting
246 # underscores to dashes and uses lowercase 263 # underscores to dashes and uses lowercase
247 name = package.name.replace("_", "-").lower() 264 name = package.name.replace("_", "-").lower()
258 package=name, specifier=specifier, db=fullDb 275 package=name, specifier=specifier, db=fullDb
259 ): 276 ):
260 vulnarabilityId = ( 277 vulnarabilityId = (
261 data.get("id").replace("pyup.io-", "") 278 data.get("id").replace("pyup.io-", "")
262 ) 279 )
263 cveId = data.get("cve") 280 cveId = data.get("cve", "")
264 if cveId: 281 if cveId:
265 cveId = cveId.split(",", 1)[0].strip() 282 cveId = cveId.split(",", 1)[0].strip()
283 vulnerabilities[package.name].append(Vulnerability(
284 name=name,
285 spec=specifier,
286 version=package.version,
287 cve=cveId,
288 advisory=data.get("advisory", ""),
289 vulnerabilityId=vulnarabilityId
290 ))
266 291
267 return VulnerabilityCheckError.OK, vulnerabilities 292 return VulnerabilityCheckError.OK, vulnerabilities
293
294 def updateVulnerabilityDb(self):
295 """
296 Public method to update the cache of the vulnerability databases.
297 """
298 self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True)
299 self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True)

eric ide

mercurial