--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/eric7/Plugins/CheckerPlugins/CodeStyleChecker/CodeStyleChecker.py Sat May 15 18:45:04 2021 +0200 @@ -0,0 +1,535 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2011 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the code style checker. +""" + +import queue +import ast +import sys +import multiprocessing +import contextlib + +import pycodestyle +from Naming.NamingStyleChecker import NamingStyleChecker + +# register the name checker +pycodestyle.register_check(NamingStyleChecker, NamingStyleChecker.Codes) + +from DocStyle.DocStyleChecker import DocStyleChecker +from Miscellaneous.MiscellaneousChecker import MiscellaneousChecker +from Complexity.ComplexityChecker import ComplexityChecker +from Security.SecurityChecker import SecurityChecker +from PathLib.PathlibChecker import PathlibChecker +from Simplify.SimplifyChecker import SimplifyChecker + + +def initService(): + """ + Initialize the service and return the entry point. + + @return the entry point for the background client (function) + """ + return codeStyleCheck + + +def initBatchService(): + """ + Initialize the batch service and return the entry point. + + @return the entry point for the background client (function) + """ + return codeStyleBatchCheck + + +class CodeStyleCheckerReport(pycodestyle.BaseReport): + """ + Class implementing a special report to be used with our dialog. + """ + def __init__(self, options): + """ + Constructor + + @param options options for the report (optparse.Values) + """ + super().__init__(options) + + self.__repeat = options.repeat + self.errors = [] + + def error_args(self, line_number, offset, code, check, *args): + """ + Public method to collect the error messages. + + @param line_number line number of the issue (integer) + @param offset position within line of the issue (integer) + @param code message code (string) + @param check reference to the checker function (function) + @param args arguments for the message (list) + @return error code (string) + """ + code = super().error_args( + line_number, offset, code, check, *args) + if code and (self.counters[code] == 1 or self.__repeat): + self.errors.append( + { + "file": self.filename, + "line": line_number, + "offset": offset, + "code": code, + "args": args, + } + ) + return code + + +def extractLineFlags(line, startComment="#", endComment="", flagsLine=False): + """ + Function to extract flags starting and ending with '__' from a line + comment. + + @param line line to extract flags from (string) + @param startComment string identifying the start of the comment (string) + @param endComment string identifying the end of a comment (string) + @param flagsLine flag indicating to check for a flags only line (bool) + @return list containing the extracted flags (list of strings) + """ + flags = [] + + if not flagsLine or ( + flagsLine and line.strip().startswith(startComment)): + pos = line.rfind(startComment) + if pos >= 0: + comment = line[pos + len(startComment):].strip() + if endComment: + endPos = line.rfind(endComment) + if endPos >= 0: + comment = comment[:endPos] + flags = [f.strip() for f in comment.split() + if (f.startswith("__") and f.endswith("__"))] + flags += [f.strip().lower() for f in comment.split() + if f in ("noqa", "NOQA", + "nosec", "NOSEC", + "secok", "SECOK")] + return flags + + +def ignoreCode(code, lineFlags): + """ + Function to check, if the given code should be ignored as per line flags. + + @param code error code to be checked + @type str + @param lineFlags list of line flags to check against + @type list of str + @return flag indicating to ignore the code + @rtype bool + """ + if lineFlags: + + if ( + "__IGNORE_WARNING__" in lineFlags or + "noqa" in lineFlags or + "nosec" in lineFlags + ): + # ignore all warning codes + return True + + for flag in lineFlags: + # check individual warning code + if flag.startswith("__IGNORE_WARNING_"): + ignoredCode = flag[2:-2].rsplit("_", 1)[-1] + if code.startswith(ignoredCode): + return True + + return False + + +def securityOk(code, lineFlags): + """ + Function to check, if the given code is an acknowledged security report. + + @param code error code to be checked + @type str + @param lineFlags list of line flags to check against + @type list of str + @return flag indicating an acknowledged security report + @rtype bool + """ + if lineFlags: + return "secok" in lineFlags + + return False + + +def codeStyleCheck(filename, source, args): + """ + Do the code style check and/or fix found errors. + + @param filename source filename + @type str + @param source string containing the code to check + @type str + @param args arguments used by the codeStyleCheck function (list of + excludeMessages, includeMessages, repeatMessages, fixCodes, + noFixCodes, fixIssues, maxLineLength, maxDocLineLength, blankLines, + hangClosing, docType, codeComplexityArgs, miscellaneousArgs, errors, + eol, encoding, backup) + @type list of (str, str, bool, str, str, bool, int, list of (int, int), + bool, str, dict, dict, list of str, str, str, bool) + @return tuple of statistics (dict) and list of results (tuple for each + found violation of style (lineno, position, text, ignored, fixed, + autofixing, fixedMsg)) + @rtype tuple of (dict, list of tuples of (int, int, str, bool, bool, bool, + str)) + """ + return __checkCodeStyle(filename, source, args) + + +def codeStyleBatchCheck(argumentsList, send, fx, cancelled, maxProcesses=0): + """ + Module function to check code style for a batch of files. + + @param argumentsList list of arguments tuples as given for codeStyleCheck + @type list + @param send reference to send function + @type func + @param fx registered service name + @type str + @param cancelled reference to function checking for a cancellation + @type func + @param maxProcesses number of processes to be used + @type int + """ + if maxProcesses == 0: + # determine based on CPU count + try: + NumberOfProcesses = multiprocessing.cpu_count() + if NumberOfProcesses >= 1: + NumberOfProcesses -= 1 + except NotImplementedError: + NumberOfProcesses = 1 + else: + NumberOfProcesses = maxProcesses + + # Create queues + taskQueue = multiprocessing.Queue() + doneQueue = multiprocessing.Queue() + + # Submit tasks (initially two time number of processes + initialTasks = 2 * NumberOfProcesses + for task in argumentsList[:initialTasks]: + taskQueue.put(task) + + # Start worker processes + for _ in range(NumberOfProcesses): + multiprocessing.Process( + target=worker, args=(taskQueue, doneQueue) + ).start() + + # Get and send results + endIndex = len(argumentsList) - initialTasks + for i in range(len(argumentsList)): + resultSent = False + wasCancelled = False + + while not resultSent: + try: + # get result (waiting max. 3 seconds and send it to frontend + filename, result = doneQueue.get(timeout=3) + send(fx, filename, result) + resultSent = True + except queue.Empty: + # ignore empty queue, just carry on + if cancelled(): + wasCancelled = True + break + + if wasCancelled or cancelled(): + # just exit the loop ignoring the results of queued tasks + break + + if i < endIndex: + taskQueue.put(argumentsList[i + initialTasks]) + + # Tell child processes to stop + for _ in range(NumberOfProcesses): + taskQueue.put('STOP') + + +def worker(inputQueue, outputQueue): + """ + Module function acting as the parallel worker for the style check. + + @param inputQueue input queue (multiprocessing.Queue) + @param outputQueue output queue (multiprocessing.Queue) + """ + for filename, source, args in iter(inputQueue.get, 'STOP'): + result = __checkCodeStyle(filename, source, args) + outputQueue.put((filename, result)) + + +def __checkSyntax(filename, source): + """ + Private module function to perform a syntax check. + + @param filename source filename + @type str + @param source string containing the code to check + @type str + @return tuple containing the error dictionary with syntax error details, + a statistics dictionary and None or a tuple containing two None and + the generated AST tree + @rtype tuple of (dict, dict, None) or tuple of (None, None, ast.Module) + """ + src = "".join(source) + + try: + tree = ( + ast.parse(src, filename, 'exec', type_comments=True) + # need the 'type_comments' parameter to include type annotations + if sys.version_info >= (3, 8) else + ast.parse(src, filename, 'exec') + ) + return None, None, tree + except (SyntaxError, TypeError): + exc_type, exc = sys.exc_info()[:2] + if len(exc.args) > 1: + offset = exc.args[1] + if len(offset) > 2: + offset = offset[1:3] + else: + offset = (1, 0) + return ( + { + "file": filename, + "line": offset[0], + "offset": offset[1], + "code": "E901", + "args": [exc_type.__name__, exc.args[0]], + }, { + "E901": 1, + }, + None + ) + + +def __checkCodeStyle(filename, source, args): + """ + Private module function to perform the code style check and/or fix + found errors. + + @param filename source filename + @type str + @param source string containing the code to check + @type str + @param args arguments used by the codeStyleCheck function (list of + excludeMessages, includeMessages, repeatMessages, fixCodes, + noFixCodes, fixIssues, maxLineLength, maxDocLineLength, blankLines, + hangClosing, docType, codeComplexityArgs, miscellaneousArgs, + annotationArgs, securityArgs, errors, eol, encoding, backup) + @type list of (str, str, bool, str, str, bool, int, list of (int, int), + bool, str, dict, dict, dict, list of str, str, str, bool) + @return tuple of statistics data and list of result dictionaries with + keys: + <ul> + <li>file: file name</li> + <li>line: line_number</li> + <li>offset: offset within line</li> + <li>code: message code</li> + <li>args: list of arguments to format the message</li> + <li>ignored: flag indicating this issue was ignored</li> + <li>fixed: flag indicating this issue was fixed</li> + <li>autofixing: flag indicating that a fix can be done</li> + <li>fixcode: message code for the fix</li> + <li>fixargs: list of arguments to format the fix message</li> + </ul> + @rtype tuple of (dict, list of dict) + """ + (excludeMessages, includeMessages, repeatMessages, fixCodes, noFixCodes, + fixIssues, maxLineLength, maxDocLineLength, blankLines, hangClosing, + docType, codeComplexityArgs, miscellaneousArgs, annotationArgs, + securityArgs, errors, eol, encoding, backup) = args + + stats = {} + + if fixIssues: + from CodeStyleFixer import CodeStyleFixer + fixer = CodeStyleFixer( + filename, source, fixCodes, noFixCodes, + maxLineLength, blankLines, True, eol, backup) + # always fix in place + else: + fixer = None + + if not errors: + if includeMessages: + select = [s.strip() for s in + includeMessages.split(',') if s.strip()] + else: + select = [] + if excludeMessages: + ignore = [i.strip() for i in + excludeMessages.split(',') if i.strip()] + else: + ignore = [] + + syntaxError, syntaxStats, tree = __checkSyntax(filename, source) + + # perform the checks only, if syntax is ok and AST tree was generated + if tree: + # check coding style + pycodestyle.BLANK_LINES_CONFIG = { + # Top level class and function. + 'top_level': blankLines[0], + # Methods and nested class and function. + 'method': blankLines[1], + } + styleGuide = pycodestyle.StyleGuide( + reporter=CodeStyleCheckerReport, + repeat=repeatMessages, + select=select, + ignore=ignore, + max_line_length=maxLineLength, + max_doc_length=maxDocLineLength, + hang_closing=hangClosing, + ) + report = styleGuide.check_files([filename]) + stats.update(report.counters) + errors = report.errors + + # check documentation style + docStyleChecker = DocStyleChecker( + source, filename, select, ignore, [], repeatMessages, + maxLineLength=maxDocLineLength, docType=docType) + docStyleChecker.run() + stats.update(docStyleChecker.counters) + errors += docStyleChecker.errors + + # miscellaneous additional checks + miscellaneousChecker = MiscellaneousChecker( + source, filename, tree, select, ignore, [], repeatMessages, + miscellaneousArgs) + miscellaneousChecker.run() + stats.update(miscellaneousChecker.counters) + errors += miscellaneousChecker.errors + + # check code complexity + complexityChecker = ComplexityChecker( + source, filename, tree, select, ignore, codeComplexityArgs) + complexityChecker.run() + stats.update(complexityChecker.counters) + errors += complexityChecker.errors + + # check function annotations + if sys.version_info >= (3, 8, 0): + # annotations with type comments are supported from + # Python 3.8 on + from Annotations.AnnotationsChecker import AnnotationsChecker + annotationsChecker = AnnotationsChecker( + source, filename, tree, select, ignore, [], repeatMessages, + annotationArgs) + annotationsChecker.run() + stats.update(annotationsChecker.counters) + errors += annotationsChecker.errors + + # check for security issues + securityChecker = SecurityChecker( + source, filename, tree, select, ignore, [], repeatMessages, + securityArgs) + securityChecker.run() + stats.update(securityChecker.counters) + errors += securityChecker.errors + + # check for pathlib usage + pathlibChecker = PathlibChecker( + source, filename, tree, select, ignore, [], repeatMessages) + pathlibChecker.run() + stats.update(pathlibChecker.counters) + errors += pathlibChecker.errors + + # check for code simplifications + simplifyChecker = SimplifyChecker( + source, filename, tree, select, ignore, [], repeatMessages) + simplifyChecker.run() + stats.update(simplifyChecker.counters) + errors += simplifyChecker.errors + + elif syntaxError: + errors = [syntaxError] + stats.update(syntaxStats) + + errorsDict = {} + for error in errors: + if error["line"] > len(source): + error["line"] = len(source) + # inverse processing of messages and fixes + errorLine = errorsDict.setdefault(error["line"], []) + errorLine.append((error["offset"], error)) + deferredFixes = {} + results = [] + for lineno, errorsList in errorsDict.items(): + errorsList.sort(key=lambda x: x[0], reverse=True) + for _, error in errorsList: + error.update({ + "ignored": False, + "fixed": False, + "autofixing": False, + "fixcode": "", + "fixargs": [], + "securityOk": False, + }) + + if source: + code = error["code"] + lineFlags = extractLineFlags(source[lineno - 1].strip()) + with contextlib.suppress(IndexError): + lineFlags += extractLineFlags(source[lineno].strip(), + flagsLine=True) + + if securityOk(code, lineFlags): + error["securityOk"] = True + + if ignoreCode(code, lineFlags): + error["ignored"] = True + else: + if fixer: + res, fixcode, fixargs, id_ = fixer.fixIssue( + lineno, error["offset"], code) + if res == -1: + deferredFixes[id_] = error + else: + error.update({ + "fixed": res == 1, + "autofixing": True, + "fixcode": fixcode, + "fixargs": fixargs, + }) + + results.append(error) + + if fixer: + deferredResults = fixer.finalize() + for id_ in deferredResults: + fixed, fixcode, fixargs = deferredResults[id_] + error = deferredFixes[id_] + error.update({ + "ignored": False, + "fixed": fixed == 1, + "autofixing": True, + "fixcode": fixcode, + "fixargs": fixargs, + }) + + saveError = fixer.saveFile(encoding) + if saveError: + for error in results: + error.update({ + "fixcode": saveError[0], + "fixargs": saveError[1], + }) + + return stats, results