Started implementing a vulnerability checker based on the data of the Safety DB. eric7

Sun, 13 Mar 2022 19:59:03 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 13 Mar 2022 19:59:03 +0100
branch
eric7
changeset 8977
663521af48b2
parent 8976
ca442cd49b9e
child 8978
38c3ddf21537

Started implementing a vulnerability checker based on the data of the Safety DB.

eric7/PipInterface/Pip.py file | annotate | diff | comparison | revisions
eric7/PipInterface/PipPackagesWidget.py file | annotate | diff | comparison | revisions
eric7/PipInterface/PipPackagesWidget.ui file | annotate | diff | comparison | revisions
eric7/PipInterface/PipVulnerabilityChecker.py file | annotate | diff | comparison | revisions
eric7/Preferences/__init__.py file | annotate | diff | comparison | revisions
scripts/install.py file | annotate | diff | comparison | revisions
setup.py file | annotate | diff | comparison | revisions
--- a/eric7/PipInterface/Pip.py	Sun Mar 13 15:20:26 2022 +0100
+++ b/eric7/PipInterface/Pip.py	Sun Mar 13 19:59:03 2022 +0100
@@ -31,6 +31,7 @@
     SSL_AVAILABLE = False
 
 from .PipDialog import PipDialog
+from .PipVulnerabilityChecker import PipVulnerabilityChecker
 
 import Preferences
 import Globals
@@ -65,6 +66,8 @@
             self.__networkManager.sslErrors.connect(
                 self.__sslErrorHandler.sslErrorsReply)
         self.__replies = []
+        
+        self.__vulnerabilityChecker = PipVulnerabilityChecker(self, self)
     
     def getNetworkAccessManager(self):
         """
@@ -75,6 +78,15 @@
         """
         return self.__networkManager
     
+    def getVulnerabilityChecker(self):
+        """
+        Public method to get a reference to the vulnerability checker object.
+        
+        @return reference to the vulnerability checker object
+        @type PipVulnerabilityChecker
+        """
+        return self.__vulnerabilityChecker
+    
     ##########################################################################
     ## Methods below implement some utility functions
     ##########################################################################
--- a/eric7/PipInterface/PipPackagesWidget.py	Sun Mar 13 15:20:26 2022 +0100
+++ b/eric7/PipInterface/PipPackagesWidget.py	Sun Mar 13 19:59:03 2022 +0100
@@ -13,6 +13,7 @@
 import contextlib
 
 from PyQt6.QtCore import pyqtSlot, Qt, QUrl, QUrlQuery
+from PyQt6.QtGui import QIcon
 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
 from PyQt6.QtWidgets import (
     QWidget, QToolButton, QApplication, QHeaderView, QTreeWidgetItem,
@@ -148,6 +149,12 @@
     ShowProcessFilesListMode = 3
     
     SearchVersionRole = Qt.ItemDataRole.UserRole + 1
+    VulnerabilityRole = Qt.ItemDataRole.UserRole + 2
+    
+    PackageColumn = 0
+    InstalledVersionColumn = 1
+    AvailableVersionColumn = 2
+    VulnerabilityColumn = 3
     
     def __init__(self, pip, parent=None):
         """
@@ -189,7 +196,7 @@
         self.__pip = pip
         
         self.packagesList.header().setSortIndicator(
-            0, Qt.SortOrder.AscendingOrder)
+            PipPackagesWidget.PackageColumn, Qt.SortOrder.AscendingOrder)
         
         self.__infoLabels = {
             "name": self.tr("Name:"),
@@ -324,7 +331,7 @@
         """
         return [
             itm for itm in self.packagesList.selectedItems()
-            if bool(itm.text(2))
+            if bool(itm.text(PipPackagesWidget.AvailableVersionColumn))
         ]
     
     def __allUpdateableItems(self):
@@ -337,7 +344,7 @@
         updateableItems = []
         for index in range(self.packagesList.topLevelItemCount()):
             itm = self.packagesList.topLevelItem(index)
-            if itm.text(2):
+            if itm.text(PipPackagesWidget.AvailableVersionColumn):
                 updateableItems.append(itm)
         
         return updateableItems
@@ -384,7 +391,8 @@
                         usersite=self.userCheckBox.isChecked(),
                     )
                     for package, version in installedPackages:
-                        QTreeWidgetItem(self.packagesList, [package, version])
+                        QTreeWidgetItem(self.packagesList,
+                                        [package, version, "", ""])
                     self.packagesList.setUpdatesEnabled(True)
                     self.statusLabel.setText(
                         self.tr("Getting outdated packages..."))
@@ -406,12 +414,20 @@
                         )
                         if items:
                             itm = items[0]
-                            itm.setText(2, latest)
+                            itm.setText(
+                                PipPackagesWidget.AvailableVersionColumn,
+                                latest)
                     
-                    self.packagesList.sortItems(0, Qt.SortOrder.AscendingOrder)
+                    self.packagesList.sortItems(
+                        PipPackagesWidget.PackageColumn,
+                        Qt.SortOrder.AscendingOrder)
                     for col in range(self.packagesList.columnCount()):
                         self.packagesList.resizeColumnToContents(col)
                     self.packagesList.setUpdatesEnabled(True)
+                    
+                    # 3. update with vulnerability information
+                    if self.vulnerabilityCheckBox.isChecked():
+                        self.__updateVulnerabilityData()
                 self.statusLabel.hide()
         
         self.__updateActionButtons()
@@ -428,33 +444,24 @@
         """
         self.__refreshPackagesList()
     
-    @pyqtSlot(bool)
-    def on_localCheckBox_clicked(self, checked):
+    @pyqtSlot()
+    def on_localCheckBox_clicked(self):
         """
         Private slot handling the switching of the local mode.
-        
-        @param checked state of the local check box
-        @type bool
         """
         self.__refreshPackagesList()
     
-    @pyqtSlot(bool)
-    def on_notRequiredCheckBox_clicked(self, checked):
+    @pyqtSlot()
+    def on_notRequiredCheckBox_clicked(self):
         """
         Private slot handling the switching of the 'not required' mode.
-        
-        @param checked state of the 'not required' check box
-        @type bool
         """
         self.__refreshPackagesList()
     
-    @pyqtSlot(bool)
-    def on_userCheckBox_clicked(self, checked):
+    @pyqtSlot()
+    def on_userCheckBox_clicked(self):
         """
         Private slot handling the switching of the 'user-site' mode.
-        
-        @param checked state of the 'user-site' check box
-        @type bool
         """
         self.__refreshPackagesList()
     
@@ -616,7 +623,8 @@
         """
         Private slot to remove selected packages of the selected environment.
         """
-        packages = [itm.text(0) for itm in self.packagesList.selectedItems()]
+        packages = [itm.text(PipPackagesWidget.PackageColumn)
+                    for itm in self.packagesList.selectedItems()]
         self.executeUninstallPackages(packages)
     
     def executeUninstallPackages(self, packages):
@@ -653,7 +661,7 @@
         """
         item = self.packagesList.selectedItems()[0]
         if item:
-            packageName = item.text(0)
+            packageName = item.text(PipPackagesWidget.PackageColumn)
             upgradable = bool(item.text(2))
             # show details for available version or installed one
             if item.text(2):
@@ -1137,7 +1145,8 @@
         """
         Private slot to force a re-installation of the selected packages.
         """
-        packages = [itm.text(0) for itm in self.packagesList.selectedItems()]
+        packages = [itm.text(PipPackagesWidget.PackageColumn)
+                    for itm in self.packagesList.selectedItems()]
         venvName = self.environmentsComboBox.currentText()
         if venvName and packages:
             self.__pip.installPackages(packages, venvName=venvName,
@@ -1283,3 +1292,50 @@
         venvName = self.environmentsComboBox.currentText()
         if venvName:
             self.__pip.cachePurge(venvName)
+    
+    ##################################################################
+    ## Interface to the vulnerability checks below
+    ##################################################################
+    
+    @pyqtSlot(bool)
+    def on_vulnerabilityCheckBox_clicked(self, checked):
+        """
+        Private slot handling a change of the automatic vulnerability checks.
+        
+        @param checked flag indicating the state of the check box
+        @type bool
+        """
+        if checked:
+            self.__updateVulnerabilityData(clearFirst=True)
+    
+    @pyqtSlot()
+    def __clearVulnerabilityInfo(self):
+        """
+        Private slot to clear the vulnerability info.
+        """
+        for row in range(self.packagesList.topLevelItemCount()):
+            itm = self.packagesList.topLevelItem(row)
+            itm.setText(PipPackagesWidget.VulnerabilityColumn, "")
+            itm.setToolTip(PipPackagesWidget.VulnerabilityColumn, "")
+            itm.setIcon(PipPackagesWidget.VulnerabilityColumn, QIcon())
+            itm.setData(PipPackagesWidget.VulnerabilityColumn,
+                        PipPackagesWidget.VulnerabilityRole,
+                        None)
+    
+    @pyqtSlot()
+    def __updateVulnerabilityData(self, clearFirst=True):
+        """
+        Private slot to update the shown vulnerability info.
+        
+        @param clearFirst flag indicating to clear the vulnerability info first
+            (defaults to True)
+        @type bool (optional)
+        """
+        if clearFirst:
+            self.__clearVulnerabilityInfo()
+        
+        packages = []       # TODO: fill this list with real data
+        
+        error, vulnerabilities = (
+            self.__pip.getVulnerabilityChecker().check(packages)
+        )
--- a/eric7/PipInterface/PipPackagesWidget.ui	Sun Mar 13 15:20:26 2022 +0100
+++ b/eric7/PipInterface/PipPackagesWidget.ui	Sun Mar 13 19:59:03 2022 +0100
@@ -71,6 +71,19 @@
           </property>
          </widget>
         </item>
+        <item row="1" column="1">
+         <widget class="QCheckBox" name="vulnerabilityCheckBox">
+          <property name="toolTip">
+           <string>Perform vulnerability checks based on &quot;Safety DB&quot;.</string>
+          </property>
+          <property name="text">
+           <string>Vulnerability Check</string>
+          </property>
+          <property name="checked">
+           <bool>true</bool>
+          </property>
+         </widget>
+        </item>
        </layout>
       </item>
       <item>
@@ -121,6 +134,11 @@
            <string>Available Version</string>
           </property>
          </column>
+         <column>
+          <property name="text">
+           <string>Vulnerability</string>
+          </property>
+         </column>
         </widget>
         <widget class="QWidget" name="widget" native="true">
          <property name="sizePolicy">
@@ -470,6 +488,7 @@
   <tabstop>localCheckBox</tabstop>
   <tabstop>notRequiredCheckBox</tabstop>
   <tabstop>userCheckBox</tabstop>
+  <tabstop>vulnerabilityCheckBox</tabstop>
   <tabstop>packagesList</tabstop>
   <tabstop>verboseCheckBox</tabstop>
   <tabstop>installedFilesCheckBox</tabstop>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric7/PipInterface/PipVulnerabilityChecker.py	Sun Mar 13 19:59:03 2022 +0100
@@ -0,0 +1,267 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a Python package vulnerability checker.
+
+The vulnerability data is provided by the open Python vulnerability database
+<a href="https://github.com/pyupio/safety-db">Safety DB</a>.
+"""
+
+import contextlib
+import enum
+import json
+import os
+import time
+from collections import namedtuple
+from dataclasses import dataclass
+
+from packaging.specifiers import SpecifierSet
+
+from PyQt6.QtCore import QCoreApplication, QObject, QThread, QUrl
+from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
+
+from EricWidgets import EricMessageBox
+
+import Globals
+import Preferences
+
+Package = namedtuple("Package", ["name", "version"])
+
+
+@dataclass
+class Vulnerability:
+    """
+    Class containing the vulnerability data.
+    """
+    name: str               # package name
+    spec: dict              # package specification record
+    version: str            # package version
+    cve: str                # CVE ID
+    advisory: str           # CVE advisory text
+    vulnerabilityId: str    # vulnerability ID
+
+
+class VulnerabilityCheckError(enum.Enum):
+    """
+    Class defining various vulnerability check error states.
+    """
+    OK = 0
+    SummaryDbUnavailable = 1
+    FullDbUnavailable = 2
+
+
+class PipVulnerabilityChecker(QObject):
+    """
+    Class implementing a Python package vulnerability checker.
+    """
+    def __init__(self, pip, parent=None):
+        """
+        Constructor
+        
+        @param pip reference to the global pip interface
+        @type Pip
+        @param parent reference to the parent widget (defaults to None)
+        @type QWidget (optional)
+        """
+        super().__init__(parent)
+        
+        self.__pip = pip
+        
+        securityDir = os.path.join(Globals.getConfigDir(), "security")
+        os.makedirs(securityDir, mode=0o700, exist_ok=True)
+        self.__cacheFile = os.path.join(securityDir,
+                                        "vulnerability_cache.json")
+        if not os.path.exists(self.__cacheFile):
+            self.__createCacheFile()
+    
+    def __createCacheFile(self):
+        """
+        Private method to create the cache file.
+        
+        The cache file has the following structure.
+        {
+          "insecure.json": {
+              "cachedAt": 12345678
+              "db": {}
+          },
+          "insecure_full.json": {
+              "cachedAt": 12345678
+              "db": {}
+          },
+        }
+        """
+        structure = {
+            "insecure.json": {
+                "cachedAt": 0,
+                "db": {},
+            },
+            "insecure_full.json": {
+                "cachedAt": 0,
+                "db": {},
+            },
+        }
+        with open(self.__cacheFile, "w") as f:
+            json.dump(structure, f, indent=2)
+    
+    def __getDataFromCache(self, dbName):
+        """
+        Private method to get the vulnerability database from the cache.
+        
+        @param dbName name of the vulnerability database
+        @type str
+        @return dictionary containing the requested vulnerability data
+        @rtype dict
+        """
+        if os.path.exists(self.__cacheFile):
+            with open(self.__cacheFile, "r") as f:
+                with contextlib.suppress(json.JSONDecodeError, OSError):
+                    cachedData = json.load(f)
+                    if (
+                        dbName in cachedData and
+                        "cachedAt" in cachedData[dbName]
+                    ):
+                        cacheValidPeriod = Preferences.getPip(
+                            "VulnerabilityDbCacheValidity")
+                        if (
+                            cachedData[dbName]["cachedAt"] + cacheValidPeriod >
+                            time.time()
+                        ):
+                            return cachedData[dbName]["db"]
+        
+        return {}
+    
+    def __writeDataToCache(self, dbName, data):
+        """
+        Private method to write the vulnerability data for a database to the
+        cache.
+        
+        @param dbName name of the vulnerability database
+        @type str
+        @param data dictionary containing the vulnerability data
+        @type dict
+        """
+        if not os.path.exists(self.__cacheFile):
+            self.__createCacheFile()
+        
+        with open(self.__cacheFile, "r") as f:
+            try:
+                cache = json.load(f)
+            except json.JSONDecodeError:
+                cache = {}
+        
+        cache[dbName] = {
+            "cachedAt": time.time(),
+            "db": data,
+        }
+        with open(self.__cacheFile, "w") as f:
+            json.dump(cache, f, indent=2)
+    
+    def __fetchVulnerabilityDatabase(self, full=False):
+        """
+        Private method to get the data of the vulnerability database.
+        
+        If the cached data is still valid, this data will be used.
+        Otherwise a copy of the requested database will be downloaded
+        and cached.
+        
+        @param full flag indicating to get the database containing the full
+            data set (defaults to False)
+        @type bool (optional)
+        @return dictionary containing the vulnerability data (full data set or
+            just package name and version specifier)
+        """
+        dbName = "insecure_full.json" if full else "insecure.json"
+        
+        cachedData = self.__getDataFromCache(dbName)
+        if cachedData:
+            return cachedData
+        
+        url = Preferences.getPip("VulnerabilityDbMirror") + dbName
+        request = QNetworkRequest(QUrl(url))
+        reply = self.__pip.getNetworkAccessManager().get(request)
+        while not reply.isFinished():
+            QCoreApplication.processEvents()
+            QThread.msleep(100)
+        
+        reply.deleteLater()
+        if reply.error() == QNetworkReply.NetworkError.NoError:
+            data = str(reply.readAll(),
+                       Preferences.getSystem("IOEncoding"),
+                       'replace')
+            with contextlib.suppress(json.JSONDecodeError):
+                data = json.loads(data)
+                self.__writeDataToCache(dbName, data)
+                return data
+        
+        EricMessageBox.critical(
+            None,
+            self.tr("Fetching Vulnerability Database"),
+            self.tr("""<p>The vulnerability database <b>{0}</b> could not"""
+                    """ be loaded from <b>{1}</b>.</p><p>The vulnerability"""
+                    """ check is not available.</p>""")
+        )
+        return {}
+    
+    def __getVulnerabilities(self, package, specifier, db):
+        """
+        Private method to get the vulnerabilities for a package.
+        
+        @param package name of the package
+        @type str
+        @param specifier package specifier
+        @type Specifier
+        @param db vulnerability data
+        @type dict
+        @yield dictionary containing the vulnerability data for the package
+        @ytype dict
+        """
+        for entry in db[package]:
+            for entrySpec in entry["specs"]:
+                if entrySpec == specifier:
+                    yield entry
+    
+    def check(self, packages):
+        """
+        Public method to check the given packages for vulnerabilities.
+        
+        @param packages list of packages
+        @type Package
+        @return tuple containing an error status and the list of vulnerable
+            packages detected
+        @rtype tuple of (VulnerabilityCheckError, list of Vulnerability)
+        """
+        db = self.__fetchVulnerabilityDatabase()
+        if not db:
+            return VulnerabilityCheckError.SummaryDbUnavailable, []
+        
+        fullDb = None
+        vulnerablePackages = frozenset(db.keys())
+        vulnerabilities = []            # TODO: fill this list
+        
+        for package in packages:
+            # normalize the package name, the safety-db is converting
+            # underscores to dashes and uses lowercase
+            name = package.name.replace("_", "-").lower()
+        
+            if name in vulnerablePackages:
+                # we have a candidate here, build the spec set
+                for specifier in db[name]:
+                    specifierSet = SpecifierSet(specifiers=specifier)
+                    if specifierSet.contains(package.version):
+                        if not fullDb:
+                            fullDb = self.__fetchVulnerabilityDatabase(
+                                full=True)
+                        for data in self.__getVulnerabilities(
+                            package=name, specifier=specifier, db=fullDb
+                        ):
+                            vulnarabilityId = (
+                                data.get("id").replace("pyup.io-", "")
+                            )
+                            cveId = data.get("cve")
+                            if cveId:
+                                cveId = cveId.split(",", 1)[0].strip()
+        
+        return VulnerabilityCheckError.OK, vulnerabilities
--- a/eric7/Preferences/__init__.py	Sun Mar 13 15:20:26 2022 +0100
+++ b/eric7/Preferences/__init__.py	Sun Mar 13 19:59:03 2022 +0100
@@ -1434,6 +1434,12 @@
         "PipSearchIndex": "",               # used by the search command
         "ExcludeCondaEnvironments": True,
         # don't show conda environments in selector
+        
+        # defaults for the package vulnerability check
+        "VulnerabilityDbMirror":
+            "https://raw.githubusercontent.com/pyupio/safety-db/master/data/",
+        "VulnerabilityDbCacheValidity": 60 * 60 * 6     # 6 hours
+        # TODO: make these entries configurable
     }
     
     # defaults for MicroPython
@@ -3440,10 +3446,14 @@
     @param key the key of the value to get
     @return the requested pip value
     """
-    if key in ("ExcludeCondaEnvironments"):
+    if key in ("ExcludeCondaEnvironments",):
         return toBool(Prefs.settings.value(
             "Pip/" + key,
             Prefs.pipDefaults[key]))
+    elif key in ("VulnerabilityDbCacheValidity",):
+        return int(Prefs.settings.value(
+            "Pip/" + key,
+            Prefs.pipDefaults[key]))
     else:
         return Prefs.settings.value(
             "Pip/" + key,
--- a/scripts/install.py	Sun Mar 13 15:20:26 2022 +0100
+++ b/scripts/install.py	Sun Mar 13 19:59:03 2022 +0100
@@ -1608,6 +1608,7 @@
         "wheel": ("wheel", ""),
         "parso": ("parso", ""),
         "jedi": ("jedi", ""),
+        "packaging": ("packaging", ""),
     }
     if not ignorePyqt6Tools:
         optionalModulesList["qt6-applications"] = ("qt6_applications", "")
--- a/setup.py	Sun Mar 13 15:20:26 2022 +0100
+++ b/setup.py	Sun Mar 13 19:59:03 2022 +0100
@@ -336,7 +336,7 @@
         "Topic :: Text Editors :: Integrated Development Environments (IDE)"
     ],
     keywords="Development PyQt6 IDE Python3",
-    python_requires=">=3.6",
+    python_requires=">=3.7",
     install_requires=[
         "pip>=19.0",
         "wheel",
@@ -355,6 +355,7 @@
         "Pygments",
         "parso",
         "jedi",
+        "packaging",
         "pywin32>=1.0;platform_system=='Windows'",
     ],
     data_files=getDataFiles(),

eric ide

mercurial