eric7/PipInterface/PipVulnerabilityChecker.py

branch
eric7
changeset 8978
38c3ddf21537
parent 8977
663521af48b2
child 9001
a00cd6b55728
--- a/eric7/PipInterface/PipVulnerabilityChecker.py	Sun Mar 13 19:59:03 2022 +0100
+++ b/eric7/PipInterface/PipVulnerabilityChecker.py	Mon Mar 14 19:49:48 2022 +0100
@@ -10,12 +10,12 @@
 <a href="https://github.com/pyupio/safety-db">Safety DB</a>.
 """
 
+import collections
 import contextlib
 import enum
 import json
 import os
 import time
-from collections import namedtuple
 from dataclasses import dataclass
 
 from packaging.specifiers import SpecifierSet
@@ -28,7 +28,13 @@
 import Globals
 import Preferences
 
-Package = namedtuple("Package", ["name", "version"])
+@dataclass
+class Package:
+    """
+    Class containing the package data.
+    """
+    name: str               # package name
+    version: str            # version
 
 
 @dataclass
@@ -57,6 +63,9 @@
     """
     Class implementing a Python package vulnerability checker.
     """
+    FullDbFile = "insecure_full.json"
+    SummaryDbFile = "insecure.json"
+    
     def __init__(self, pip, parent=None):
         """
         Constructor
@@ -159,7 +168,7 @@
         with open(self.__cacheFile, "w") as f:
             json.dump(cache, f, indent=2)
     
-    def __fetchVulnerabilityDatabase(self, full=False):
+    def __fetchVulnerabilityDatabase(self, full=False, forceUpdate=False):
         """
         Private method to get the data of the vulnerability database.
         
@@ -170,14 +179,22 @@
         @param full flag indicating to get the database containing the full
             data set (defaults to False)
         @type bool (optional)
+        @param forceUpdate flag indicating an update of the cache is required
+            (defaults to False)
+        @type bool (optional)
         @return dictionary containing the vulnerability data (full data set or
             just package name and version specifier)
         """
-        dbName = "insecure_full.json" if full else "insecure.json"
+        dbName = (
+            PipVulnerabilityChecker.FullDbFile
+            if full else
+            PipVulnerabilityChecker.SummaryDbFile
+        )
         
-        cachedData = self.__getDataFromCache(dbName)
-        if cachedData:
-            return cachedData
+        if not forceUpdate:
+            cachedData = self.__getDataFromCache(dbName)
+            if cachedData:
+                return cachedData
         
         url = Preferences.getPip("VulnerabilityDbMirror") + dbName
         request = QNetworkRequest(QUrl(url))
@@ -229,8 +246,8 @@
         
         @param packages list of packages
         @type Package
-        @return tuple containing an error status and the list of vulnerable
-            packages detected
+        @return tuple containing an error status and a dictionary containing
+            detected vulnerable packages keyed by package name
         @rtype tuple of (VulnerabilityCheckError, list of Vulnerability)
         """
         db = self.__fetchVulnerabilityDatabase()
@@ -239,7 +256,7 @@
         
         fullDb = None
         vulnerablePackages = frozenset(db.keys())
-        vulnerabilities = []            # TODO: fill this list
+        vulnerabilities = collections.defaultdict(list)
         
         for package in packages:
             # normalize the package name, the safety-db is converting
@@ -260,8 +277,23 @@
                             vulnarabilityId = (
                                 data.get("id").replace("pyup.io-", "")
                             )
-                            cveId = data.get("cve")
+                            cveId = data.get("cve", "")
                             if cveId:
                                 cveId = cveId.split(",", 1)[0].strip()
+                            vulnerabilities[package.name].append(Vulnerability(
+                                name=name,
+                                spec=specifier,
+                                version=package.version,
+                                cve=cveId,
+                                advisory=data.get("advisory", ""),
+                                vulnerabilityId=vulnarabilityId
+                            ))
         
         return VulnerabilityCheckError.OK, vulnerabilities
+    
+    def updateVulnerabilityDb(self):
+        """
+        Public method to update the cache of the vulnerability databases.
+        """
+        self.__fetchVulnerabilityDatabase(full=False, forceUpdate=True)
+        self.__fetchVulnerabilityDatabase(full=True, forceUpdate=True)

eric ide

mercurial