src/eric7/PipInterface/PipVulnerabilityChecker.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 9052
c06475635841
child 9221
bf71ee032bb4
equal deleted inserted replaced
9208:3fc8dfeb6ebe 9209:b99e7fd55fd3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing a Python package vulnerability checker.
8
9 The vulnerability data is provided by the open Python vulnerability database
10 <a href="https://github.com/pyupio/safety-db">Safety DB</a>.
11 """
12
13 import collections
14 import contextlib
15 import enum
16 import json
17 import os
18 import time
19 from dataclasses import dataclass
20
21 from packaging.specifiers import SpecifierSet
22
23 from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl
24 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
25
26 from EricWidgets import EricMessageBox
27
28 import Globals
29 import Preferences
30
31
32 @dataclass
33 class Package:
34 """
35 Class containing the package data.
36 """
37 name: str # package name
38 version: str # version
39
40
41 @dataclass
42 class Vulnerability:
43 """
44 Class containing the vulnerability data.
45 """
46 name: str # package name
47 spec: dict # package specification record
48 version: str # package version
49 cve: str # CVE ID
50 advisory: str # CVE advisory text
51 vulnerabilityId: str # vulnerability ID
52
53
54 class VulnerabilityCheckError(enum.Enum):
55 """
56 Class defining various vulnerability check error states.
57 """
58 OK = 0
59 SummaryDbUnavailable = 1
60 FullDbUnavailable = 2
61
62
63 class PipVulnerabilityChecker(QObject):
64 """
65 Class implementing a Python package vulnerability checker.
66 """
67 FullDbFile = "insecure_full.json"
68 SummaryDbFile = "insecure.json"
69
70 def __init__(self, pip, parent=None):
71 """
72 Constructor
73
74 @param pip reference to the global pip interface
75 @type Pip
76 @param parent reference to the parent widget (defaults to None)
77 @type QWidget (optional)
78 """
79 super().__init__(parent)
80
81 self.__pip = pip
82
83 securityDir = os.path.join(Globals.getConfigDir(), "security")
84 os.makedirs(securityDir, mode=0o700, exist_ok=True)
85 self.__cacheFile = os.path.join(securityDir,
86 "vulnerability_cache.json")
87 if not os.path.exists(self.__cacheFile):
88 self.__createCacheFile()
89
90 def __createCacheFile(self):
91 """
92 Private method to create the cache file.
93
94 The cache file has the following structure.
95 {
96 "insecure.json": {
97 "cachedAt": 12345678
98 "db": {}
99 },
100 "insecure_full.json": {
101 "cachedAt": 12345678
102 "db": {}
103 },
104 }
105 """
106 structure = {
107 "insecure.json": {
108 "cachedAt": 0,
109 "db": {},
110 },
111 "insecure_full.json": {
112 "cachedAt": 0,
113 "db": {},
114 },
115 }
116 with open(self.__cacheFile, "w") as f:
117 json.dump(structure, f, indent=2)
118
119 def __getDataFromCache(self, dbName):
120 """
121 Private method to get the vulnerability database from the cache.
122
123 @param dbName name of the vulnerability database
124 @type str
125 @return dictionary containing the requested vulnerability data
126 @rtype dict
127 """
128 if os.path.exists(self.__cacheFile):
129 with open(self.__cacheFile, "r") as f: # __IGNORE_WARNING_Y117__
130 with contextlib.suppress(json.JSONDecodeError, OSError):
131 cachedData = json.load(f)
132 if (
133 dbName in cachedData and
134 "cachedAt" in cachedData[dbName]
135 ):
136 cacheValidPeriod = Preferences.getPip(
137 "VulnerabilityDbCacheValidity")
138 if (
139 cachedData[dbName]["cachedAt"] + cacheValidPeriod >
140 time.time()
141 ):
142 return cachedData[dbName]["db"]
143
144 return {}
145
146 def __writeDataToCache(self, dbName, data):
147 """
148 Private method to write the vulnerability data for a database to the
149 cache.
150
151 @param dbName name of the vulnerability database
152 @type str
153 @param data dictionary containing the vulnerability data
154 @type dict
155 """
156 if not os.path.exists(self.__cacheFile):
157 self.__createCacheFile()
158
159 with open(self.__cacheFile, "r") as f:
160 try:
161 cache = json.load(f)
162 except json.JSONDecodeError:
163 cache = {}
164
165 cache[dbName] = {
166 "cachedAt": time.time(),
167 "db": data,
168 }
169 with open(self.__cacheFile, "w") as f:
170 json.dump(cache, f, indent=2)
171
172 def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
173 """
174 Private method to get the data of the vulnerability database.
175
176 If the cached data is still valid, this data will be used.
177 Otherwise a copy of the requested database will be downloaded
178 and cached.
179
180 @param full flag indicating to get the database containing the full
181 data set (defaults to False)
182 @type bool (optional)
183 @param forceUpdate flag indicating an update of the cache is required
184 (defaults to False)
185 @type bool (optional)
186 @return dictionary containing the vulnerability data (full data set or
187 just package name and version specifier)
188 """
189 dbName = (
190 PipVulnerabilityChecker.FullDbFile
191 if full else
192 PipVulnerabilityChecker.SummaryDbFile
193 )
194
195 if not forceUpdate:
196 cachedData = self.__getDataFromCache(dbName)
197 if cachedData:
198 return cachedData
199
200 url = Preferences.getPip("VulnerabilityDbMirror") + dbName
201 request = QNetworkRequest(QUrl(url))
202 reply = self.__pip.getNetworkAccessManager().get(request)
203 while not reply.isFinished():
204 QCoreApplication.processEvents()
205 QThread.msleep(100)
206
207 reply.deleteLater()
208 if reply.error() == QNetworkReply.NetworkError.NoError:
209 data = str(reply.readAll(),
210 Preferences.getSystem("IOEncoding"),
211 'replace')
212 with contextlib.suppress(json.JSONDecodeError):
213 data = json.loads(data)
214 self.__writeDataToCache(dbName, data)
215 return data
216
217 EricMessageBox.critical(
218 None,
219 self.tr("Fetching Vulnerability Database"),
220 self.tr(
221 """<p>The vulnerability database <b>{0}</b> could not"""
222 """ be loaded from <b>{1}</b>.</p><p>The vulnerability"""
223 """ check is not available.</p>"""
224 ).format(dbName, Preferences.getPip("VulnerabilityDbMirror"))
225 )
226 return {}
227
228 def __getVulnerabilities(self, package, specifier, db):
229 """
230 Private method to get the vulnerabilities for a package.
231
232 @param package name of the package
233 @type str
234 @param specifier package specifier
235 @type Specifier
236 @param db vulnerability data
237 @type dict
238 @yield dictionary containing the vulnerability data for the package
239 @ytype dict
240 """
241 for entry in db[package]:
242 for entrySpec in entry["specs"]:
243 if entrySpec == specifier:
244 yield entry
245
246 def check(self, packages):
247 """
248 Public method to check the given packages for vulnerabilities.
249
250 @param packages list of packages
251 @type Package
252 @return tuple containing an error status and a dictionary containing
253 detected vulnerable packages keyed by package name
254 @rtype tuple of (VulnerabilityCheckError, list of Vulnerability)
255 """
256 db = self.__fetchVulnerabilityDatabase()
257 if not db:
258 return VulnerabilityCheckError.SummaryDbUnavailable, []
259
260 fullDb = None
261 vulnerablePackages = frozenset(db.keys())
262 vulnerabilities = collections.defaultdict(list)
263
264 for package in packages:
265 # normalize the package name, the safety-db is converting
266 # underscores to dashes and uses lowercase
267 name = package.name.replace("_", "-").lower()
268
269 if name in vulnerablePackages:
270 # we have a candidate here, build the spec set
271 for specifier in db[name]:
272 specifierSet = SpecifierSet(specifiers=specifier)
273 if specifierSet.contains(package.version):
274 if not fullDb:
275 fullDb = self.__fetchVulnerabilityDatabase(
276 full=True)
277 for data in self.__getVulnerabilities(
278 package=name, specifier=specifier, db=fullDb
279 ):
280 vulnarabilityId = (
281 data.get("id").replace("pyup.io-", "")
282 )
283 cveId = data.get("cve", "")
284 if cveId:
285 cveId = cveId.split(",", 1)[0].strip()
286 vulnerabilities[package.name].append(Vulnerability(
287 name=name,
288 spec=specifier,
289 version=package.version,
290 cve=cveId,
291 advisory=data.get("advisory", ""),
292 vulnerabilityId=vulnarabilityId
293 ))
294
295 return VulnerabilityCheckError.OK, vulnerabilities
296
297 def updateVulnerabilityDb(self):
298 """
299 Public method to update the cache of the vulnerability databases.
300 """
301 self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True)
302 self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True)

eric ide

mercurial