src/eric7/CodeFormatting/IsortFormattingDialog.py

Sun, 06 Nov 2022 11:22:39 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 06 Nov 2022 11:22:39 +0100
branch
eric7
changeset 9481
0b936ff1bbb9
parent 9475
5c09d70e9290
child 9653
e67609152c5e
permissions
-rw-r--r--

Corrected a code style issue.

# -*- coding: utf-8 -*-

# Copyright (c) 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a dialog showing the isort code formatting progress and the results.
"""

import contextlib
import copy
import io
import multiprocessing
import os
import pathlib

from dataclasses import dataclass

from isort import settings
from isort.api import check_file, sort_file
from PyQt6.QtCore import QCoreApplication, Qt, pyqtSlot
from PyQt6.QtWidgets import (
    QAbstractButton,
    QDialog,
    QDialogButtonBox,
    QHeaderView,
    QTreeWidgetItem,
)

from eric7 import Preferences
from eric7.EricWidgets import EricMessageBox

from .FormattingDiffWidget import FormattingDiffWidget
from .IsortFormattingAction import IsortFormattingAction
from .Ui_IsortFormattingDialog import Ui_IsortFormattingDialog


class IsortFormattingDialog(QDialog, Ui_IsortFormattingDialog):
    """
    Class implementing a dialog showing the isort code formatting progress and the
    results.

    The requested action is started by the constructor right after the dialog was
    shown. Each processed file is listed together with its status. Entries carrying
    an error message or a unified diff show these details upon a double click.
    """

    # item data roles used to attach result details to column 0 of a result entry
    DataRole = Qt.ItemDataRole.UserRole  # error message or unified diff
    DataTypeRole = Qt.ItemDataRole.UserRole + 1  # "error" or "diff"
    FileNameRole = Qt.ItemDataRole.UserRole + 2  # file name as it was processed
    StatusRole = Qt.ItemDataRole.UserRole + 3  # untranslated status string

    FileNameColumn = 1
    StatusColumn = 0

    def __init__(
        self,
        configuration,
        filesList,
        project=None,
        action=IsortFormattingAction.Sort,
        parent=None,
    ):
        """
        Constructor

        @param configuration dictionary containing the configuration parameters
        @type dict
        @param filesList list of absolute file paths to be processed
        @type list of str
        @param project reference to the project object (defaults to None)
        @type Project (optional)
        @param action action to be performed (defaults to IsortFormattingAction.Sort)
        @type IsortFormattingAction (optional)
        @param parent reference to the parent widget (defaults to None)
        @type QWidget (optional)
        """
        super().__init__(parent)
        self.setupUi(self)

        self.resultsList.header().setSortIndicator(
            IsortFormattingDialog.FileNameColumn, Qt.SortOrder.AscendingOrder
        )

        self.__project = project

        # work on a private copy in order to leave the caller's dictionary untouched
        self.__config = copy.deepcopy(configuration)
        self.__config["quiet"] = True  # we don't want extra output
        self.__config["overwrite_in_place"] = True  # we want to overwrite the files
        self.__config.pop("config_source", None)

        # Create an isort Config object and pre-load it with parameters contained in
        # project specific configuration files (like pyproject.toml). The configuration
        # given as a dictionary (i.e. data entered in the configuration dialog)
        # overwrites these. If the project is not passed, the isort config is based on
        # the isort defaults.
        if project:
            # clear the caches in order to force a re-read of config files
            settings._get_config_data.cache_clear()
            settings._find_config.cache_clear()
        try:
            self.__isortConfig = (
                settings.Config(settings_path=project.getProjectPath(), **self.__config)
                if project
                else settings.Config(**self.__config)
            )
        except KeyError:
            # invalid configuration entry found in some config file; use just the dialog
            # parameters
            self.__isortConfig = settings.Config(**self.__config)

        # added after the Config object was created because it is no isort parameter
        self.__config["__action__"] = action  # needed by the workers

        self.__filesList = filesList[:]

        self.__diffDialog = None

        # both are reset by each run; set here to have them defined in any case
        self.__statistics = IsortStatistics()
        self.__cancelled = False

        self.__allFilter = self.tr("<all>")

        self.__sortImportsButton = self.buttonBox.addButton(
            self.tr("Sort Imports"), QDialogButtonBox.ButtonRole.ActionRole
        )
        self.__sortImportsButton.setVisible(False)

        self.show()
        QCoreApplication.processEvents()

        self.__performAction()

    def __performAction(self):
        """
        Private method to execute the requested sorting action.
        """
        self.progressBar.setMaximum(len(self.__filesList))
        self.progressBar.setValue(0)
        self.progressBar.setVisible(True)

        self.statisticsGroup.setVisible(False)
        self.__statistics = IsortStatistics()

        self.__cancelled = False

        self.statusFilterComboBox.clear()
        self.resultsList.clear()

        self.buttonBox.button(QDialogButtonBox.StandardButton.Cancel).setEnabled(True)
        self.buttonBox.button(QDialogButtonBox.StandardButton.Close).setEnabled(False)
        self.buttonBox.button(QDialogButtonBox.StandardButton.Cancel).setDefault(True)

        files = self.__filterFiles(self.__filesList)
        if len(files) > 1:
            self.__sortManyFiles(files)
        elif files:
            self.__sortOneFile(files[0])
        else:
            # Nothing is left to be processed (empty list or all files skipped).
            # The run must be finished nevertheless in order to re-enable the
            # 'Close' button and to show the statistics.
            self.__finish()

    def __filterFiles(self, filesList):
        """
        Private method to filter the given list of files according the
        configuration parameters.

        Files rejected by the isort configuration are reported as 'skipped' right
        away.

        @param filesList list of files
        @type list of str
        @return list of filtered files
        @rtype list of str
        """
        files = []
        for file in filesList:
            if not self.__isortConfig.is_supported_filetype(
                file
            ) or self.__isortConfig.is_skipped(pathlib.Path(file)):
                self.__handleIsortResult(file, "skipped")
            else:
                files.append(file)

        return files

    def __resort(self):
        """
        Private method to resort the result list.
        """
        self.resultsList.sortItems(
            self.resultsList.sortColumn(),
            self.resultsList.header().sortIndicatorOrder(),
        )

    def __resizeColumns(self):
        """
        Private method to resize the columns of the result list.
        """
        self.resultsList.header().resizeSections(
            QHeaderView.ResizeMode.ResizeToContents
        )
        self.resultsList.header().setStretchLastSection(True)

    def __populateStatusFilterCombo(self):
        """
        Private method to populate the status filter combo box with allowed selections.
        """
        allowedSelections = {
            self.resultsList.topLevelItem(row).text(IsortFormattingDialog.StatusColumn)
            for row in range(self.resultsList.topLevelItemCount())
        }

        self.statusFilterComboBox.addItem(self.__allFilter)
        self.statusFilterComboBox.addItems(sorted(allowedSelections))

    def __finish(self):
        """
        Private method to perform some actions after the run was performed or canceled.
        """
        self.__resort()
        self.__resizeColumns()

        self.buttonBox.button(QDialogButtonBox.StandardButton.Cancel).setEnabled(False)
        self.buttonBox.button(QDialogButtonBox.StandardButton.Close).setEnabled(True)
        self.buttonBox.button(QDialogButtonBox.StandardButton.Close).setDefault(True)

        self.progressBar.setVisible(False)

        # offer to sort the imports only, if this run did not modify the files and
        # at least one file would be changed
        self.__sortImportsButton.setVisible(
            self.__config["__action__"] is not IsortFormattingAction.Sort
            and self.__statistics.changeCount > 0
        )

        self.__updateStatistics()
        self.__populateStatusFilterCombo()

    def __updateStatistics(self):
        """
        Private method to update the statistics about the recent sorting run and
        make them visible.
        """
        self.reformattedLabel.setText(
            self.tr("Resorted:")
            if self.__config["__action__"] is IsortFormattingAction.Sort
            else self.tr("Would Resort:")
        )

        # the progress bar maximum was set to the number of files of this run
        total = self.progressBar.maximum()

        self.totalCountLabel.setText("{0:n}".format(total))
        self.skippedCountLabel.setText("{0:n}".format(self.__statistics.skippedCount))
        self.failuresCountLabel.setText("{0:n}".format(self.__statistics.failureCount))
        self.processedCountLabel.setText(
            "{0:n}".format(self.__statistics.processedCount)
        )
        self.reformattedCountLabel.setText(
            "{0:n}".format(self.__statistics.changeCount)
        )
        self.unchangedCountLabel.setText("{0:n}".format(self.__statistics.sameCount))

        self.statisticsGroup.setVisible(True)

    @pyqtSlot(QAbstractButton)
    def on_buttonBox_clicked(self, button):
        """
        Private slot to handle button presses of the dialog buttons.

        @param button reference to the pressed button
        @type QAbstractButton
        """
        if button == self.buttonBox.button(QDialogButtonBox.StandardButton.Cancel):
            # just set the flag; it is evaluated by the loop collecting the results
            self.__cancelled = True
        elif button == self.buttonBox.button(QDialogButtonBox.StandardButton.Close):
            self.accept()
        elif button is self.__sortImportsButton:
            self.__sortImportsButtonClicked()

    @pyqtSlot()
    def __sortImportsButtonClicked(self):
        """
        Private slot handling the selection of the 'Sort Imports' button.
        """
        # restrict the sort run to those files reported as 'changed'
        files = []
        for row in range(self.resultsList.topLevelItemCount()):
            itm = self.resultsList.topLevelItem(row)
            if itm.data(0, IsortFormattingDialog.StatusRole) == "changed":
                files.append(itm.data(0, IsortFormattingDialog.FileNameRole))
        if files:
            self.__filesList = files

        self.__config["__action__"] = IsortFormattingAction.Sort
        self.__performAction()

    @pyqtSlot(QTreeWidgetItem, int)
    def on_resultsList_itemDoubleClicked(self, item, column):
        """
        Private slot handling a double click of a result item.

        @param item reference to the double clicked item
        @type QTreeWidgetItem
        @param column column number that was double clicked
        @type int
        """
        dataType = item.data(0, IsortFormattingDialog.DataTypeRole)
        if dataType == "error":
            EricMessageBox.critical(
                self,
                self.tr("Imports Sorting Failure"),
                self.tr(
                    "<p>Imports sorting failed due to this error.</p><p>{0}</p>"
                ).format(item.data(0, IsortFormattingDialog.DataRole)),
            )
        elif dataType == "diff":
            # the diff widget is created on first use and reused afterwards
            if self.__diffDialog is None:
                self.__diffDialog = FormattingDiffWidget()
            self.__diffDialog.showDiff(item.data(0, IsortFormattingDialog.DataRole))

    @pyqtSlot(str)
    def on_statusFilterComboBox_currentTextChanged(self, status):
        """
        Private slot handling the selection of a status for items to be shown.

        @param status selected status
        @type str
        """
        for row in range(self.resultsList.topLevelItemCount()):
            itm = self.resultsList.topLevelItem(row)
            itm.setHidden(
                status != self.__allFilter
                and itm.text(IsortFormattingDialog.StatusColumn) != status
            )

    def closeEvent(self, evt):
        """
        Protected slot implementing a close event handler.

        @param evt reference to the close event
        @type QCloseEvent
        """
        if self.__diffDialog is not None:
            self.__diffDialog.close()
        evt.accept()

    def __handleIsortResult(self, filename, status, data=""):
        """
        Private method to handle an isort sorting result.

        The result is counted in the statistics, added to the result list and the
        progress bar is advanced by one.

        @param filename name of the processed file
        @type str
        @param status status of the performed action (one of 'changed', 'failed',
            'skipped', 'unchanged' or 'unsupported')
        @type str
        @param data action data (error message or unified diff) (defaults to "")
        @type str (optional)
        """
        isError = False

        if status == "changed":
            statusMsg = (
                self.tr("would resort")
                if self.__config["__action__"]
                in (IsortFormattingAction.Check, IsortFormattingAction.Diff)
                else self.tr("resorted")
            )
            self.__statistics.changeCount += 1

        elif status == "unchanged":
            statusMsg = self.tr("unchanged")
            self.__statistics.sameCount += 1

        elif status == "skipped":
            statusMsg = self.tr("skipped")
            self.__statistics.skippedCount += 1

        elif status == "failed":
            statusMsg = self.tr("failed")
            self.__statistics.failureCount += 1
            isError = True

        elif status == "unsupported":
            statusMsg = self.tr("error")
            data = self.tr("Unsupported 'isort' action ({0}) given.").format(
                self.__config["__action__"]
            )
            self.__statistics.failureCount += 1
            isError = True

        else:
            statusMsg = self.tr("invalid status ({0})").format(status)
            self.__statistics.failureCount += 1
            isError = True

        if status != "skipped":
            self.__statistics.processedCount += 1

        itm = QTreeWidgetItem(
            self.resultsList,
            [
                statusMsg,
                self.__project.getRelativePath(filename)
                if self.__project
                else filename,
            ],
        )
        itm.setData(0, IsortFormattingDialog.StatusRole, status)
        itm.setData(0, IsortFormattingDialog.FileNameRole, filename)
        if data:
            itm.setData(
                0, IsortFormattingDialog.DataTypeRole, "error" if isError else "diff"
            )
            itm.setData(0, IsortFormattingDialog.DataRole, data)

        self.progressBar.setValue(self.progressBar.value() + 1)

        # keep the user interface responsive (e.g. to detect a press of 'Cancel')
        QCoreApplication.processEvents()

    def __sortManyFiles(self, files):
        """
        Private method to sort imports of the list of files according the configuration
        using multiple processes in parallel.

        The given list is not modified. If the run is cancelled, no further files
        are submitted, but the results of files already handed to the workers are
        still collected and shown.

        @param files list of files to be processed
        @type list of str
        """
        tasks = len(files)
        action = self.__config["__action__"]

        maxProcesses = Preferences.getUI("BackgroundServiceProcesses")
        if maxProcesses == 0:
            # determine based on CPU count (one less than available)
            try:
                numberOfProcesses = multiprocessing.cpu_count() - 1
            except NotImplementedError:
                numberOfProcesses = 1
        else:
            numberOfProcesses = maxProcesses
        # At least one worker is required; without this a single core machine
        # would start no worker at all and wait forever for the first result.
        # Starting more workers than files is of no use.
        numberOfProcesses = max(1, min(numberOfProcesses, tasks))

        # Create queues
        taskQueue = multiprocessing.Queue()
        doneQueue = multiprocessing.Queue()

        # Submit tasks (initially two times the number of processes)
        initialTasks = min(2 * numberOfProcesses, tasks)
        for file in files[:initialTasks]:
            taskQueue.put((file, action))
        remainingFiles = iter(files[initialTasks:])
        outstanding = initialTasks  # tasks submitted but not yet answered

        # Start worker processes
        workers = [
            multiprocessing.Process(
                target=self.sortingWorkerTask,
                args=(taskQueue, doneQueue, self.__isortConfig),
            )
            for _ in range(numberOfProcesses)
        ]
        for worker in workers:
            worker.start()

        # Get the results from the worker tasks
        for _ in range(tasks):
            result = doneQueue.get()
            outstanding -= 1
            self.__handleIsortResult(result.filename, result.status, data=result.data)

            if self.__cancelled:
                break

            # one result received, so hand the next file to the workers
            file = next(remainingFiles, None)
            if file is not None:
                taskQueue.put((file, action))
                outstanding += 1

        # Tell child processes to stop
        for _ in workers:
            taskQueue.put("STOP")

        # After a cancellation the workers still process the tasks queued ahead of
        # the stop markers. Their results must be fetched before joining because a
        # process that has put data on a queue does not terminate until that data
        # was flushed. They are reported because the files were really processed.
        for _ in range(outstanding):
            result = doneQueue.get()
            self.__handleIsortResult(result.filename, result.status, data=result.data)

        for worker in workers:
            worker.join()
            worker.close()

        taskQueue.close()
        doneQueue.close()

        self.__finish()

    @staticmethod
    def sortingWorkerTask(inputQueue, outputQueue, isortConfig):
        """
        Static method acting as the parallel worker for the formatting task.

        Tuples of file name and action are read from the input queue until the
        string "STOP" is received. One result object per tuple is written to the
        output queue.

        @param inputQueue input queue
        @type multiprocessing.Queue
        @param outputQueue output queue
        @type multiprocessing.Queue
        @param isortConfig config object for isort
        @type isort.Config
        """
        for file, action in iter(inputQueue.get, "STOP"):
            outputQueue.put(
                IsortFormattingDialog.__processFile(file, action, isortConfig)
            )

    def __sortOneFile(self, file):
        """
        Private method to sort the imports of a single file according the
        configuration.

        @param file name of the file to be processed
        @type str
        """
        result = IsortFormattingDialog.__processFile(
            file, self.__config["__action__"], self.__isortConfig
        )
        self.__handleIsortResult(result.filename, result.status, data=result.data)

        self.__finish()

    @staticmethod
    def __processFile(file, action, isortConfig):
        """
        Static method to perform the requested action on one file.

        This is the common implementation used by the parallel workers and the
        single file case. It always returns a result object; an exception raised
        while processing the file is converted to a 'failed' result because an
        uncaught exception would terminate a worker process and leave the dialog
        waiting for a result that never arrives.

        @param file name of the file to be processed
        @type str
        @param action action to be performed
        @type IsortFormattingAction
        @param isortConfig config object for isort
        @type isort.Config
        @return result object
        @rtype IsortResult
        """
        try:
            if action == IsortFormattingAction.Diff:
                return IsortFormattingDialog.__isortCheckFile(
                    file,
                    isortConfig,
                    withDiff=True,
                )

            if action == IsortFormattingAction.Check:
                return IsortFormattingDialog.__isortCheckFile(
                    file,
                    isortConfig,
                    withDiff=False,
                )

            if action == IsortFormattingAction.Sort:
                return IsortFormattingDialog.__isortSortFile(
                    file,
                    isortConfig,
                )
        except Exception as err:
            # boundary of the processing of one file: report instead of crash
            return IsortResult(
                status="failed",
                filename=file,
                data="{0}: {1}".format(type(err).__name__, err),
            )

        return IsortResult(
            status="unsupported",
            filename=file,
        )

    @staticmethod
    def __isortCheckFile(filename, isortConfig, withDiff=True):
        """
        Static method to check, if a file's import statements need to be changed.

        @param filename name of the file to be processed
        @type str
        @param isortConfig config object for isort
        @type isort.Config
        @param withDiff flag indicating to return a unified diff, if the file needs to
            be changed (defaults to True)
        @type bool (optional)
        @return result object
        @rtype IsortResult
        """
        # isort writes the diff to the given stream; False disables the diff
        diffIO = io.StringIO() if withDiff else False
        try:
            # anything isort writes to stderr is discarded
            with open(os.devnull, "w") as devnull, contextlib.redirect_stderr(devnull):
                ok = check_file(
                    filename,
                    show_diff=diffIO,
                    config=isortConfig,
                )
            data = diffIO.getvalue() if withDiff and not ok else ""
        finally:
            if withDiff:
                diffIO.close()

        status = "unchanged" if ok else "changed"

        return IsortResult(status=status, filename=filename, data=data)

    @staticmethod
    def __isortSortFile(filename, isortConfig):
        """
        Static method to sort the import statements of a file.

        @param filename name of the file to be processed
        @type str
        @param isortConfig config object for isort
        @type isort.Config
        @return result object
        @rtype IsortResult
        """
        # anything isort writes to stderr is discarded
        with open(os.devnull, "w") as devnull, contextlib.redirect_stderr(devnull):
            ok = sort_file(
                filename,
                config=isortConfig,
                ask_to_apply=False,
                write_to_stdout=False,
                show_diff=False,
            )

        # a true result of sort_file() is reported as a changed file
        status = "changed" if ok else "unchanged"

        return IsortResult(status=status, filename=filename)


@dataclass
class IsortStatistics:
    """
    Class collecting the counters of one isort run.

    All counters start at zero and are incremented by the dialog for each
    reported file result.
    """

    # number of files reported with status 'skipped'
    skippedCount: int = 0
    # number of files reported with status 'changed'
    changeCount: int = 0
    # number of files reported with status 'unchanged'
    sameCount: int = 0
    # number of files reported with a failure (including unsupported action
    # and invalid status)
    failureCount: int = 0
    # number of files reported with any status other than 'skipped'
    processedCount: int = 0


@dataclass
class IsortResult:
    """
    Class holding the outcome of an isort action performed on one file.
    """

    # status of the performed action (e.g. 'changed', 'unchanged' or 'unsupported')
    status: str = ""
    # name of the processed file
    filename: str = ""
    # action data (error message or unified diff); empty if there is none
    data: str = ""

eric ide

mercurial