eric6/Plugins/CheckerPlugins/SyntaxChecker/SyntaxCheck.py

Tue, 13 Apr 2021 18:02:59 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Tue, 13 Apr 2021 18:02:59 +0200
changeset 8234
fcb6b4b96274
parent 8217
385f60c94548
child 8243
cc717c2ae956
permissions
-rw-r--r--

Applied some more code simplifications suggested by the new Simplify checker (Y108: use ternary operator) (batch 2).

# -*- coding: utf-8 -*-

# Copyright (c) 2011 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the syntax check for Python 3.
"""

import queue
import ast
import re
import traceback
import multiprocessing


try:
    from pyflakes.checker import Checker
    from pyflakes.messages import ImportStarUsed, ImportStarUsage
except ImportError:
    pass

VcsConflictMarkerRegExpList = (
    re.compile(
        r"""^<<<<<<< .*?\|\|\|\|\|\|\| .*?=======.*?>>>>>>> .*?$""",
        re.MULTILINE | re.DOTALL
    ),
    re.compile(
        r"""^<<<<<<< .*?=======.*?>>>>>>> .*?$""",
        re.MULTILINE | re.DOTALL
    ),
)


def initService():
    """
    Initialize the service and return the entry point.
    
    @return the entry point for the background client (function)
    """
    return syntaxAndPyflakesCheck


def initBatchService():
    """
    Initialize the batch service and return the entry point.
    
    @return the entry point for the background client (function)
    """
    return syntaxAndPyflakesBatchCheck


def normalizeCode(codestring):
    """
    Function to normalize the given code.
    
    @param codestring code to be normalized (string)
    @return normalized code (string)
    """
    codestring = codestring.replace("\r\n", "\n").replace("\r", "\n")

    if codestring and codestring[-1] != '\n':
        codestring += '\n'
    
    return codestring


def extractLineFlags(line, startComment="#", endComment="", flagsLine=False):
    """
    Function to extract flags starting and ending with '__' from a line
    comment.
    
    @param line line to extract flags from (string)
    @param startComment string identifying the start of the comment (string)
    @param endComment string identifying the end of a comment (string)
    @param flagsLine flag indicating to check for a flags only line (bool)
    @return list containing the extracted flags (list of strings)
    """
    flags = []
    
    if not flagsLine or (
       flagsLine and line.strip().startswith(startComment)):
        pos = line.rfind(startComment)
        if pos >= 0:
            comment = line[pos + len(startComment):].strip()
            if endComment:
                endPos = line.rfind(endComment)
                if endPos >= 0:
                    comment = comment[:endPos]
            flags = [f.strip() for f in comment.split()
                     if (f.startswith("__") and f.endswith("__"))]
            flags += [f.strip().lower() for f in comment.split()
                      if f in ("noqa", "NOQA")]
    return flags


def syntaxAndPyflakesCheck(filename, codestring, checkFlakes=True,
                           ignoreStarImportWarnings=False):
    """
    Function to compile one Python source file to Python bytecode
    and to perform a pyflakes check.
    
    @param filename source filename (string)
    @param codestring string containing the code to compile (string)
    @param checkFlakes flag indicating to do a pyflakes check (boolean)
    @param ignoreStarImportWarnings flag indicating to
        ignore 'star import' warnings (boolean)
    @return dictionary with the keys 'error' and 'warnings' which
            hold a list containing details about the error/ warnings
            (file name, line number, column, codestring (only at syntax
            errors), the message, a list with arguments for the message)
    """
    return __syntaxAndPyflakesCheck(filename, codestring, checkFlakes,
                                    ignoreStarImportWarnings)


def syntaxAndPyflakesBatchCheck(argumentsList, send, fx, cancelled,
                                maxProcesses=0):
    """
    Module function to check syntax for a batch of files.
    
    @param argumentsList list of arguments tuples as given for
        syntaxAndPyflakesCheck
    @type list
    @param send reference to send function
    @type func
    @param fx registered service name
    @type str
    @param cancelled reference to function checking for a cancellation
    @type func
    @param maxProcesses number of processes to be used
    @type int
    """
    if maxProcesses == 0:
        # determine based on CPU count
        try:
            NumberOfProcesses = multiprocessing.cpu_count()
            if NumberOfProcesses >= 1:
                NumberOfProcesses -= 1
        except NotImplementedError:
            NumberOfProcesses = 1
    else:
        NumberOfProcesses = maxProcesses

    # Create queues
    taskQueue = multiprocessing.Queue()
    doneQueue = multiprocessing.Queue()

    # Submit tasks (initially two time number of processes
    initialTasks = 2 * NumberOfProcesses
    for task in argumentsList[:initialTasks]:
        taskQueue.put(task)

    # Start worker processes
    for _ in range(NumberOfProcesses):
        multiprocessing.Process(
            target=worker, args=(taskQueue, doneQueue)
        ).start()

    # Get and send results
    endIndex = len(argumentsList) - initialTasks
    for i in range(len(argumentsList)):
        resultSent = False
        wasCancelled = False
        
        while not resultSent:
            try:
                # get result (waiting max. 3 seconds and send it to frontend
                filename, result = doneQueue.get()
                send(fx, filename, result)
                resultSent = True
            except queue.Empty:
                # ignore empty queue, just carry on
                if cancelled():
                    wasCancelled = True
                    break
        
        if wasCancelled or cancelled():
            # just exit the loop ignoring the results of queued tasks
            break
        
        if i < endIndex:
            taskQueue.put(argumentsList[i + initialTasks])

    # Tell child processes to stop
    for _ in range(NumberOfProcesses):
        taskQueue.put('STOP')


def worker(inputQueue, outputQueue):
    """
    Module function acting as the parallel worker for the syntax check.
    
    @param inputQueue input queue (multiprocessing.Queue)
    @param outputQueue output queue (multiprocessing.Queue)
    """
    for filename, args in iter(inputQueue.get, 'STOP'):
        source, checkFlakes, ignoreStarImportWarnings = args
        result = __syntaxAndPyflakesCheck(filename, source, checkFlakes,
                                          ignoreStarImportWarnings)
        outputQueue.put((filename, result))


def __syntaxAndPyflakesCheck(filename, codestring, checkFlakes=True,
                             ignoreStarImportWarnings=False):
    """
    Function to compile one Python source file to Python bytecode
    and to perform a pyflakes check.
    
    @param filename source filename
    @type str
    @param codestring string containing the code to compile
    @type str
    @param checkFlakes flag indicating to do a pyflakes check
    @type bool
    @param ignoreStarImportWarnings flag indicating to
        ignore 'star import' warnings
    @type bool
    @return dictionary with the keys 'error' and 'warnings' which
            hold a list containing details about the error/ warnings
            (file name, line number, column, codestring (only at syntax
            errors), the message, a list with arguments for the message)
    @rtype dict
    """
    import builtins
    
    try:
        codestring = normalizeCode(codestring)
        
        # Check for VCS conflict markers
        for conflictMarkerRe in VcsConflictMarkerRegExpList:
            conflict = conflictMarkerRe.search(codestring)
            if conflict is not None:
                start, i = conflict.span()
                lineindex = 1 + codestring.count("\n", 0, start)
                return [{'error':
                         (filename, lineindex, 0, "",
                          "VCS conflict marker found")
                         }]
        
        if filename.endswith('.ptl'):
            try:
                import quixote.ptl_compile
            except ImportError:
                return [{'error': (filename, 0, 0, '',
                        'Quixote plugin not found.')}]
            template = quixote.ptl_compile.Template(codestring, filename)
            template.compile()
        else:
            module = builtins.compile(
                codestring, filename, 'exec', ast.PyCF_ONLY_AST)
    except SyntaxError as detail:
        index = 0
        code = ""
        error = ""
        lines = traceback.format_exception_only(SyntaxError, detail)
        match = re.match(r'\s*File "(.+)", line (\d+)',
                         lines[0].replace('<string>', filename))
        if match is not None:
            fn, line = match.group(1, 2)
            if lines[1].startswith('SyntaxError:'):
                error = re.match('SyntaxError: (.+)', lines[1]).group(1)
            else:
                code = re.match('(.+)', lines[1]).group(1)
                for seLine in lines[2:]:
                    if seLine.startswith('SyntaxError:'):
                        error = re.match('SyntaxError: (.+)', seLine).group(1)
                    elif seLine.rstrip().endswith('^'):
                        index = len(seLine.rstrip()) - 4
        else:
            fn = detail.filename
            line = detail.lineno or 1
            error = detail.msg
        return [{'error': (fn, int(line), index, code.strip(), error)}]
    except ValueError as detail:
        try:
            fn = detail.filename
            line = detail.lineno
            error = detail.msg
        except AttributeError:
            fn = filename
            line = 1
            error = str(detail)
        return [{'error': (fn, line, 0, "", error)}]
    except Exception as detail:
        try:
            fn = detail.filename
            line = detail.lineno
            error = detail.msg
            return [{'error': (fn, line, 0, "", error)}]
        except Exception:           # secok
            pass
    
    # pyflakes
    if not checkFlakes:
        return [{}]
    
    results = []
    lines = codestring.splitlines()
    try:
        warnings = Checker(module, filename, withDoctest=True)
        warnings.messages.sort(key=lambda a: a.lineno)
        for warning in warnings.messages:
            if (
                ignoreStarImportWarnings and
                isinstance(warning, (ImportStarUsed, ImportStarUsage))
            ):
                continue
            
            _fn, lineno, col, message, msg_args = warning.getMessageData()
            lineFlags = extractLineFlags(lines[lineno - 1].strip())
            try:
                lineFlags += extractLineFlags(lines[lineno].strip(),
                                              flagsLine=True)
            except IndexError:
                pass
            if (
                "__IGNORE_WARNING__" not in lineFlags and
                "noqa" not in lineFlags
            ):
                results.append((_fn, lineno, col, "", message, msg_args))
    except SyntaxError as err:
        msg = err.text.strip() if err.text.strip() else err.msg
        results.append((filename, err.lineno, 0, "FLAKES_ERROR", msg, []))
    
    return [{'warnings': results}]

eric ide

mercurial