--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ExtensionProtobuf/protoclbr.py Mon Dec 12 16:55:43 2022 +0100 @@ -0,0 +1,394 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2017 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Parse a ProtoBuf protocol file and retrieve messages, enums, services and +rpc methods. + +It is based on the Python class browser found in this package. +""" + +import os +import re + +from eric7 import Utilities +from eric7.Utilities.ClassBrowsers import ClbrBaseClasses + +_getnext = re.compile( + r""" + (?P<String> + " [^"\\\n]* (?: \\. [^"\\\n]*)* " + ) + +| (?P<Comment> + ^ [ \t]* // .*? $ + | + ^ [ \t]* /\* .*? \*/ + ) + +| (?P<Message> + ^ + (?P<MessageIndent> [ \t]* ) + message [ \t]+ + (?P<MessageName> [a-zA-Z_] [a-zA-Z0-9_]* ) + [ \t]* { + ) + +| (?P<Enum> + ^ + (?P<EnumIndent> [ \t]* ) + enum [ \t]+ + (?P<EnumName> [a-zA-Z_] [a-zA-Z0-9_]* ) + [ \t]* { + ) + +| (?P<Service> + ^ + (?P<ServiceIndent> [ \t]* ) + service [ \t]+ + (?P<ServiceName> [a-zA-Z_] [a-zA-Z0-9_]* ) + [ \t]* { + ) + +| (?P<Method> + ^ + (?P<MethodIndent> [ \t]* ) + rpc [ \t]+ + (?P<MethodName> [a-zA-Z_] [a-zA-Z0-9_]* ) + [ \t]* + \( + (?P<MethodSignature> [^)]+? ) + \) + [ \t]+ + returns + [ \t]* + \( + (?P<MethodReturn> [^)]+? ) + \) + [ \t]* + ) + +| (?P<Begin> + [ \t]* { + ) + +| (?P<End> + [ \t]* } [ \t]* ;? + )""", + re.VERBOSE | re.DOTALL | re.MULTILINE, +).search + +# function to replace comments +_commentsub = re.compile(r"""//[^\n]*\n|//[^\n]*$""").sub +# function to normalize whitespace +_normalize = re.compile(r"""[ \t]{2,}""").sub + + +class VisibilityMixin(ClbrBaseClasses.ClbrVisibilityMixinBase): + """ + Mixin class implementing the notion of visibility. + """ + + def __init__(self): + """ + Constructor + """ + self.setPublic() + + +class Message(ClbrBaseClasses.Module, VisibilityMixin): + """ + Class to represent a ProtoBuf Message. + """ + + def __init__(self, module, name, file, lineno): + """ + Constructor + + @param module name of the module containing this message + @type str + @param name name of this message + @type str + @param file filename containing this message + @type str + @param lineno linenumber of the message definition + @type int + """ + ClbrBaseClasses.Module.__init__(self, module, name, file, lineno) + VisibilityMixin.__init__(self) + + +class Enum(ClbrBaseClasses.Enum, VisibilityMixin): + """ + Class to represent a ProtoBuf Enum. + """ + + def __init__(self, module, name, file, lineno): + """ + Constructor + + @param module name of the module containing this enum + @type str + @param name name of this enum + @type str + @param file filename containing this enum + @type str + @param lineno linenumber of the message enum + @type int + """ + ClbrBaseClasses.Enum.__init__(self, module, name, file, lineno) + VisibilityMixin.__init__(self) + + +class Service(ClbrBaseClasses.Class, VisibilityMixin): + """ + Class to represent a ProtoBuf Service. + """ + + def __init__(self, module, name, file, lineno): + """ + Constructor + + @param module name of the module containing this service + @type str + @param name name of this service + @type str + @param file filename containing this service + @type str + @param lineno linenumber of the service definition + @type int + """ + ClbrBaseClasses.Class.__init__(self, module, name, None, file, lineno) + VisibilityMixin.__init__(self) + + +class ServiceMethod(ClbrBaseClasses.Function, VisibilityMixin): + """ + Class to represent a ProtoBuf Service Method. + """ + + def __init__(self, name, file, lineno, signature, returns): + """ + Constructor + + @param name name of this service method + @type str + @param file filename containing this service method + @type str + @param lineno linenumber of the service method definition + @type int + @param signature parameter list of the service method + @type str + @param returns return type of the service method + @type str + """ + ClbrBaseClasses.Function.__init__( + self, + None, + name, + file, + lineno, + signature, + annotation="-> {0}".format(returns), + ) + VisibilityMixin.__init__(self) + + +def readmodule_ex(module, path=None): + """ + Read a ProtoBuf protocol file and return a dictionary of messages, enums, + services and rpc methods. + + @param module name of the ProtoBuf protocol file + @type str + @param path path the file should be searched in + @type list of str + @return the resulting dictionary + @rtype dict + """ + path = [] if path is None else path[:] + for p in path: # search in path + pathname = os.path.join(p, module) + if os.path.exists(pathname): + filename = pathname + try: + src = Utilities.readEncodedFile(filename)[0] + except (UnicodeError, OSError): + # can't do anything with this module + return {} + + return scan(src, filename, module) + return {} + + +def scan(src, file, module): + """ + Public method to scan the given source text. + + @param src source text to be scanned + @type str + @param file file name associated with the source text + @type str + @param module module name associated with the source text + @type str + @return dictionary containing the extracted data + @rtype dict + """ + + def calculateEndline(lineno, lines): + """ + Function to calculate the end line. + + @param lineno line number to start at (one based) + @type int + @param lines list of source lines + @type list of str + @return end line (one based) + @rtype int + """ + # convert lineno to be zero based + lineno -= 1 + # 1. search for opening brace '{' + while lineno < len(lines) and "{" not in lines[lineno]: + lineno += 1 + depth = lines[lineno].count("{") - lines[lineno].count("}") + # 2. search for ending line, i.e. matching closing brace '}' + while depth > 0 and lineno < len(lines) - 1: + lineno += 1 + depth += lines[lineno].count("{") - lines[lineno].count("}") + if depth == 0: + # found a matching brace + return lineno + 1 + else: + # nothing found + return -1 + + # convert eol markers the Python style + src = src.replace("\r\n", "\n").replace("\r", "\n") + srcLines = src.splitlines() + + dictionary = {} + + classstack = [] # stack of (class, indent) pairs + indent = 0 + + lineno, last_lineno_pos = 1, 0 + i = 0 + while True: + m = _getnext(src, i) + if not m: + break + start, i = m.span() + + if m.start("Method") >= 0: + # found a method definition or function + thisindent = indent + meth_name = m.group("MethodName") + meth_sig = m.group("MethodSignature") + meth_sig = meth_sig and meth_sig.replace("\\\n", "") or "" + meth_sig = _commentsub("", meth_sig) + meth_sig = _normalize(" ", meth_sig) + meth_return = m.group("MethodReturn") + meth_return = meth_return and meth_return.replace("\\\n", "") or "" + meth_return = _commentsub("", meth_return) + meth_return = _normalize(" ", meth_return) + lineno += src.count("\n", last_lineno_pos, start) + last_lineno_pos = start + # close all interfaces/modules indented at least as much + while classstack and classstack[-1][1] >= thisindent: + del classstack[-1] + if classstack: + # it's an interface/module method + cur_class = classstack[-1][0] + if isinstance(cur_class, Service): + # it's a method + f = ServiceMethod(meth_name, file, lineno, meth_sig, meth_return) + cur_class._addmethod(meth_name, f) + # else it's a nested def + else: + f = None + else: + # the file is incorrect, ignore the entry + continue + if f: + endline = calculateEndline(lineno, srcLines) + f.setEndLine(endline) + classstack.append((f, thisindent)) # Marker for nested fns + + elif m.start("String") >= 0 or m.start("Comment") >= 0: + pass + + elif m.start("Message") >= 0: + # we found a message definition + thisindent = indent + indent += 1 + lineno += src.count("\n", last_lineno_pos, start) + last_lineno_pos = start + message_name = m.group("MessageName") + # close all messages/services indented at least as much + while classstack and classstack[-1][1] >= thisindent: + del classstack[-1] + # remember this message + cur_class = Message(module, message_name, file, lineno) + endline = calculateEndline(lineno, srcLines) + cur_class.setEndLine(endline) + if not classstack: + dictionary[message_name] = cur_class + else: + msg = classstack[-1][0] + msg._addclass(message_name, cur_class) + classstack.append((cur_class, thisindent)) + + elif m.start("Enum") >= 0: + # we found a message definition + thisindent = indent + indent += 1 + # close all messages/services indented at least as much + while classstack and classstack[-1][1] >= thisindent: + del classstack[-1] + lineno += src.count("\n", last_lineno_pos, start) + last_lineno_pos = start + enum_name = m.group("EnumName") + # remember this Enum + cur_class = Enum(module, enum_name, file, lineno) + endline = calculateEndline(lineno, srcLines) + cur_class.setEndLine(endline) + if not classstack: + dictionary[enum_name] = cur_class + else: + enum = classstack[-1][0] + enum._addclass(enum_name, cur_class) + classstack.append((cur_class, thisindent)) + + elif m.start("Service") >= 0: + # we found a message definition + thisindent = indent + indent += 1 + # close all messages/services indented at least as much + while classstack and classstack[-1][1] >= thisindent: + del classstack[-1] + lineno += src.count("\n", last_lineno_pos, start) + last_lineno_pos = start + service_name = m.group("ServiceName") + # remember this Service + cur_class = Service(module, service_name, file, lineno) + endline = calculateEndline(lineno, srcLines) + cur_class.setEndLine(endline) + if not classstack: + dictionary[service_name] = cur_class + else: + service = classstack[-1][0] + service._addclass(service_name, cur_class) + classstack.append((cur_class, thisindent)) + + elif m.start("Begin") >= 0: + # a begin of a block we are not interested in + indent += 1 + + elif m.start("End") >= 0: + # an end of a block + indent -= 1 + + return dictionary