src/eric7/PipInterface/PipVulnerabilityChecker.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9413
80c06d472826
--- a/src/eric7/PipInterface/PipVulnerabilityChecker.py	Wed Jul 13 11:16:20 2022 +0200
+++ b/src/eric7/PipInterface/PipVulnerabilityChecker.py	Wed Jul 13 14:55:47 2022 +0200
@@ -34,8 +34,9 @@
     """
     Class containing the package data.
     """
-    name: str               # package name
-    version: str            # version
+
+    name: str  # package name
+    version: str  # version
 
 
 @dataclass
@@ -43,18 +44,20 @@
     """
     Class containing the vulnerability data.
     """
-    name: str               # package name
-    spec: dict              # package specification record
-    version: str            # package version
-    cve: str                # CVE ID
-    advisory: str           # CVE advisory text
-    vulnerabilityId: str    # vulnerability ID
+
+    name: str  # package name
+    spec: dict  # package specification record
+    version: str  # package version
+    cve: str  # CVE ID
+    advisory: str  # CVE advisory text
+    vulnerabilityId: str  # vulnerability ID
 
 
 class VulnerabilityCheckError(enum.Enum):
     """
     Class defining various vulnerability check error states.
     """
+
     OK = 0
     SummaryDbUnavailable = 1
     FullDbUnavailable = 2
@@ -64,33 +67,33 @@
     """
     Class implementing a Python package vulnerability checker.
     """
+
     FullDbFile = "insecure_full.json"
     SummaryDbFile = "insecure.json"
-    
+
     def __init__(self, pip, parent=None):
         """
         Constructor
-        
+
         @param pip reference to the global pip interface
         @type Pip
         @param parent reference to the parent widget (defaults to None)
         @type QWidget (optional)
         """
         super().__init__(parent)
-        
+
         self.__pip = pip
-        
+
         securityDir = os.path.join(Globals.getConfigDir(), "security")
         os.makedirs(securityDir, mode=0o700, exist_ok=True)
-        self.__cacheFile = os.path.join(securityDir,
-                                        "vulnerability_cache.json")
+        self.__cacheFile = os.path.join(securityDir, "vulnerability_cache.json")
         if not os.path.exists(self.__cacheFile):
             self.__createCacheFile()
-    
+
     def __createCacheFile(self):
         """
         Private method to create the cache file.
-        
+
         The cache file has the following structure.
         {
           "insecure.json": {
@@ -115,11 +118,11 @@
         }
         with open(self.__cacheFile, "w") as f:
             json.dump(structure, f, indent=2)
-    
+
     def __getDataFromCache(self, dbName):
         """
         Private method to get the vulnerability database from the cache.
-        
+
         @param dbName name of the vulnerability database
         @type str
         @return dictionary containing the requested vulnerability data
@@ -129,25 +132,23 @@
             with open(self.__cacheFile, "r") as f:  # __IGNORE_WARNING_Y117__
                 with contextlib.suppress(json.JSONDecodeError, OSError):
                     cachedData = json.load(f)
-                    if (
-                        dbName in cachedData and
-                        "cachedAt" in cachedData[dbName]
-                    ):
+                    if dbName in cachedData and "cachedAt" in cachedData[dbName]:
                         cacheValidPeriod = Preferences.getPip(
-                            "VulnerabilityDbCacheValidity")
+                            "VulnerabilityDbCacheValidity"
+                        )
                         if (
-                            cachedData[dbName]["cachedAt"] + cacheValidPeriod >
-                            time.time()
+                            cachedData[dbName]["cachedAt"] + cacheValidPeriod
+                            > time.time()
                         ):
                             return cachedData[dbName]["db"]
-        
+
         return {}
-    
+
     def __writeDataToCache(self, dbName, data):
         """
         Private method to write the vulnerability data for a database to the
         cache.
-        
+
         @param dbName name of the vulnerability database
         @type str
         @param data dictionary containing the vulnerability data
@@ -155,28 +156,28 @@
         """
         if not os.path.exists(self.__cacheFile):
             self.__createCacheFile()
-        
+
         with open(self.__cacheFile, "r") as f:
             try:
                 cache = json.load(f)
             except json.JSONDecodeError:
                 cache = {}
-        
+
         cache[dbName] = {
             "cachedAt": time.time(),
             "db": data,
         }
         with open(self.__cacheFile, "w") as f:
             json.dump(cache, f, indent=2)
-    
+
     def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
         """
         Private method to get the data of the vulnerability database.
-        
+
         If the cached data is still valid, this data will be used.
         Otherwise a copy of the requested database will be downloaded
         and cached.
-        
+
         @param full flag indicating to get the database containing the full
             data set (defaults to False)
         @type bool (optional)
@@ -188,32 +189,30 @@
         """
         dbName = (
             PipVulnerabilityChecker.FullDbFile
-            if full else
-            PipVulnerabilityChecker.SummaryDbFile
+            if full
+            else PipVulnerabilityChecker.SummaryDbFile
         )
-        
+
         if not forceUpdate:
             cachedData = self.__getDataFromCache(dbName)
             if cachedData:
                 return cachedData
-        
+
         url = Preferences.getPip("VulnerabilityDbMirror") + dbName
         request = QNetworkRequest(QUrl(url))
         reply = self.__pip.getNetworkAccessManager().get(request)
         while not reply.isFinished():
             QCoreApplication.processEvents()
             QThread.msleep(100)
-        
+
         reply.deleteLater()
         if reply.error() == QNetworkReply.NetworkError.NoError:
-            data = str(reply.readAll(),
-                       Preferences.getSystem("IOEncoding"),
-                       'replace')
+            data = str(reply.readAll(), Preferences.getSystem("IOEncoding"), "replace")
             with contextlib.suppress(json.JSONDecodeError):
                 data = json.loads(data)
                 self.__writeDataToCache(dbName, data)
                 return data
-        
+
         EricMessageBox.critical(
             None,
             self.tr("Fetching Vulnerability Database"),
@@ -221,14 +220,14 @@
                 """<p>The vulnerability database <b>{0}</b> could not"""
                 """ be loaded from <b>{1}</b>.</p><p>The vulnerability"""
                 """ check is not available.</p>"""
-            ).format(dbName, Preferences.getPip("VulnerabilityDbMirror"))
+            ).format(dbName, Preferences.getPip("VulnerabilityDbMirror")),
         )
         return {}
-    
+
     def __getVulnerabilities(self, package, specifier, db):
         """
         Private method to get the vulnerabilities for a package.
-        
+
         @param package name of the package
         @type str
         @param specifier package specifier
@@ -242,11 +241,11 @@
             for entrySpec in entry["specs"]:
                 if entrySpec == specifier:
                     yield entry
-    
+
     def check(self, packages):
         """
         Public method to check the given packages for vulnerabilities.
-        
+
         @param packages list of packages
         @type Package
         @return tuple containing an error status and a dictionary containing
@@ -256,44 +255,43 @@
         db = self.__fetchVulnerabilityDatabase()
         if not db:
             return VulnerabilityCheckError.SummaryDbUnavailable, []
-        
+
         fullDb = None
         vulnerablePackages = frozenset(db.keys())
         vulnerabilities = collections.defaultdict(list)
-        
+
         for package in packages:
             # normalize the package name, the safety-db is converting
             # underscores to dashes and uses lowercase
             name = package.name.replace("_", "-").lower()
-        
+
             if name in vulnerablePackages:
                 # we have a candidate here, build the spec set
                 for specifier in db[name]:
                     specifierSet = SpecifierSet(specifiers=specifier)
                     if specifierSet.contains(package.version):
                         if not fullDb:
-                            fullDb = self.__fetchVulnerabilityDatabase(
-                                full=True)
+                            fullDb = self.__fetchVulnerabilityDatabase(full=True)
                         for data in self.__getVulnerabilities(
                             package=name, specifier=specifier, db=fullDb
                         ):
-                            vulnarabilityId = (
-                                data.get("id").replace("pyup.io-", "")
-                            )
+                            vulnarabilityId = data.get("id").replace("pyup.io-", "")
                             cveId = data.get("cve", "")
                             if cveId:
                                 cveId = cveId.split(",", 1)[0].strip()
-                            vulnerabilities[package.name].append(Vulnerability(
-                                name=name,
-                                spec=specifier,
-                                version=package.version,
-                                cve=cveId,
-                                advisory=data.get("advisory", ""),
-                                vulnerabilityId=vulnarabilityId
-                            ))
-        
+                            vulnerabilities[package.name].append(
+                                Vulnerability(
+                                    name=name,
+                                    spec=specifier,
+                                    version=package.version,
+                                    cve=cveId,
+                                    advisory=data.get("advisory", ""),
+                                    vulnerabilityId=vulnarabilityId,
+                                )
+                            )
+
         return VulnerabilityCheckError.OK, vulnerabilities
-    
+
     def updateVulnerabilityDb(self):
         """
         Public method to update the cache of the vulnerability databases.

eric ide

mercurial