src/eric7/PipInterface/PipVulnerabilityChecker.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9413
80c06d472826
equal deleted inserted replaced
9220:e9e7eca7efee 9221:bf71ee032bb4
32 @dataclass 32 @dataclass
33 class Package: 33 class Package:
34 """ 34 """
35 Class containing the package data. 35 Class containing the package data.
36 """ 36 """
37 name: str # package name 37
38 version: str # version 38 name: str # package name
39 version: str # version
39 40
40 41
41 @dataclass 42 @dataclass
42 class Vulnerability: 43 class Vulnerability:
43 """ 44 """
44 Class containing the vulnerability data. 45 Class containing the vulnerability data.
45 """ 46 """
46 name: str # package name 47
47 spec: dict # package specification record 48 name: str # package name
48 version: str # package version 49 spec: dict # package specification record
49 cve: str # CVE ID 50 version: str # package version
50 advisory: str # CVE advisory text 51 cve: str # CVE ID
51 vulnerabilityId: str # vulnerability ID 52 advisory: str # CVE advisory text
53 vulnerabilityId: str # vulnerability ID
52 54
53 55
54 class VulnerabilityCheckError(enum.Enum): 56 class VulnerabilityCheckError(enum.Enum):
55 """ 57 """
56 Class defining various vulnerability check error states. 58 Class defining various vulnerability check error states.
57 """ 59 """
60
58 OK = 0 61 OK = 0
59 SummaryDbUnavailable = 1 62 SummaryDbUnavailable = 1
60 FullDbUnavailable = 2 63 FullDbUnavailable = 2
61 64
62 65
63 class PipVulnerabilityChecker(QObject): 66 class PipVulnerabilityChecker(QObject):
64 """ 67 """
65 Class implementing a Python package vulnerability checker. 68 Class implementing a Python package vulnerability checker.
66 """ 69 """
70
67 FullDbFile = "insecure_full.json" 71 FullDbFile = "insecure_full.json"
68 SummaryDbFile = "insecure.json" 72 SummaryDbFile = "insecure.json"
69 73
70 def __init__(self, pip, parent=None): 74 def __init__(self, pip, parent=None):
71 """ 75 """
72 Constructor 76 Constructor
73 77
74 @param pip reference to the global pip interface 78 @param pip reference to the global pip interface
75 @type Pip 79 @type Pip
76 @param parent reference to the parent widget (defaults to None) 80 @param parent reference to the parent widget (defaults to None)
77 @type QWidget (optional) 81 @type QWidget (optional)
78 """ 82 """
79 super().__init__(parent) 83 super().__init__(parent)
80 84
81 self.__pip = pip 85 self.__pip = pip
82 86
83 securityDir = os.path.join(Globals.getConfigDir(), "security") 87 securityDir = os.path.join(Globals.getConfigDir(), "security")
84 os.makedirs(securityDir, mode=0o700, exist_ok=True) 88 os.makedirs(securityDir, mode=0o700, exist_ok=True)
85 self.__cacheFile = os.path.join(securityDir, 89 self.__cacheFile = os.path.join(securityDir, "vulnerability_cache.json")
86 "vulnerability_cache.json")
87 if not os.path.exists(self.__cacheFile): 90 if not os.path.exists(self.__cacheFile):
88 self.__createCacheFile() 91 self.__createCacheFile()
89 92
90 def __createCacheFile(self): 93 def __createCacheFile(self):
91 """ 94 """
92 Private method to create the cache file. 95 Private method to create the cache file.
93 96
94 The cache file has the following structure. 97 The cache file has the following structure.
95 { 98 {
96 "insecure.json": { 99 "insecure.json": {
97 "cachedAt": 12345678 100 "cachedAt": 12345678
98 "db": {} 101 "db": {}
113 "db": {}, 116 "db": {},
114 }, 117 },
115 } 118 }
116 with open(self.__cacheFile, "w") as f: 119 with open(self.__cacheFile, "w") as f:
117 json.dump(structure, f, indent=2) 120 json.dump(structure, f, indent=2)
118 121
119 def __getDataFromCache(self, dbName): 122 def __getDataFromCache(self, dbName):
120 """ 123 """
121 Private method to get the vulnerability database from the cache. 124 Private method to get the vulnerability database from the cache.
122 125
123 @param dbName name of the vulnerability database 126 @param dbName name of the vulnerability database
124 @type str 127 @type str
125 @return dictionary containing the requested vulnerability data 128 @return dictionary containing the requested vulnerability data
126 @rtype dict 129 @rtype dict
127 """ 130 """
128 if os.path.exists(self.__cacheFile): 131 if os.path.exists(self.__cacheFile):
129 with open(self.__cacheFile, "r") as f: # __IGNORE_WARNING_Y117__ 132 with open(self.__cacheFile, "r") as f: # __IGNORE_WARNING_Y117__
130 with contextlib.suppress(json.JSONDecodeError, OSError): 133 with contextlib.suppress(json.JSONDecodeError, OSError):
131 cachedData = json.load(f) 134 cachedData = json.load(f)
132 if ( 135 if dbName in cachedData and "cachedAt" in cachedData[dbName]:
133 dbName in cachedData and
134 "cachedAt" in cachedData[dbName]
135 ):
136 cacheValidPeriod = Preferences.getPip( 136 cacheValidPeriod = Preferences.getPip(
137 "VulnerabilityDbCacheValidity") 137 "VulnerabilityDbCacheValidity"
138 )
138 if ( 139 if (
139 cachedData[dbName]["cachedAt"] + cacheValidPeriod > 140 cachedData[dbName]["cachedAt"] + cacheValidPeriod
140 time.time() 141 > time.time()
141 ): 142 ):
142 return cachedData[dbName]["db"] 143 return cachedData[dbName]["db"]
143 144
144 return {} 145 return {}
145 146
146 def __writeDataToCache(self, dbName, data): 147 def __writeDataToCache(self, dbName, data):
147 """ 148 """
148 Private method to write the vulnerability data for a database to the 149 Private method to write the vulnerability data for a database to the
149 cache. 150 cache.
150 151
151 @param dbName name of the vulnerability database 152 @param dbName name of the vulnerability database
152 @type str 153 @type str
153 @param data dictionary containing the vulnerability data 154 @param data dictionary containing the vulnerability data
154 @type dict 155 @type dict
155 """ 156 """
156 if not os.path.exists(self.__cacheFile): 157 if not os.path.exists(self.__cacheFile):
157 self.__createCacheFile() 158 self.__createCacheFile()
158 159
159 with open(self.__cacheFile, "r") as f: 160 with open(self.__cacheFile, "r") as f:
160 try: 161 try:
161 cache = json.load(f) 162 cache = json.load(f)
162 except json.JSONDecodeError: 163 except json.JSONDecodeError:
163 cache = {} 164 cache = {}
164 165
165 cache[dbName] = { 166 cache[dbName] = {
166 "cachedAt": time.time(), 167 "cachedAt": time.time(),
167 "db": data, 168 "db": data,
168 } 169 }
169 with open(self.__cacheFile, "w") as f: 170 with open(self.__cacheFile, "w") as f:
170 json.dump(cache, f, indent=2) 171 json.dump(cache, f, indent=2)
171 172
172 def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False): 173 def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
173 """ 174 """
174 Private method to get the data of the vulnerability database. 175 Private method to get the data of the vulnerability database.
175 176
176 If the cached data is still valid, this data will be used. 177 If the cached data is still valid, this data will be used.
177 Otherwise a copy of the requested database will be downloaded 178 Otherwise a copy of the requested database will be downloaded
178 and cached. 179 and cached.
179 180
180 @param full flag indicating to get the database containing the full 181 @param full flag indicating to get the database containing the full
181 data set (defaults to False) 182 data set (defaults to False)
182 @type bool (optional) 183 @type bool (optional)
183 @param forceUpdate flag indicating an update of the cache is required 184 @param forceUpdate flag indicating an update of the cache is required
184 (defaults to False) 185 (defaults to False)
186 @return dictionary containing the vulnerability data (full data set or 187 @return dictionary containing the vulnerability data (full data set or
187 just package name and version specifier) 188 just package name and version specifier)
188 """ 189 """
189 dbName = ( 190 dbName = (
190 PipVulnerabilityChecker.FullDbFile 191 PipVulnerabilityChecker.FullDbFile
191 if full else 192 if full
192 PipVulnerabilityChecker.SummaryDbFile 193 else PipVulnerabilityChecker.SummaryDbFile
193 ) 194 )
194 195
195 if not forceUpdate: 196 if not forceUpdate:
196 cachedData = self.__getDataFromCache(dbName) 197 cachedData = self.__getDataFromCache(dbName)
197 if cachedData: 198 if cachedData:
198 return cachedData 199 return cachedData
199 200
200 url = Preferences.getPip("VulnerabilityDbMirror") + dbName 201 url = Preferences.getPip("VulnerabilityDbMirror") + dbName
201 request = QNetworkRequest(QUrl(url)) 202 request = QNetworkRequest(QUrl(url))
202 reply = self.__pip.getNetworkAccessManager().get(request) 203 reply = self.__pip.getNetworkAccessManager().get(request)
203 while not reply.isFinished(): 204 while not reply.isFinished():
204 QCoreApplication.processEvents() 205 QCoreApplication.processEvents()
205 QThread.msleep(100) 206 QThread.msleep(100)
206 207
207 reply.deleteLater() 208 reply.deleteLater()
208 if reply.error() == QNetworkReply.NetworkError.NoError: 209 if reply.error() == QNetworkReply.NetworkError.NoError:
209 data = str(reply.readAll(), 210 data = str(reply.readAll(), Preferences.getSystem("IOEncoding"), "replace")
210 Preferences.getSystem("IOEncoding"),
211 'replace')
212 with contextlib.suppress(json.JSONDecodeError): 211 with contextlib.suppress(json.JSONDecodeError):
213 data = json.loads(data) 212 data = json.loads(data)
214 self.__writeDataToCache(dbName, data) 213 self.__writeDataToCache(dbName, data)
215 return data 214 return data
216 215
217 EricMessageBox.critical( 216 EricMessageBox.critical(
218 None, 217 None,
219 self.tr("Fetching Vulnerability Database"), 218 self.tr("Fetching Vulnerability Database"),
220 self.tr( 219 self.tr(
221 """<p>The vulnerability database <b>{0}</b> could not""" 220 """<p>The vulnerability database <b>{0}</b> could not"""
222 """ be loaded from <b>{1}</b>.</p><p>The vulnerability""" 221 """ be loaded from <b>{1}</b>.</p><p>The vulnerability"""
223 """ check is not available.</p>""" 222 """ check is not available.</p>"""
224 ).format(dbName, Preferences.getPip("VulnerabilityDbMirror")) 223 ).format(dbName, Preferences.getPip("VulnerabilityDbMirror")),
225 ) 224 )
226 return {} 225 return {}
227 226
228 def __getVulnerabilities(self, package, specifier, db): 227 def __getVulnerabilities(self, package, specifier, db):
229 """ 228 """
230 Private method to get the vulnerabilities for a package. 229 Private method to get the vulnerabilities for a package.
231 230
232 @param package name of the package 231 @param package name of the package
233 @type str 232 @type str
234 @param specifier package specifier 233 @param specifier package specifier
235 @type Specifier 234 @type Specifier
236 @param db vulnerability data 235 @param db vulnerability data
240 """ 239 """
241 for entry in db[package]: 240 for entry in db[package]:
242 for entrySpec in entry["specs"]: 241 for entrySpec in entry["specs"]:
243 if entrySpec == specifier: 242 if entrySpec == specifier:
244 yield entry 243 yield entry
245 244
246 def check(self, packages): 245 def check(self, packages):
247 """ 246 """
248 Public method to check the given packages for vulnerabilities. 247 Public method to check the given packages for vulnerabilities.
249 248
250 @param packages list of packages 249 @param packages list of packages
251 @type list of Package 250 @type list of Package
252 @return tuple containing an error status and a dictionary containing 251 @return tuple containing an error status and a dictionary containing
253 detected vulnerable packages keyed by package name 252 detected vulnerable packages keyed by package name
254 @rtype tuple of (VulnerabilityCheckError, dict of {str: list of Vulnerability}) 253 @rtype tuple of (VulnerabilityCheckError, dict of {str: list of Vulnerability})
255 """ 254 """
256 db = self.__fetchVulnerabilityDatabase() 255 db = self.__fetchVulnerabilityDatabase()
257 if not db: 256 if not db:
258 return VulnerabilityCheckError.SummaryDbUnavailable, [] 257 return VulnerabilityCheckError.SummaryDbUnavailable, []
259 258
260 fullDb = None 259 fullDb = None
261 vulnerablePackages = frozenset(db.keys()) 260 vulnerablePackages = frozenset(db.keys())
262 vulnerabilities = collections.defaultdict(list) 261 vulnerabilities = collections.defaultdict(list)
263 262
264 for package in packages: 263 for package in packages:
265 # normalize the package name, the safety-db is converting 264 # normalize the package name, the safety-db is converting
266 # underscores to dashes and uses lowercase 265 # underscores to dashes and uses lowercase
267 name = package.name.replace("_", "-").lower() 266 name = package.name.replace("_", "-").lower()
268 267
269 if name in vulnerablePackages: 268 if name in vulnerablePackages:
270 # we have a candidate here, build the spec set 269 # we have a candidate here, build the spec set
271 for specifier in db[name]: 270 for specifier in db[name]:
272 specifierSet = SpecifierSet(specifiers=specifier) 271 specifierSet = SpecifierSet(specifiers=specifier)
273 if specifierSet.contains(package.version): 272 if specifierSet.contains(package.version):
274 if not fullDb: 273 if not fullDb:
275 fullDb = self.__fetchVulnerabilityDatabase( 274 fullDb = self.__fetchVulnerabilityDatabase(full=True)
276 full=True)
277 for data in self.__getVulnerabilities( 275 for data in self.__getVulnerabilities(
278 package=name, specifier=specifier, db=fullDb 276 package=name, specifier=specifier, db=fullDb
279 ): 277 ):
280 vulnarabilityId = ( 278 vulnarabilityId = data.get("id").replace("pyup.io-", "")
281 data.get("id").replace("pyup.io-", "")
282 )
283 cveId = data.get("cve", "") 279 cveId = data.get("cve", "")
284 if cveId: 280 if cveId:
285 cveId = cveId.split(",", 1)[0].strip() 281 cveId = cveId.split(",", 1)[0].strip()
286 vulnerabilities[package.name].append(Vulnerability( 282 vulnerabilities[package.name].append(
287 name=name, 283 Vulnerability(
288 spec=specifier, 284 name=name,
289 version=package.version, 285 spec=specifier,
290 cve=cveId, 286 version=package.version,
291 advisory=data.get("advisory", ""), 287 cve=cveId,
292 vulnerabilityId=vulnarabilityId 288 advisory=data.get("advisory", ""),
293 )) 289 vulnerabilityId=vulnarabilityId,
294 290 )
291 )
292
295 return VulnerabilityCheckError.OK, vulnerabilities 293 return VulnerabilityCheckError.OK, vulnerabilities
296 294
297 def updateVulnerabilityDb(self): 295 def updateVulnerabilityDb(self):
298 """ 296 """
299 Public method to update the cache of the vulnerability databases. 297 Public method to update the cache of the vulnerability databases.
300 """ 298 """
301 self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True) 299 self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True)

eric ide

mercurial