Plugins/VcsPlugins/vcsMercurial/HgSummaryDialog.py

Sun, 19 Jan 2014 15:37:47 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 19 Jan 2014 15:37:47 +0100
changeset 3218
c33689d92b14
parent 3190
a9a94491c4fd
child 3302
e92f0dd51979
permissions
-rw-r--r--

Fixed an issue in the Mercurial summary dialog.

# -*- coding: utf-8 -*-

# Copyright (c) 2013 - 2014 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a dialog to show some summary information of the working
directory state.
"""

import html
import os

from PyQt4.QtCore import pyqtSlot, QProcess, QTimer
from PyQt4.QtGui import QDialog, QDialogButtonBox

from E5Gui import E5MessageBox

from .HgUtilities import prepareProcess

from .Ui_HgSummaryDialog import Ui_HgSummaryDialog

import Preferences


class HgSummaryDialog(QDialog, Ui_HgSummaryDialog):
    """
    Class implementing a dialog to show some summary information of the working
    directory state.
    
    The information is collected by running 'hg summary --remote' (plus
    '--mq' on request) in the repository root. The output is parsed line by
    line and shown as an HTML table in the summary pane.
    """
    def __init__(self, vcs, parent=None):
        """
        Constructor
        
        @param vcs reference to the vcs object
        @param parent parent widget (QWidget)
        """
        super().__init__(parent)
        self.setupUi(self)
        
        # The refresh button is created here and not by setupUi(); its
        # clicks are therefore dispatched by on_buttonBox_clicked().
        self.refreshButton = self.buttonBox.addButton(
            self.tr("Refresh"), QDialogButtonBox.ActionRole)
        self.refreshButton.setToolTip(
            self.tr("Press to refresh the summary display"))
        self.refreshButton.setEnabled(False)
        
        # Defaults, so a refresh triggered before start() was ever called
        # does not fail on missing attributes.
        self.__path = ""
        self.__mq = False
        self.__buffer = []
        
        self.process = None
        self.vcs = vcs
        self.vcs.committed.connect(self.__committed)
    
    def closeEvent(self, e):
        """
        Protected method implementing a close event handler.
        
        A summary process that is still running is stopped before the
        event is accepted.
        
        @param e close event (QCloseEvent)
        """
        self.__terminateProcess()
        
        e.accept()
    
    def start(self, path, mq=False):
        """
        Public slot to start the hg summary command.
        
        Nothing is started if no directory containing the admin directory
        of the vcs can be found above the given path.
        
        @param path path name of the working directory (string)
        @param mq flag indicating to show the queue status as well (boolean)
        """
        self.errorGroup.hide()
        # drop the error text of a previous run; otherwise it would show
        # up again together with the next error
        self.errors.clear()
        self.__path = path
        self.__mq = mq
        
        args = ['summary']
        self.vcs.addArguments(args, self.vcs.options['global'])
        args.append("--remote")
        if self.__mq:
            args.append("--mq")
        
        # find the root of the repo
        repodir = self.__path
        while not os.path.isdir(os.path.join(repodir, self.vcs.adminDir)):
            parentdir = os.path.dirname(repodir)
            # Stop at the file system root and also when dirname() does not
            # change the path anymore (relative or empty paths end up as
            # "" and would loop forever otherwise).
            if parentdir == repodir or \
               os.path.splitdrive(parentdir)[1] == os.sep:
                self.refreshButton.setEnabled(True)
                return
            repodir = parentdir
        
        if self.process is not None:
            # A previous run is still active. Silence and stop it, so its
            # finished signal cannot process the buffer of the new run.
            self.process.blockSignals(True)
            self.process.kill()
            self.process.waitForFinished(3000)
        
        self.process = QProcess()
        prepareProcess(self.process, Preferences.getSystem("IOEncoding"),
                       "C")
        self.process.finished.connect(self.__procFinished)
        self.process.readyReadStandardOutput.connect(self.__readStdout)
        self.process.readyReadStandardError.connect(self.__readStderr)
        
        self.process.setWorkingDirectory(repodir)
        
        self.__buffer = []
        
        self.process.start('hg', args)
        procStarted = self.process.waitForStarted(5000)
        if not procStarted:
            E5MessageBox.critical(
                self,
                self.tr('Process Generation Error'),
                self.tr(
                    'The process {0} could not be started. '
                    'Ensure, that it is in the search path.'
                ).format('hg'))
            # no finished signal will arrive; reset the dialog state here
            self.__finish()
    
    def __terminateProcess(self):
        """
        Private method to stop a running summary process.
        
        The process is asked to terminate, is killed after two seconds and
        is waited for at most three seconds.
        """
        if self.process is not None and \
           self.process.state() != QProcess.NotRunning:
            self.process.terminate()
            QTimer.singleShot(2000, self.process.kill)
            self.process.waitForFinished(3000)
    
    def __finish(self):
        """
        Private slot called when the process finished or the user pressed
        the button.
        
        It stops a still running process, enables the refresh button and
        drops the reference to the process.
        """
        self.__terminateProcess()
        
        self.refreshButton.setEnabled(True)
        self.process = None
    
    def on_buttonBox_clicked(self, button):
        """
        Private slot called by a button of the button box clicked.
        
        @param button button that was clicked (QAbstractButton)
        """
        if button == self.buttonBox.button(QDialogButtonBox.Close):
            self.close()
        elif button == self.refreshButton:
            self.on_refreshButton_clicked()
    
    def __procFinished(self, exitCode, exitStatus):
        """
        Private slot connected to the finished signal.
        
        @param exitCode exit code of the process (integer)
        @param exitStatus exit status of the process (QProcess.ExitStatus)
        """
        try:
            self.__processOutput(self.__buffer)
        finally:
            # always reset the dialog state, even if the output could not
            # be processed; otherwise the refresh button stays disabled
            self.__finish()
    
    def __readStdout(self):
        """
        Private slot to handle the readyReadStandardOutput signal.
        
        It reads the complete lines available on the standard output of the
        process and appends them to the output buffer. The buffer is
        processed once the process has finished.
        """
        if self.process is not None:
            self.process.setReadChannel(QProcess.StandardOutput)
            ioEncoding = Preferences.getSystem("IOEncoding")
            
            while self.process.canReadLine():
                line = str(self.process.readLine(), ioEncoding, 'replace')
                self.__buffer.append(line)
    
    def __readStderr(self):
        """
        Private slot to handle the readyReadStandardError signal.
        
        It reads the error output of the process and inserts it into the
        error pane.
        """
        if self.process is not None:
            s = str(self.process.readAllStandardError(),
                    Preferences.getSystem("IOEncoding"),
                    'replace')
            self.__showError(s)
    
    def __showError(self, out):
        """
        Private slot to show some error.
        
        @param out error to be shown (string)
        """
        self.errorGroup.show()
        self.errors.insertPlainText(out)
        self.errors.ensureCursorVisible()
    
    @pyqtSlot()
    def on_refreshButton_clicked(self):
        """
        Private slot to refresh the status display.
        
        The summary is cleared and the command is run again with the
        parameters of the last call to start().
        """
        self.refreshButton.setEnabled(False)
        self.summary.clear()
        
        self.start(self.__path, mq=self.__mq)
    
    def __committed(self):
        """
        Private slot called after the commit has finished.
        """
        if self.isVisible():
            self.on_refreshButton_clicked()
    
    @staticmethod
    def __escape(text):
        """
        Private method to escape text for inclusion into HTML.
        
        @param text text to be escaped (string)
        @return text with '&', '<' and '>' replaced by entities (string)
        """
        return html.escape(text, quote=False)
    
    @staticmethod
    def __splitCount(text):
        """
        Private method to split a "<count> <category>" string.
        
        @param text text to be split (string)
        @return tuple of count and category (integer, string); the count
            is 0, if the text does not start with a number
        """
        count, _, category = text.partition(" ")
        try:
            return int(count), category
        except ValueError:
            return 0, category
    
    def __processOutput(self, output):
        """
        Private method to process the output into nice readable text.
        
        @param output lines of the summary command output (list of strings)
        """
        infoDict = self.__parseSummary(output)
        info = self.__formatSummary(infoDict)
        self.summary.insertHtml("\n".join(info))
    
    def __parseSummary(self, output):
        """
        Private method to parse the output of the summary command.
        
        Lines that are not of the form "name: value" and entries with an
        unknown name are ignored. The given list is not modified.
        
        @param output lines of the summary command output (list of strings)
        @return dictionary with the parsed values keyed by the entry name;
            "parent" maps to a list of (rev, node, tags, message, remarks)
            tuples, "branch" and "bookmarks" map to strings, "commit" maps
            to a (states dictionary, remark) tuple, "update" maps to a
            (status marker, changesets, heads) tuple, "remote" maps to an
            (incoming, outgoing, incoming bookmarks, outgoing bookmarks)
            tuple and "mq" maps to an (applied, unapplied) tuple
        """
        infoDict = {}
        
        lines = iter(output)
        for line in lines:
            name, separator, value = line.partition(": ")
            if not separator:
                # not a "name: value" entry (e.g. an empty line)
                continue
            value = value.strip()
            
            if name == "parent":
                parent, _, tags = value.partition(" ")
                rev, _, node = parent.partition(":")
                try:
                    rev = int(rev)
                except ValueError:
                    # not a "<rev>:<node>" changeset id
                    continue
                
                remarks = []
                for marker, remark in (
                    ("(empty repository)", "@EMPTY@"),
                    ("(no revision checked out)", "@NO_REVISION@"),
                ):
                    if marker in tags:
                        remarks.append(remark)
                        tags = tags.replace(marker, "").strip()
                
                if rev == -1:
                    # the null revision has no commit message line
                    message = None
                else:
                    # the line following the parent entry is the first
                    # line of the commit message
                    message = next(lines, "").strip()
                infoDict.setdefault(name, []).append(
                    (rev, node, tags or None, message, remarks))
                continue
            elif name in ("branch", "bookmarks"):
                # the plain text is kept
                pass
            elif name == "commit":
                if value.startswith("("):
                    states, remark = "", value[1:-1]
                elif " (" in value:
                    states, remark = value.rsplit(" (", 1)
                    remark = remark[:-1]
                else:
                    states, remark = value, ""
                stateDict = {}
                for state in states.split(", "):
                    count, sep, category = state.partition(" ")
                    if sep:
                        stateDict[category] = count
                value = (stateDict, remark)
            elif name == "update":
                if value.endswith("(current)"):
                    value = ("@CURRENT@", 0, 0)
                elif value.endswith("(update)"):
                    value = ("@UPDATE@", self.__splitCount(value)[0], 0)
                elif value.endswith("(merge)"):
                    parts = value.split(", ")
                    changesets = self.__splitCount(parts[0])[0]
                    heads = (self.__splitCount(parts[1])[0]
                             if len(parts) > 1 else 0)
                    value = ("@MERGE@", changesets, heads)
                else:
                    value = ("@UNKNOWN@", 0, 0)
            elif name == "remote":
                if value == "(synced)":
                    value = (0, 0, 0, 0)
                else:
                    inc = incb = outg = outgb = 0
                    for val in value.split(", "):
                        count, category = self.__splitCount(val)
                        if category == "outgoing":
                            outg = count
                        elif category.endswith("incoming"):
                            # matches "<n> or more incoming" as well
                            inc = count
                        elif category == "incoming bookmarks":
                            incb = count
                        elif category == "outgoing bookmarks":
                            outgb = count
                    value = (inc, outg, incb, outgb)
                    if not any(value):
                        # nothing was recognized; all zeroes would be
                        # shown as "synched", so drop the entry instead
                        continue
            elif name == "mq":
                if value == "(empty queue)":
                    value = (0, 0)
                else:
                    applied = unapplied = 0
                    for val in value.split(", "):
                        count, category = self.__splitCount(val)
                        if category == "applied":
                            applied = count
                        elif category == "unapplied":
                            unapplied = count
                    value = (applied, unapplied)
                    if not any(value):
                        # nothing was recognized; all zeroes would be
                        # shown as "empty queue", so drop the entry
                        continue
            else:
                # ignore unknown entries
                continue
            
            infoDict[name] = value
        
        return infoDict
    
    def __formatSummary(self, infoDict):
        """
        Private method to build the HTML text for the parsed summary.
        
        Text coming from the repository (node, tags, commit message, branch
        and bookmark names, counts) is HTML escaped before it is inserted.
        
        @param infoDict dictionary as returned by the parser (dictionary)
        @return lines of the HTML text (list of strings)
        """
        if not infoDict:
            return [self.tr("<p>No status information available.</p>")]
        
        escape = self.__escape
        info = ["<table>"]
        
        parents = infoDict.get("parent", [])
        for pindex, parent in enumerate(parents, 1):
            rev, node, tags, message, remarks = parent
            changeset = "{0}:{1}".format(rev, escape(node))
            if len(parents) > 1:
                info.append(self.tr(
                    "<tr><td><b>Parent #{0}</b></td><td>{1}</td></tr>")
                    .format(pindex, changeset))
            else:
                info.append(self.tr(
                    "<tr><td><b>Parent</b></td><td>{0}</td></tr>")
                    .format(changeset))
            if tags:
                info.append(self.tr(
                    "<tr><td><b>Tags</b></td><td>{0}</td></tr>")
                    .format('<br/>'.join(
                        escape(tag) for tag in tags.split())))
            if message:
                info.append(self.tr(
                    "<tr><td><b>Commit Message</b></td><td>{0}</td></tr>")
                    .format(escape(message)))
            if remarks:
                rem = []
                if "@EMPTY@" in remarks:
                    rem.append(self.tr("empty repository"))
                if "@NO_REVISION@" in remarks:
                    rem.append(self.tr("no revision checked out"))
                info.append(self.tr(
                    "<tr><td><b>Remarks</b></td><td>{0}</td></tr>")
                    .format(", ".join(rem)))
        if "branch" in infoDict:
            info.append(self.tr(
                "<tr><td><b>Branch</b></td><td>{0}</td></tr>")
                .format(escape(infoDict["branch"])))
        if "bookmarks" in infoDict:
            # bookmarks starting with '*' are shown in bold
            bookmarks = [
                "<b>{0}</b>".format(escape(bookmark))
                if bookmark.startswith("*") else escape(bookmark)
                for bookmark in infoDict["bookmarks"].split()
            ]
            info.append(self.tr(
                "<tr><td><b>Bookmarks</b></td><td>{0}</td></tr>")
                .format('<br/>'.join(bookmarks)))
        if "commit" in infoDict:
            states, remark = infoDict["commit"]
            # the texts are given as literals to self.tr(), so they are
            # found by the translation tools
            stateTexts = {
                "modified": self.tr("{0} modified"),
                "added": self.tr("{0} added"),
                "removed": self.tr("{0} removed"),
                "renamed": self.tr("{0} renamed"),
                "copied": self.tr("{0} copied"),
                "deleted": self.tr("{0} deleted"),
                "unknown": self.tr("{0} unknown"),
                "ignored": self.tr("{0} ignored"),
                "unresolved": self.tr("{0} unresolved"),
                "subrepos": self.tr("{0} subrepos"),
            }
            cinfo = [
                stateTexts[category].format(escape(count))
                for category, count in states.items()
                if category in stateTexts
            ]
            remarkTexts = {
                "merge": self.tr("Merge needed"),
                "new branch": self.tr("New Branch"),
                "head closed": self.tr("Head is closed"),
                "clean": self.tr("No commit required"),
                "new branch head": self.tr("New Branch Head"),
            }
            if remark in remarkTexts:
                cinfo.append(remarkTexts[remark])
            info.append(self.tr(
                "<tr><td><b>Commit Status</b></td><td>{0}</td></tr>")
                .format("<br/>".join(cinfo)))
        if "update" in infoDict:
            status, changesets, heads = infoDict["update"]
            if status == "@CURRENT@":
                uinfo = self.tr("current")
            elif status == "@UPDATE@":
                uinfo = self.tr(
                    "%n new changeset(s)<br/>Update required", "",
                    changesets)
            elif status == "@MERGE@":
                uinfo1 = self.tr(
                    "%n new changeset(s)", "", changesets)
                uinfo2 = self.tr(
                    "%n branch head(s)", "", heads)
                uinfo = self.tr(
                    "{0}<br/>{1}<br/>Merge required",
                    "0 is changesets, 1 is branch heads")\
                    .format(uinfo1, uinfo2)
            else:
                uinfo = self.tr("unknown status")
            info.append(self.tr(
                "<tr><td><b>Update Status</b></td><td>{0}</td></tr>")
                .format(uinfo))
        if "remote" in infoDict:
            if infoDict["remote"] == (0, 0, 0, 0):
                rinfo = self.tr("synched")
            else:
                inc, outg, incb, outgb = infoDict["remote"]
                li = []
                if inc:
                    li.append(self.tr("1 or more incoming"))
                if outg:
                    li.append(self.tr("{0} outgoing")
                              .format(outg))
                if incb:
                    li.append(self.tr("%n incoming bookmark(s)", "",
                              incb))
                if outgb:
                    li.append(self.tr("%n outgoing bookmark(s)", "",
                              outgb))
                rinfo = "<br/>".join(li)
            info.append(self.tr(
                "<tr><td><b>Remote Status</b></td><td>{0}</td></tr>")
                .format(rinfo))
        if "mq" in infoDict:
            if infoDict["mq"] == (0, 0):
                qinfo = self.tr("empty queue")
            else:
                applied, unapplied = infoDict["mq"]
                li = []
                if applied:
                    li.append(self.tr("{0} applied")
                              .format(applied))
                if unapplied:
                    li.append(self.tr("{0} unapplied")
                              .format(unapplied))
                qinfo = "<br/>".join(li)
            info.append(self.tr(
                "<tr><td><b>Queues Status</b></td><td>{0}</td></tr>")
                .format(qinfo))
        info.append("</table>")
        
        return info

eric ide

mercurial