Wed, 30 Dec 2020 11:00:05 +0100
Updated copyright for 2021.
# -*- coding: utf-8 -*-

# Copyright (c) 2017 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Parse a ProtoBuf protocol file and retrieve messages, enums, services and
rpc methods.

It is based on the Python class browser found in this package.
"""

import re

import Utilities
import Utilities.ClassBrowsers as ClassBrowsers
from . import ClbrBaseClasses

SUPPORTED_TYPES = [ClassBrowsers.PROTO_SOURCE]

# Scanner for the next interesting token. Exactly one of the top level
# named groups (String, Comment, Message, Enum, Service, Method, Begin,
# End) participates in each match.
_getnext = re.compile(
    r"""
    (?P<String>
        " [^"\\\n]* (?: \\. [^"\\\n]*)* "
    )

|   (?P<Comment>
        ^ [ \t]* // .*? $
    |
        ^ [ \t]* /\* .*? \*/
    )

|   (?P<Message>
        ^
        (?P<MessageIndent> [ \t]* )
        message [ \t]+
        (?P<MessageName> [a-zA-Z_] [a-zA-Z0-9_]* )
        [ \t]* {
    )

|   (?P<Enum>
        ^
        (?P<EnumIndent> [ \t]* )
        enum [ \t]+
        (?P<EnumName> [a-zA-Z_] [a-zA-Z0-9_]* )
        [ \t]* {
    )

|   (?P<Service>
        ^
        (?P<ServiceIndent> [ \t]* )
        service [ \t]+
        (?P<ServiceName> [a-zA-Z_] [a-zA-Z0-9_]* )
        [ \t]* {
    )

|   (?P<Method>
        ^
        (?P<MethodIndent> [ \t]* )
        rpc [ \t]+
        (?P<MethodName> [a-zA-Z_] [a-zA-Z0-9_]* )
        [ \t]*
        \(
        (?P<MethodSignature> [^)]+? )
        \)
        [ \t]+
        returns
        [ \t]*
        \(
        (?P<MethodReturn> [^)]+? )
        \)
        [ \t]*
    )

|   (?P<Begin>
        [ \t]* {
    )

|   (?P<End>
        [ \t]* } [ \t]* ;?
    )""",
    re.VERBOSE | re.DOTALL | re.MULTILINE).search

# function to replace comments
_commentsub = re.compile(r"""//[^\n]*\n|//[^\n]*$""").sub
# function to normalize whitespace
_normalize = re.compile(r"""[ \t]{2,}""").sub

_modules = {}                           # cache of modules we've seen


class VisibilityMixin(ClbrBaseClasses.ClbrVisibilityMixinBase):
    """
    Mixin class implementing the notion of visibility.
    """
    def __init__(self):
        """
        Constructor
        """
        self.setPublic()


class Message(ClbrBaseClasses.Module, VisibilityMixin):
    """
    Class to represent a ProtoBuf Message.
    """
    def __init__(self, module, name, file, lineno):
        """
        Constructor

        @param module name of the module containing this message
        @type str
        @param name name of this message
        @type str
        @param file filename containing this message
        @type str
        @param lineno linenumber of the message definition
        @type int
        """
        ClbrBaseClasses.Module.__init__(self, module, name, file, lineno)
        VisibilityMixin.__init__(self)


class Enum(ClbrBaseClasses.Enum, VisibilityMixin):
    """
    Class to represent a ProtoBuf Enum.
    """
    def __init__(self, module, name, file, lineno):
        """
        Constructor

        @param module name of the module containing this enum
        @type str
        @param name name of this enum
        @type str
        @param file filename containing this enum
        @type str
        @param lineno linenumber of the enum definition
        @type int
        """
        ClbrBaseClasses.Enum.__init__(self, module, name, file, lineno)
        VisibilityMixin.__init__(self)


class Service(ClbrBaseClasses.Class, VisibilityMixin):
    """
    Class to represent a ProtoBuf Service.
    """
    def __init__(self, module, name, file, lineno):
        """
        Constructor

        @param module name of the module containing this service
        @type str
        @param name name of this service
        @type str
        @param file filename containing this service
        @type str
        @param lineno linenumber of the service definition
        @type int
        """
        ClbrBaseClasses.Class.__init__(self, module, name, None, file,
                                       lineno)
        VisibilityMixin.__init__(self)


class ServiceMethod(ClbrBaseClasses.Function, VisibilityMixin):
    """
    Class to represent a ProtoBuf Service Method.
    """
    def __init__(self, name, file, lineno, signature, returns):
        """
        Constructor

        @param name name of this service method
        @type str
        @param file filename containing this service method
        @type str
        @param lineno linenumber of the service method definition
        @type int
        @param signature parameter list of the service method
        @type str
        @param returns return type of the service method
        @type str
        """
        ClbrBaseClasses.Function.__init__(
            self, None, name, file, lineno, signature,
            annotation="-> {0}".format(returns))
        VisibilityMixin.__init__(self)


def readmodule_ex(module, path=None):
    """
    Read a ProtoBuf protocol file and return a dictionary of messages, enums,
    services and rpc methods.

    The result is cached per module name; a module that cannot be read or
    is not a ProtoBuf source is cached as an empty dictionary.

    @param module name of the ProtoBuf protocol file
    @type str
    @param path path the file should be searched in
    @type list of str
    @return the resulting dictionary
    @rtype dict
    """
    if module in _modules:
        # we've seen this file before...
        return _modules[module]

    # search the path for the file (work on a copy of the caller's list)
    fullpath = [] if path is None else path[:]
    f, file, (_suffix, _mode, fileType) = ClassBrowsers.find_module(
        module, fullpath)
    if f:
        # only the file name is needed; the content is read separately
        f.close()
    if fileType not in SUPPORTED_TYPES:
        # not ProtoBuf protocol source, can't do anything with this module
        _modules[module] = {}
        return {}

    try:
        src = Utilities.readEncodedFile(file)[0]
    except (UnicodeError, OSError):
        # can't do anything with this module
        _modules[module] = {}
        return {}

    _modules[module] = scan(src, file, module)
    return _modules[module]


def scan(src, file, module):
    """
    Public method to scan the given source text.

    @param src source text to be scanned
    @type str
    @param file file name associated with the source text
    @type str
    @param module module name associated with the source text
    @type str
    @return dictionary containing the extracted data
    @rtype dict
    """
    def calculateEndline(lineno, lines):
        """
        Function to calculate the end line.

        @param lineno line number to start at (one based)
        @type int
        @param lines list of source lines
        @type list of str
        @return end line (one based) or -1, if there is no opening brace
            at or after the start line or no matching closing brace
        @rtype int
        """
        # convert lineno to be zero based
        lineno -= 1
        # 1. search for opening brace '{'
        while lineno < len(lines) and "{" not in lines[lineno]:
            lineno += 1
        if lineno >= len(lines):
            # no opening brace up to the end of the source (e.g. a body-less
            # 'rpc ... returns (...);' near the end of the file); without
            # this guard the indexing below raises an IndexError
            return -1
        depth = lines[lineno].count("{") - lines[lineno].count("}")
        # 2. search for ending line, i.e. matching closing brace '}'
        while depth > 0 and lineno < len(lines) - 1:
            lineno += 1
            depth += lines[lineno].count("{") - lines[lineno].count("}")
        if depth == 0:
            # found a matching brace
            return lineno + 1
        else:
            # nothing found
            return -1

    def cleanupText(text):
        """
        Function to strip line continuations and '//' comments and to
        collapse runs of blanks and tabs.

        @param text text to be cleaned
        @type str
        @return cleaned text (empty string for an empty or missing text)
        @rtype str
        """
        if not text:
            return ''
        text = text.replace('\\\n', '')
        text = _commentsub('', text)
        return _normalize(' ', text)

    # regex group name -> class used to represent the definition
    containerKinds = (
        ("Message", Message),
        ("Enum", Enum),
        ("Service", Service),
    )

    # convert eol markers the Python style
    src = src.replace("\r\n", "\n").replace("\r", "\n")
    srcLines = src.splitlines()

    dictionary = {}

    classstack = []  # stack of (class, indent) pairs
    indent = 0       # current brace nesting depth

    lineno, last_lineno_pos = 1, 0
    i = 0
    while True:
        m = _getnext(src, i)
        if not m:
            break
        start, i = m.span()

        if m.start("Method") >= 0:
            # found a rpc method definition
            thisindent = indent
            meth_name = m.group("MethodName")
            meth_sig = cleanupText(m.group("MethodSignature"))
            meth_return = cleanupText(m.group("MethodReturn"))

            # line numbers are tracked incrementally between matches
            lineno += src.count('\n', last_lineno_pos, start)
            last_lineno_pos = start

            # close all messages/services indented at least as much
            while classstack and classstack[-1][1] >= thisindent:
                del classstack[-1]
            if not classstack:
                # the file is incorrect, ignore the entry
                continue

            cur_class = classstack[-1][0]
            if isinstance(cur_class, Service):
                # it's a service method
                f = ServiceMethod(meth_name, file, lineno, meth_sig,
                                  meth_return)
                cur_class._addmethod(meth_name, f)
                f.setEndLine(calculateEndline(lineno, srcLines))
            else:
                # rpc outside of a service; only a marker is recorded
                f = None
            classstack.append((f, thisindent))  # Marker for nested fns

        elif m.start("String") >= 0 or m.start("Comment") >= 0:
            # strings and comments are consumed so their content is not
            # mistaken for definitions or braces
            pass

        elif m.start("Begin") >= 0:
            # a begin of a block we are not interested in
            indent += 1

        elif m.start("End") >= 0:
            # an end of a block
            indent -= 1

        else:
            # we found a message, enum or service definition
            for kind, factory in containerKinds:
                if m.start(kind) < 0:
                    continue

                # the definition's opening brace is part of the match
                thisindent = indent
                indent += 1

                lineno += src.count('\n', last_lineno_pos, start)
                last_lineno_pos = start
                name = m.group(kind + "Name")

                # close all messages/services indented at least as much
                while classstack and classstack[-1][1] >= thisindent:
                    del classstack[-1]

                # remember this definition
                cur_class = factory(module, name, file, lineno)
                cur_class.setEndLine(calculateEndline(lineno, srcLines))
                if not classstack:
                    # top level definition
                    dictionary[name] = cur_class
                else:
                    # nested definition, attach it to its container
                    classstack[-1][0]._addclass(name, cur_class)
                classstack.append((cur_class, thisindent))
                break

    return dictionary