--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/Plugins/CheckerPlugins/SyntaxChecker/SyntaxCheck.py Thu Jul 07 11:23:56 2022 +0200 @@ -0,0 +1,327 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2011 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the syntax check for Python 3. +""" + +import queue +import ast +import re +import traceback +import multiprocessing +import contextlib + +with contextlib.suppress(ImportError): + from pyflakes.checker import Checker + from pyflakes.messages import ImportStarUsed, ImportStarUsage + +VcsConflictMarkerRegExpList = ( + re.compile( + r"""^<<<<<<< .*?\|\|\|\|\|\|\| .*?=======.*?>>>>>>> .*?$""", + re.MULTILINE | re.DOTALL + ), + re.compile( + r"""^<<<<<<< .*?=======.*?>>>>>>> .*?$""", + re.MULTILINE | re.DOTALL + ), +) + + +def initService(): + """ + Initialize the service and return the entry point. + + @return the entry point for the background client (function) + """ + return syntaxAndPyflakesCheck + + +def initBatchService(): + """ + Initialize the batch service and return the entry point. + + @return the entry point for the background client (function) + """ + return syntaxAndPyflakesBatchCheck + + +def normalizeCode(codestring): + """ + Function to normalize the given code. + + @param codestring code to be normalized (string) + @return normalized code (string) + """ + codestring = codestring.replace("\r\n", "\n").replace("\r", "\n") + + if codestring and codestring[-1] != '\n': + codestring += '\n' + + return codestring + + +def extractLineFlags(line, startComment="#", endComment="", flagsLine=False): + """ + Function to extract flags starting and ending with '__' from a line + comment. + + @param line line to extract flags from (string) + @param startComment string identifying the start of the comment (string) + @param endComment string identifying the end of a comment (string) + @param flagsLine flag indicating to check for a flags only line (bool) + @return list containing the extracted flags (list of strings) + """ + flags = [] + + if not flagsLine or ( + flagsLine and line.strip().startswith(startComment)): + pos = line.rfind(startComment) + if pos >= 0: + comment = line[pos + len(startComment):].strip() + if endComment: + endPos = line.rfind(endComment) + if endPos >= 0: + comment = comment[:endPos] + flags = [f.strip() for f in comment.split() + if (f.startswith("__") and f.endswith("__"))] + flags += [f.strip().lower() for f in comment.split() + if f in ("noqa", "NOQA")] + return flags + + +def syntaxAndPyflakesCheck(filename, codestring, checkFlakes=True, + ignoreStarImportWarnings=False): + """ + Function to compile one Python source file to Python bytecode + and to perform a pyflakes check. + + @param filename source filename (string) + @param codestring string containing the code to compile (string) + @param checkFlakes flag indicating to do a pyflakes check (boolean) + @param ignoreStarImportWarnings flag indicating to + ignore 'star import' warnings (boolean) + @return dictionary with the keys 'error' and 'warnings' which + hold a list containing details about the error/ warnings + (file name, line number, column, codestring (only at syntax + errors), the message, a list with arguments for the message) + """ + return __syntaxAndPyflakesCheck(filename, codestring, checkFlakes, + ignoreStarImportWarnings) + + +def syntaxAndPyflakesBatchCheck(argumentsList, send, fx, cancelled, + maxProcesses=0): + """ + Module function to check syntax for a batch of files. + + @param argumentsList list of arguments tuples as given for + syntaxAndPyflakesCheck + @type list + @param send reference to send function + @type func + @param fx registered service name + @type str + @param cancelled reference to function checking for a cancellation + @type func + @param maxProcesses number of processes to be used + @type int + """ + if maxProcesses == 0: + # determine based on CPU count + try: + NumberOfProcesses = multiprocessing.cpu_count() + if NumberOfProcesses >= 1: + NumberOfProcesses -= 1 + except NotImplementedError: + NumberOfProcesses = 1 + else: + NumberOfProcesses = maxProcesses + + # Create queues + taskQueue = multiprocessing.Queue() + doneQueue = multiprocessing.Queue() + + # Submit tasks (initially two time number of processes + initialTasks = 2 * NumberOfProcesses + for task in argumentsList[:initialTasks]: + taskQueue.put(task) + + # Start worker processes + workers = [ + multiprocessing.Process( + target=workerTask, args=(taskQueue, doneQueue) + ) for _ in range(NumberOfProcesses) + ] + for worker in workers: + worker.start() + + # Get and send results + endIndex = len(argumentsList) - initialTasks + for i in range(len(argumentsList)): + resultSent = False + wasCancelled = False + + while not resultSent: + try: + # get result (waiting max. 3 seconds and send it to frontend + filename, result = doneQueue.get() + send(fx, filename, result) + resultSent = True + except queue.Empty: + # ignore empty queue, just carry on + if cancelled(): + wasCancelled = True + break + + if wasCancelled or cancelled(): + # just exit the loop ignoring the results of queued tasks + break + + if i < endIndex: + taskQueue.put(argumentsList[i + initialTasks]) + + # Tell child processes to stop + for _ in range(NumberOfProcesses): + taskQueue.put('STOP') + + for worker in workers: + worker.join() + worker.close() + + +def workerTask(inputQueue, outputQueue): + """ + Module function acting as the parallel worker for the syntax check. + + @param inputQueue input queue (multiprocessing.Queue) + @param outputQueue output queue (multiprocessing.Queue) + """ + for filename, args in iter(inputQueue.get, 'STOP'): + source, checkFlakes, ignoreStarImportWarnings = args + result = __syntaxAndPyflakesCheck(filename, source, checkFlakes, + ignoreStarImportWarnings) + outputQueue.put((filename, result)) + + +def __syntaxAndPyflakesCheck(filename, codestring, checkFlakes=True, + ignoreStarImportWarnings=False): + """ + Function to compile one Python source file to Python bytecode + and to perform a pyflakes check. + + @param filename source filename + @type str + @param codestring string containing the code to compile + @type str + @param checkFlakes flag indicating to do a pyflakes check + @type bool + @param ignoreStarImportWarnings flag indicating to + ignore 'star import' warnings + @type bool + @return dictionary with the keys 'error' and 'warnings' which + hold a list containing details about the error/ warnings + (file name, line number, column, codestring (only at syntax + errors), the message, a list with arguments for the message) + @rtype dict + """ + import builtins + + try: + codestring = normalizeCode(codestring) + + # Check for VCS conflict markers + for conflictMarkerRe in VcsConflictMarkerRegExpList: + conflict = conflictMarkerRe.search(codestring) + if conflict is not None: + start, i = conflict.span() + lineindex = 1 + codestring.count("\n", 0, start) + return [{'error': + (filename, lineindex, 0, "", + "VCS conflict marker found") + }] + + if filename.endswith('.ptl'): + try: + import quixote.ptl_compile + except ImportError: + return [{'error': (filename, 0, 0, '', + 'Quixote plugin not found.')}] + template = quixote.ptl_compile.Template(codestring, filename) + template.compile() + else: + module = builtins.compile( + codestring, filename, 'exec', ast.PyCF_ONLY_AST) + except SyntaxError as detail: + index = 0 + code = "" + error = "" + lines = traceback.format_exception_only(SyntaxError, detail) + match = re.match(r'\s*File "(.+)", line (\d+)', + lines[0].replace('<string>', filename)) + if match is not None: + fn, line = match.group(1, 2) + if lines[1].startswith('SyntaxError:'): + error = re.match('SyntaxError: (.+)', lines[1]).group(1) + else: + code = re.match('(.+)', lines[1]).group(1) + for seLine in lines[2:]: + if seLine.startswith('SyntaxError:'): + error = re.match('SyntaxError: (.+)', seLine).group(1) + elif seLine.rstrip().endswith('^'): + index = len(seLine.rstrip()) - 4 + else: + fn = detail.filename + line = detail.lineno or 1 + error = detail.msg + return [{'error': (fn, int(line), index, code.strip(), error)}] + except ValueError as detail: + try: + fn = detail.filename + line = detail.lineno + error = detail.msg + except AttributeError: + fn = filename + line = 1 + error = str(detail) + return [{'error': (fn, line, 0, "", error)}] + except Exception as detail: + with contextlib.suppress(Exception): + fn = detail.filename + line = detail.lineno + error = detail.msg + return [{'error': (fn, line, 0, "", error)}] + + # pyflakes + if not checkFlakes: + return [{}] + + results = [] + lines = codestring.splitlines() + try: + warnings = Checker(module, filename, withDoctest=True) + warnings.messages.sort(key=lambda a: a.lineno) + for warning in warnings.messages: + if ( + ignoreStarImportWarnings and + isinstance(warning, (ImportStarUsed, ImportStarUsage)) + ): + continue + + _fn, lineno, col, message, msg_args = warning.getMessageData() + lineFlags = extractLineFlags(lines[lineno - 1].strip()) + with contextlib.suppress(IndexError): + lineFlags += extractLineFlags(lines[lineno].strip(), + flagsLine=True) + if ( + "__IGNORE_WARNING__" not in lineFlags and + "noqa" not in lineFlags + ): + results.append((_fn, lineno, col, "", message, msg_args)) + except SyntaxError as err: + msg = err.text.strip() if err.text.strip() else err.msg + results.append((filename, err.lineno, 0, "FLAKES_ERROR", msg, [])) + + return [{'warnings': results}]