Wed, 01 Jan 2020 11:57:23 +0100
Updated copyright for 2020.
# -*- coding: utf-8 -*- # Copyright (c) 2011 - 2020 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing a dialog to show the commit message of the current patch. """ import os from PyQt5.QtCore import QProcess, QTimer, Qt, QCoreApplication from PyQt5.QtWidgets import QDialog, QDialogButtonBox from E5Gui import E5MessageBox from .Ui_HgQueuesHeaderDialog import Ui_HgQueuesHeaderDialog import Utilities class HgQueuesHeaderDialog(QDialog, Ui_HgQueuesHeaderDialog): """ Class implementing a dialog to show the commit message of the current patch. """ def __init__(self, vcs, parent=None): """ Constructor @param vcs reference to the vcs object @param parent reference to the parent widget (QWidget) """ super(HgQueuesHeaderDialog, self).__init__(parent) self.setupUi(self) self.buttonBox.button(QDialogButtonBox.Close).setEnabled(False) self.buttonBox.button(QDialogButtonBox.Cancel).setDefault(True) self.vcs = vcs self.__hgClient = vcs.getClient() if self.__hgClient: self.process = None else: self.process = QProcess() self.process.finished.connect(self.__procFinished) self.process.readyReadStandardOutput.connect(self.__readStdout) self.process.readyReadStandardError.connect(self.__readStderr) self.show() QCoreApplication.processEvents() def closeEvent(self, e): """ Protected slot implementing a close event handler. @param e close event (QCloseEvent) """ if self.__hgClient: if self.__hgClient.isExecuting(): self.__hgClient.cancel() else: if ( self.process is not None and self.process.state() != QProcess.NotRunning ): self.process.terminate() QTimer.singleShot(2000, self.process.kill) self.process.waitForFinished(3000) e.accept() def start(self, path): """ Public slot to start the list command. @param path name of directory to be listed (string) """ self.activateWindow() dname, fname = self.vcs.splitPath(path) # find the root of the repo repodir = dname while not os.path.isdir(os.path.join(repodir, self.vcs.adminDir)): repodir = os.path.dirname(repodir) if os.path.splitdrive(repodir)[1] == os.sep: return args = self.vcs.initCommand("qheader") if self.__hgClient: out, err = self.__hgClient.runcommand( args, output=self.__showOutput, error=self.__showError) if err: self.__showError(err) if out: self.__showOutPut(out) self.__finish() else: self.process.kill() self.process.setWorkingDirectory(repodir) self.process.start('hg', args) procStarted = self.process.waitForStarted(5000) if not procStarted: E5MessageBox.critical( self, self.tr('Process Generation Error'), self.tr( 'The process {0} could not be started. ' 'Ensure, that it is in the search path.' ).format('hg')) def __finish(self): """ Private slot called when the process finished or the user pressed the button. """ if ( self.process is not None and self.process.state() != QProcess.NotRunning ): self.process.terminate() QTimer.singleShot(2000, self.process.kill) self.process.waitForFinished(3000) self.buttonBox.button(QDialogButtonBox.Close).setEnabled(True) self.buttonBox.button(QDialogButtonBox.Cancel).setEnabled(False) self.buttonBox.button(QDialogButtonBox.Close).setDefault(True) self.buttonBox.button(QDialogButtonBox.Close).setFocus( Qt.OtherFocusReason) def on_buttonBox_clicked(self, button): """ Private slot called by a button of the button box clicked. @param button button that was clicked (QAbstractButton) """ if button == self.buttonBox.button(QDialogButtonBox.Close): self.close() elif button == self.buttonBox.button(QDialogButtonBox.Cancel): if self.__hgClient: self.__hgClient.cancel() else: self.__finish() def __procFinished(self, exitCode, exitStatus): """ Private slot connected to the finished signal. @param exitCode exit code of the process (integer) @param exitStatus exit status of the process (QProcess.ExitStatus) """ self.__finish() def __readStdout(self): """ Private slot to handle the readyReadStdout signal. It reads the output of the process, formats it and inserts it into the contents pane. """ if self.process is not None: s = str(self.process.readAllStandardOutput(), self.vcs.getEncoding(), 'replace') self.__showOutput(s) def __showOutput(self, out): """ Private slot to show some output. @param out output to be shown (string) """ self.messageEdit.appendPlainText(Utilities.filterAnsiSequences(out)) def __readStderr(self): """ Private slot to handle the readyReadStderr signal. It reads the error output of the process and inserts it into the error pane. """ if self.process is not None: s = str(self.process.readAllStandardError(), self.vcs.getEncoding(), 'replace') self.__showError(s) def __showError(self, out): """ Private slot to show some error. @param out error to be shown (string) """ self.messageEdit.appendPlainText(self.tr("Error: ")) self.messageEdit.appendPlainText(Utilities.filterAnsiSequences(out))