Source code for pyrtma.parser

"""Message definition YAML parser"""

import re
import json
import pathlib
import os
import textwrap
import ctypes
import logging
import string
import sys

# import yaml
from ruamel.yaml import YAML

from copy import copy
from hashlib import sha256
from typing import (
    List,
    Optional,
    Any,
    Union,
    Tuple,
    Dict,
    Type,
    Literal,
    cast,
    TYPE_CHECKING,
)
from dataclasses import dataclass, field, is_dataclass, asdict

if TYPE_CHECKING:
    from _typeshed import DataclassInstance

try:
    from .core_defs import MAX_MESSAGE_TYPES
except AttributeError:
    # if we deprecate and remove this constant, we can fall back on inf
    # changed from inf to maxsize to maintain int type
    MAX_MESSAGE_TYPES = sys.maxsize


[docs] @dataclass class NativeType: name: str size: int format: str
# Native C types and sizes that are supported supported_types = { "char": NativeType(name="char", size=1, format="c"), "signed char": NativeType(name="signed_char", size=1, format="b"), "unsigned char": NativeType(name="unsigned char", size=1, format="B"), "byte": NativeType(name="byte", size=1, format="B"), "int": NativeType(name="int", size=4, format="i"), "signed int": NativeType(name="signed int", size=4, format="i"), "unsigned int": NativeType(name="unsigned int", size=4, format="I"), "unsigned": NativeType(name="unsigned", size=4, format="I"), "short": NativeType(name="short", size=2, format="h"), "signed short": NativeType(name="signed short", size=2, format="h"), "unsigned short": NativeType(name="unsigned short", size=2, format="H"), "long": NativeType(name="long", size=4, format="i"), "signed long": NativeType(name="signed long", size=4, format="i"), "unsigned long": NativeType(name="unsigned long", size=4, format="I"), "long long": NativeType(name="long long", size=8, format="q"), "signed long long": NativeType(name="signed long long", size=8, format="q"), "unsigned long long": NativeType(name="unsigned long long", size=8, format="Q"), "float": NativeType(name="float", size=4, format="f"), "double": NativeType(name="double", size=8, format="d"), "uint8": NativeType(name="uint8", size=1, format="B"), "uint16": NativeType(name="uint16", size=2, format="H"), "uint32": NativeType(name="uint32", size=4, format="I"), "uint64": NativeType(name="uint64", size=8, format="Q"), "int8": NativeType(name="int8", size=1, format="b"), "int16": NativeType(name="int16", size=2, format="h"), "int32": NativeType(name="int32", size=4, format="i"), "int64": NativeType(name="int64", size=8, format="q"), }
[docs] class ParserError(Exception): """Base class for all parser exceptions""" pass
[docs] class YAMLSyntaxError(ParserError): """Raised when the parser encounters invalid YAML""" pass
[docs] class RTMASyntaxError(ParserError): """Raised when the parser encounters invalid RTMA syntax""" pass
[docs] class HostIDError(ParserError): """Raised when the a host id is already in use""" pass
[docs] class ModuleIDError(ParserError): """Raised when the a module id is already in use""" pass
[docs] class DuplicateNameError(ParserError): """Raised when the a name is already in use""" pass
[docs] class MessageIDError(ParserError): """Raised when the a message id is already in use""" pass
[docs] class CircularRefError(ParserError): """Raised when an expression contains a circular reference""" pass
[docs] class ExpressionExpansionError(ParserError): """Raised when an expression can not be expanded.""" pass
[docs] class RecurisionError(ParserError): """Raised when recursion limit is exceeded evaluating aliases and expressions.""" pass
[docs] class InvalidTypeError(ParserError): """Raised when a field contains the wrong type of data""" pass
[docs] class FileFormatError(ParserError): """Raised when the wrong file extension is referenced.""" pass
[docs] class AlignmentError(ParserError): """Raised when a struct is not 64-bit aligned""" pass
[docs] class InvalidMessageSize(ParserError): """Raised when a message is too large""" pass
[docs] @dataclass class Metadata: name: str value: Union[int, float, bool, str] src: pathlib.Path
[docs] @dataclass class CompilerOptions: name: str value: Union[int, float, bool, str] src: pathlib.Path
[docs] @dataclass class Import: file: pathlib.Path src: pathlib.Path
[docs] @dataclass class ConstantExpr: name: str expression: str expanded: Optional[str] value: Union[str, int, float] src: pathlib.Path
[docs] @dataclass class ConstantString: name: str value: str src: pathlib.Path
[docs] @dataclass class MT: name: str value: int src: pathlib.Path
[docs] @dataclass class MID: name: str value: int src: pathlib.Path
[docs] @dataclass class HID: name: str value: int src: pathlib.Path
[docs] @dataclass class TypeAlias: name: str type_name: str type_obj: Union[NativeType, "SDF"] src: pathlib.Path @property def size(self) -> int: return self.type_obj.size @property def alignment(self) -> int: if isinstance(self.type_obj, NativeType): return self.type_obj.size else: return self.type_obj.alignment @property def format(self) -> str: return self.type_obj.format
[docs] @dataclass class Field: name: str type_name: str type_obj: Union[NativeType, TypeAlias, "MDF", "SDF"] length_expression: Optional[str] = None length_expanded: Optional[str] = None length: Optional[int] = None offset: int = -1 @property def base_size(self) -> int: return self.type_obj.size @property def size(self): return self.type_obj.size * (self.length or 1) @property def alignment(self) -> int: if isinstance(self.type_obj, NativeType): return self.type_obj.size else: return self.type_obj.alignment @property def format(self) -> str: if self.length: return self.length * self.type_obj.format else: return self.type_obj.format
[docs] @dataclass class MDF: raw: str hash: str name: str type_id: int src: pathlib.Path fields: List[Field] = field(default_factory=list) alignment: int = 8 @property def signal(self) -> bool: return len(self.fields) > 0 @property def size(self) -> int: return sum([f.size for f in self.fields]) @property def format(self) -> str: s = "" for field in self.fields: s += field.format return s
[docs] @dataclass class SDF: raw: str hash: str name: str src: pathlib.Path fields: List[Field] = field(default_factory=list) alignment: int = 8 @property def size(self) -> int: return sum([f.size for f in self.fields]) @property def format(self) -> str: s = "" for field in self.fields: s += field.format return s
RTMAObject = Union[ConstantExpr, ConstantString, HID, MID, TypeAlias, MDF, SDF, Field]
[docs] class Parser: """Parser class""" _instance_count: int = -1 def __init__( self, debug: bool = False, validate_alignment: bool = True, auto_pad: bool = True, import_coredefs: bool = True, ): """Parser class Args: debug (bool, optional): Flag for debug mode. Defaults to False. """ self.included_files: List[pathlib.Path] = [] self.current_file = pathlib.Path() self.root_path = pathlib.Path() self.debug = debug Parser._instance_count += 1 # compiler options self.validate_alignment = validate_alignment self.auto_pad = auto_pad self.import_coredefs = import_coredefs self.yaml_dict: Dict[str, Dict[str, Any]] = dict( metadata={}, compiler_options={}, imports={}, constants={}, string_constants={}, host_ids={}, module_ids={}, aliases={}, struct_defs={}, message_defs={}, ) self.metadata: Dict[str, Metadata] = {} self.compiler_options: Dict[str, CompilerOptions] = {} self.imports: List[Import] = [] self.constants: Dict[str, ConstantExpr] = {} self.string_constants: Dict[str, ConstantString] = {} self.aliases: Dict[str, TypeAlias] = {} self.host_ids: Dict[str, HID] = {} self.module_ids: Dict[str, MID] = {} self.struct_defs: Dict[str, SDF] = {} self.message_ids: Dict[str, MT] = {} self.message_defs: Dict[str, MDF] = {} self.logger = logging.getLogger(f"pyrtma.parser ({self._instance_count})") self.logger.propagate = False if self.debug: self.logger.setLevel(logging.DEBUG) else: self.logger.setLevel(logging.INFO) fmt = "{levelname:<6} - {message}" formatter = logging.Formatter(fmt, style="{") console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(formatter) self.logger.addHandler(console) # Reserved field names reserved_file = pathlib.Path(__file__).parent / "reserved_field_names.json" with open(reserved_file, "r") as f: self.reserved_field_names = json.load(f) def warning(self, msg: str): self.logger.warning(msg) def clear(self): self.current_file = pathlib.Path() self.yaml_dict = dict( metadata={}, compiler_options={}, imports={}, constants={}, string_constants={}, host_ids={}, module_ids={}, aliases={}, struct_defs={}, message_defs={}, ) self.included_files = [] self.metadata = {} self.compiler_options = {} self.imports = [] self.constants = {} self.string_constants = {} self.aliases = {} self.host_ids = {} self.module_ids = {} self.struct_defs = {} self.message_ids = {} self.message_defs = {}
[docs] def check_duplicate_name( self, section: str, name: str, namespaces: Tuple[str, ...] ): """Check namespaces for conflicting names.""" for namespace in namespaces: ns = getattr(self, namespace) for o in ns.values(): if name == o.name: raise DuplicateNameError( f"Duplicate name conflict found: \n\n1: {namespace} -> {o.name} -> {o.src.absolute()}\n2: {section} -> {name} -> {self.current_file}\n" )
def expand_expression(self, name: str, expr: str) -> Tuple[str, int]: rdepth_limit = 10 n = 0 symbol_regex = r"\b(?P<symbol>[a-zA-Z_]+\w*)\b" m = re.search(symbol_regex, expr) while m: symbol = m.group() n += 1 try: c = self.constants[symbol] except KeyError: raise ExpressionExpansionError( f"Unable to expand expression {name} -> {symbol} not defined: {self.current_file}" ) if re.search(rf"\b{name}\b", c.expression) is not None: raise CircularRefError( f"Circular reference in expression: {name} -> {expr}." ) if c.value is None: raise ExpressionExpansionError( f"{c.name} has not been evaluated to a value." ) expr = re.sub(rf"\b{c.name}\b", str(c.value), expr) if n > rdepth_limit: raise RecursionError( f"Recursion limit reached expanding constant expression {name} in {self.current_file}" ) # Try to match another macro symbol m = re.search(symbol_regex, expr) # Add the expanded form and evaluated value expanded = expr # TODO: check that only numbers and operators are left in expression value = eval(expr) return expanded, value def handle_import(self, fname: str): imp = Import(pathlib.Path(fname), src=self.trim_root(self.current_file)) self.imports.append(imp) if imp.file.is_dir(): raise FileFormatError( f"Imports must be yaml files, not directories. -> {self.current_file}" ) if not imp.file.suffix.lower() in [".yaml", ".yml"]: raise FileFormatError(f"Imports must be .yaml files -> {self.current_file}") if imp.file.suffix == ".yml": self.warning(f"Please change {imp.file} to use '.yaml' extension.") input("Hit enter to continue...") self.parse_file(imp.file.absolute()) def handle_metadata(self, name: str, value: Union[int, float, bool, str]): self.check_name(name) self.check_duplicate_name( "metadata", name, namespaces=("metadata",), ) if not isinstance(value, (int, float, bool, str)): raise InvalidTypeError( f"Values in 'metadata' section must be of type int, float, bool, or string not{type(value).__name__}. {name} -> {self.current_file}" ) self.metadata[name] = Metadata( name, value, src=self.trim_root(self.current_file), ) def handle_compiler_options(self, name: str, value: Union[int, float, bool, str]): self.check_name(name) self.check_duplicate_name( "compiler_options", name, namespaces=("compiler_options",), ) self.compiler_options[name] = CompilerOptions( name, value, src=self.trim_root(self.current_file), ) def handle_expression(self, name: str, expression: Union[int, float, str]): self.check_name(name) self.check_duplicate_name( "constants", name, namespaces=( "constants", "string_constants", "aliases", "struct_defs", "message_defs", ), ) if not isinstance(expression, (int, float, str)): raise InvalidTypeError( f"Values in 'constants' section must be of type int, float, or string not{type(expression).__name__}. {name} -> {self.current_file}" ) # Expand numerical expression now if isinstance(expression, (int, float)): self.constants[name] = ConstantExpr( name, expression=str(expression), expanded=str(expression), value=expression, src=self.trim_root(self.current_file), ) elif isinstance(expression, str): expanded, value = self.expand_expression(name, expression) self.constants[name] = ConstantExpr( name, expression=expression, expanded=expanded, value=value, src=self.trim_root(self.current_file), ) def handle_string(self, name: str, value: str): self.check_name(name) self.check_duplicate_name( "string_constants", name, namespaces=( "constants", "string_constants", "aliases", "struct_defs", "message_defs", ), ) if not isinstance(value, str): raise RTMASyntaxError( f"Values in 'string_constants' section must evaluate to string type not {type(value)}. {name}: {value} -> {self.current_file}" ) self.string_constants[name] = ConstantString( name, value=f'"{value}"', src=self.trim_root(self.current_file) )
[docs] def handle_alias(self, alias: str, ftype: str): """Find the base type ultimately represented by the typedef alias""" if not isinstance(ftype, str): raise InvalidTypeError( f"Values in 'aliases' section must be a string not {type(ftype).__name__}.\n{alias} -> {self.current_file}" ) self.check_name(alias) self.check_duplicate_name( "aliases", alias, namespaces=( "constants", "string_constants", "aliases", "struct_defs", "message_defs", ), ) n = 0 prev = ftype while n < 10: if ftype in supported_types.keys(): self.aliases[alias] = TypeAlias( alias, ftype, supported_types[ftype], self.current_file ) return # Check if ftype points back to a user defined struct for sdf in self.struct_defs.values(): if ftype == sdf.name: ftype = sdf.name self.aliases[alias] = TypeAlias( alias, ftype, sdf, src=self.trim_root(self.current_file) ) return # Check if ftype points back to another typedef for a in self.aliases.values(): if ftype == a.name: n += 1 ftype = a.type_name if ftype == prev: raise RTMASyntaxError(f"Unable to resolve alias {alias}: {ftype}") else: prev = ftype raise RecursionError(f"Recursion limit exceeded for alias: {alias}")
def handle_host_id(self, name: str, value: int): self.check_name(name) self.check_duplicate_name("host_ids", name, namespaces=("host_ids",)) if not isinstance(value, int): raise InvalidTypeError( f"Values in 'host_ids' section must evaluate to int type not {type(value).__name__}. {name}: {value}" ) if value < 1 or value > 32767: if self.current_file.name != "core_defs.yaml" and self.import_coredefs: raise RTMASyntaxError( f"Value outside of valid range [1 - 32767] for host_id: {name}: {value}" ) for hid in self.host_ids.values(): if value == hid.value: raise HostIDError( f"Duplicate host id conflict found for {name} and {hid.name} -> {value}\n1: {hid.src.absolute()}\n2: {self.current_file}\n" ) self.host_ids[name] = HID( name, int(value), src=self.trim_root(self.current_file) ) def handle_module_id(self, name: str, value: int): self.check_name(name) self.check_duplicate_name("module_ids", name, namespaces=("module_ids",)) if not isinstance(value, int): raise InvalidTypeError( f"Values in 'module_ids' section must evaluate to int type not {type(value).__name__}. {name}: {value} -> {self.current_file}" ) if value < 10 or (99 < value < 200): if ( self.current_file.name != "core_defs.yaml" and value != 0 and self.import_coredefs ): raise RTMASyntaxError( f"Value outside of valid range [10 - 99 or > 200] for module_id: {name}: {value} -> {self.current_file}" ) for mid in self.module_ids.values(): if value == mid.value: raise ModuleIDError( f"Duplicate module id conflict found for {name} and {mid.name} -> {value}\n1: {mid.src.absolute()}\n2: {self.current_file}\n" ) self.module_ids[name] = MID( name, int(value), src=self.trim_root(self.current_file) )
[docs] def check_alignment(self, s: Union[SDF, MDF]): """Confirm 64 bit alignment of structures""" PADDING_BYTE_TYPE = "byte" # This value will represent the memory address currently pointed to in the struct layout ptr = 0 npad = 0 # padding field count n = 0 # field index pad_len: Union[int, Literal[""]] # Loop over all fields in struct while n < len(s.fields): # Get the next field object field = s.fields[n] # Check if current address meets alignment requirement of the field if (ptr % field.alignment) == 0: # Store the fields offset in struct field.offset = ptr # Stride by an amount equal to the field size ptr += field.size # Go on to the next field (No padding required) n += 1 continue # Bail if auto padding is disabled if not self.auto_pad: raise AlignmentError( f"{s.name}.{field.name} does not start on a valid memory boundary for type: {field.type_name}. Add padding fields prior for 64-bit alignment." ) # Calculate number of bytes needed to get to next alignment boundary pad_len = field.alignment - (ptr % field.alignment) # Create the required padding field padding = Field( name=f"padding_{npad}_", type_name=PADDING_BYTE_TYPE, type_obj=supported_types[PADDING_BYTE_TYPE], length_expression=f'"{pad_len}"', length_expanded=f'"{pad_len}"', length=pad_len, offset=ptr, ) # Insert the padding into MDF or SDF object before the current field s.fields.insert(n, padding) # Stride by an amount equal to the field size ptr += padding.size # Ideally the user should explicitly pad their struct layout self.warning( f"Adding {pad_len} padding byte(s) before {s.name}.{field.name}." ) # Store the fields offset in struct field.offset = ptr # Stride by an amount equal to the field size ptr += field.size # Skip 2 since len of fields has been extended n += 2 npad += 1 # Note: ptr currently points to the byte immediately following the struct # Explicitly align the end of the struct to allow for the strictest alignment to be satisfied # Must account for C packing of array of structs and striding using sizeof # We do this by calculating the padding required for consecutive structs pad_len = 0 # Start with no padding while 1: # Check if any fields do not meet their required alignment if any([(pad_len + ptr + f.offset) % f.alignment for f in s.fields]): # Increase the padding and try again pad_len += 1 continue # No padding required if pad_len == 0: break # Bail if auto padding is disabled if not self.auto_pad: raise AlignmentError( f"{s.name} requires trailing padding of {pad_len} bytes for 64-bit alignment." ) # Don't make an array type if we only need one byte padding if pad_len == 1: pad_len = "" # Create the required padding field padding = Field( name=f"padding_{npad}_", type_name=PADDING_BYTE_TYPE, type_obj=supported_types[PADDING_BYTE_TYPE], length_expression=f'"{pad_len}"', length_expanded=f'"{pad_len}"', length=pad_len or None, ) # Stride by an amount equal to the field size ptr += padding.size # Append the padding at the end of the MDF or SDF object s.fields.append(padding) # Ideally the user should explicitly pad their struct layout self.warning( f"WARNING: Adding {pad_len} trailing padding byte(s) at end of {s.name}." ) # Exit loop break # Determine the alignment requirement for the struct strictest_alignment = max(f.alignment for f in s.fields) for a in (8, 4, 2, 1): # Find where the current ptr aligns if (ptr % a) == 0: s.alignment = min(a, strictest_alignment) break else: # only runs if we don't break # Note: This should never happen raise RuntimeError(f"Unable to determine alignment required for {s.name}") # Final size check using Python's builtin ctypes module assert s.size == self.get_ctype_size( s ), f"{s.name} is not 64-bit aligned. s.size = {s.size}, struct size = {self.get_ctype_size(s)}."
def get_ctype_cls(self, s: Union[MDF, SDF]) -> Type[ctypes.Structure]: # Field type name to ctypes type_map = { "char": ctypes.c_char, "unsigned char": ctypes.c_ubyte, "byte": ctypes.c_ubyte, "int": ctypes.c_int32, "signed int": ctypes.c_int32, "unsigned int": ctypes.c_uint32, "unsigned": ctypes.c_uint32, "short": ctypes.c_int16, "signed short": ctypes.c_int16, "unsigned short": ctypes.c_uint16, "long": ctypes.c_int32, "signed long": ctypes.c_int32, "unsigned long": ctypes.c_uint32, "long long": ctypes.c_int64, "signed long long": ctypes.c_int64, "unsigned long long": ctypes.c_uint64, "float": ctypes.c_float, "double": ctypes.c_double, "uint8": ctypes.c_uint8, "uint16": ctypes.c_uint16, "uint32": ctypes.c_uint32, "uint64": ctypes.c_uint64, "int8": ctypes.c_int8, "int16": ctypes.c_int16, "int32": ctypes.c_int32, "int64": ctypes.c_int64, } f = [] cobj: Any for n, field in enumerate(s.fields): if isinstance(field.type_obj, (MDF, SDF)): cobj = self.get_ctype_cls(field.type_obj) elif isinstance(field.type_obj, NativeType): cobj = type_map[field.type_obj.name] elif isinstance(field.type_obj, TypeAlias): if isinstance(field.type_obj.type_obj, NativeType): cobj = type_map[field.type_obj.type_obj.name] else: cobj = self.get_ctype_size(field.type_obj.type_obj) else: raise RuntimeError(f"Unexpected type {type(field.type_obj)}") if field.length: f.append((f"f{n}", cobj * (field.length))) else: f.append((f"f{n}", cobj)) return type(s.name, (ctypes.Structure,), {"_fields_": f}) def get_ctype_size(self, s: Union[MDF, SDF]) -> int: return ctypes.sizeof(self.get_ctype_cls(s)) def validate_msg_id(self, name: str, msg_id: int): if not isinstance(msg_id, int): raise InvalidTypeError( f"Message definition id must evaluate to int type not {type(msg_id).__name__}. {name}: {msg_id}" ) if msg_id < 0 or msg_id > MAX_MESSAGE_TYPES: raise RTMASyntaxError( f"Value outside of valid range [0 - {MAX_MESSAGE_TYPES}] for module_id: {name}: {msg_id}" ) for mt in self.message_ids.values(): if msg_id == mt.value: raise MessageIDError( f"Duplicate message ids conflict found for {mt.name} and {name}: {msg_id} ->\n1: {mt.src.absolute()}\n2: {self.current_file}\n" ) def validate_msg_def(self, mdf): # Check number of fields > 0 assert ( len(mdf.fields) > 0 ), f"Message and Struct definitions must have at least one field: {mdf.name} -> {self.current_file}" # Check memory alignment layout if self.validate_alignment: self.check_alignment(mdf) # Check size if mdf.size > 65535: raise InvalidMessageSize( f"{mdf.name} size of {mdf.size} exceeds maximum allowe of 65535 bytes." ) def add_fields(self, mdf: Union[SDF, MDF], fields: Union[Dict, str]): # Pattern to parse field specs FIELD_REGEX = r"\s*(?P<ftype>[\s\w]*)(\[(?P<len_str>.*)\])?" # Copy fields from another definition if isinstance(fields, str): type_name = fields df = self.message_defs.get(type_name) or self.struct_defs.get(type_name) if df is None: raise RTMASyntaxError( f"Unable to find definition for {type_name} in {mdf.name} -> {self.current_file}" ) for field in df.fields: mdf.fields.append(copy(field)) elif isinstance(fields, dict): # Parse field specs into Field objects for fname, fstr in fields.items(): for language, reserved_fields in self.reserved_field_names.items(): if fname in reserved_fields: raise RTMASyntaxError( f"{fname} is a reserved field name for {language} use." ) if fname[0] not in string.ascii_letters: raise RTMASyntaxError( f"Field names must start with a letter: {mdf.name}=> {fname} -> {self.current_file}" ) if not isinstance(fstr, str): raise InvalidTypeError( f"Field types must be a string not {type(fstr).__name__}: {mdf.name}=> {fname}: {fstr} -> {self.current_file}" ) m = re.match(FIELD_REGEX, fstr) if m is None: raise RTMASyntaxError( f"Invalid syntax for field type specification: {mdf.name}=> {fname}: {fstr} -> {self.current_file}" ) ftype = m.groupdict()["ftype"].strip() len_str = (m.groupdict()["len_str"] or "").strip() if ftype is None: raise RTMASyntaxError( f"Invalid syntax for field type specification: {mdf.name}=> {fname}: {fstr} -> {self.current_file}" ) if len_str == "" and "[" in fstr: raise RTMASyntaxError( f"Invalid syntax for array field length: {mdf.name}=> {fname}: {fstr} -> {self.current_file}" ) # Check for a valid type ftype_obj: Union[NativeType, TypeAlias, MDF, SDF] if ftype in supported_types.keys(): ftype_obj = supported_types[ftype] elif ftype in self.aliases.keys(): ftype_obj = self.aliases[ftype] elif ftype in self.struct_defs.keys(): ftype_obj = self.struct_defs[ftype] elif ftype in self.message_defs.keys(): ftype_obj = self.message_defs[ftype] else: raise RTMASyntaxError( f"Unknown type specified ({ftype}): {mdf.name}=> {fname}: {fstr} -> {self.current_file}" ) # Check for invalid signal def usage if ftype in self.message_defs.keys(): assert ( len(self.message_defs[ftype].fields) != 0 ), f"Signal definitions can not be used as field types: {mdf.name}=> {fname}:{fstr} -> {self.current_file}" # Expand the length string if needed if len_str: expanded, flen = self.expand_expression( f"{mdf.name}->{fname}", len_str ) new_field = Field( name=fname, type_name=ftype, type_obj=ftype_obj, length_expression=len_str, length_expanded=expanded, length=int(flen), ) else: new_field = Field(name=fname, type_name=ftype, type_obj=ftype_obj) mdf.fields.append(new_field) else: raise InvalidTypeError( f"Invalid object for field spec of {type(mdf).__name__}. {mdf.name} -> {self.current_file}" ) self.validate_msg_def(mdf) def handle_signal(self, name: str, mdf: Dict[str, Any]): # Create a string representation of the defintion to hash raw = f"{name}:\n id: {mdf['id']}\n fields: null" raw = textwrap.dedent(raw) hash = sha256(raw.encode()).hexdigest() # Check and validate message id msg_id = mdf["id"] self.validate_msg_id(name, msg_id) self.message_ids[name] = MT(name, msg_id, src=self.trim_root(self.current_file)) # Create the MDF data class obj = MDF( raw, hash, name, type_id=mdf["id"], src=self.trim_root(self.current_file) ) # Store the new definition self.message_defs[name] = obj return def handle_struct(self, name: str, sdf: Dict[str, Any]): # Check for valid name self.check_name(name) self.check_duplicate_name( "struct_defs", name, namespaces=( "constants", "string_constants", "aliases", "struct_defs", "message_defs", ), ) if not isinstance(sdf, dict): raise InvalidTypeError( f"Invalid object type for struct def {name} of {type(sdf).__name__} -> {self.current_file}" ) # Check for correct section headers valid_sections = ("fields",) for section in sdf.keys(): if section not in valid_sections: raise RTMASyntaxError( f"Invalid top-level section '{section}' in message or struct definition of {name} -> {self.current_file}." ) # Create a string representation of the defintion to hash f: Union[str, List[str]] if isinstance(sdf["fields"], str): f = f" fields: {sdf['fields']}" else: f = [f" {fname}: {ftype}" for fname, ftype in sdf["fields"].items()] f = "\n".join(f) raw = f"{name}:\n fields:\n{f}" raw = textwrap.dedent(raw) hash = sha256(raw.encode()).hexdigest() # Create the SDF data class obj = SDF(raw, hash, name, src=self.trim_root(self.current_file)) # Parse and the fields of the definition self.add_fields(obj, sdf["fields"]) # Store the new defintion self.struct_defs[name] = obj
[docs] def check_name(self, name: str): """Check that names start with a letter.""" if name == "_RESERVED_": return if not name.startswith(tuple(c for c in string.ascii_letters)): raise RTMASyntaxError( f"Invalid name {name} in {self.current_file}. Names can only start with letters" )
def handle_message_def(self, name: str, mdf: Dict[str, Any]): # Check for valid name self.check_name(name) self.check_duplicate_name( "message_defs", name, namespaces=( "constants", "string_constants", "aliases", "struct_defs", "message_defs", ), ) if not isinstance(mdf, dict): raise InvalidTypeError( f"Invalid object type for message def {name} of {type(mdf).__name__} -> {self.current_file}" ) if name == "_RESERVED_": self.handle_reserve(name, mdf) return # Check for correct section headers valid_sections = ("id", "fields") for section in mdf.keys(): if section not in valid_sections: raise RTMASyntaxError( f"Invalid top-level section '{section}' in message or struct definition of {name} -> {self.current_file}." ) # Check for a signal def if mdf["fields"] is None: self.handle_signal(name, mdf) return # Create a string representation of the defintion to hash f: Union[str, List[str]] if isinstance(mdf["fields"], str): f = f" fields: {mdf['fields']}" else: f = [f" {fname}: {ftype}" for fname, ftype in mdf["fields"].items()] f = "\n".join(f) raw = f"{name}:\n id: {mdf['id']}\n fields:\n{f}" raw = textwrap.dedent(raw) hash = sha256(raw.encode()).hexdigest() # Create the MDF data class obj = MDF( raw, hash, name, type_id=mdf["id"], src=self.trim_root(self.current_file) ) # Check and validate message id msg_id = mdf["id"] self.validate_msg_id(name, msg_id) self.message_ids[name] = MT(name, msg_id, src=self.trim_root(self.current_file)) # Parse and the fields of the definition self.add_fields(obj, mdf["fields"]) # Store the new defintion self.message_defs[name] = obj def handle_reserve(self, name: str, mdf: Dict[str, List]): for section in mdf.keys(): if section not in ("id",): raise RTMASyntaxError( f"Invalid top-level section '{section}'. Only 'id is allowed in {name} -> {self.current_file}" ) if not isinstance(mdf["id"], list): raise InvalidTypeError("_RESERVED_.id must be a list.") reserved = [] for e in mdf["id"]: if isinstance(e, int): reserved.append(e) elif isinstance(e, str): m = re.search(r"\s*(?P<start>[0-9]+)\s*(\-|to)\s*(?P<end>[0-9]+)\s*", e) if m is None: raise RTMASyntaxError(f"_RESERVED_.id has invalid entry: {e}") start = int(m.groupdict()["start"]) end = int(m.groupdict()["end"]) if start > end: raise RTMASyntaxError(f"_RESERVED_.id has an invalid range: {e}") if ((end + 1) - start) > 100: raise RTMASyntaxError( f"_RESERVED_.id has an spans too large a range (100 max): {e}" ) reserved.extend(list(range(start, end + 1))) else: raise RTMASyntaxError(f"_RESERVED_.id has an invalid entry: {e}") # Generate reserved defintion placeholders for id in reserved: name = f"RESERVED_{id:06d}" self.handle_signal(name, dict(id=id, fields=None)) def parse_options_text(self, text: str): # parse compiler options from root file self.check_key_value_separation(text) # Parse the yaml file try: yaml = YAML(typ="safe", pure=True) # typ="unsafe", pure=True) data = yaml.load(text) except Exception as e: raise YAMLSyntaxError( f"Error encountered by YAML parser in {self.current_file}" ) from e if data.get("compiler_options") is not None: for name, value in data["compiler_options"].items(): self.handle_compiler_options(name, value) self.yaml_dict["compiler_options"].update(data["compiler_options"]) def parse_text(self, text: str): self.check_key_value_separation(text) # Parse the yaml file try: yaml = YAML(typ="safe") # typ="unsafe", pure=True) data = yaml.load(text) except Exception as e: raise YAMLSyntaxError( f"Error encountered by YAML parser in {self.current_file}" ) from e valid_sections = ( "metadata", "compiler_options", "imports", "constants", "string_constants", "aliases", "host_ids", "module_ids", "struct_defs", "message_defs", ) # Check file format for section in data.keys(): if section not in valid_sections: raise RTMASyntaxError( f"Invalid top-level section '{section}' in message defs file -> {self.current_file}." ) if data.get("metadata") is not None: for name, value in data["metadata"].items(): self.handle_metadata(name, value) self.yaml_dict["metadata"].update(data["metadata"]) if data.get("imports") is not None: if isinstance(data["imports"], list): for imp in data["imports"]: self.handle_import(imp) else: raise RTMASyntaxError( f"Imports must be a list of paths in message defs file -> {self.current_file}." ) if data.get("constants") is not None: for name, expr in data["constants"].items(): self.handle_expression(name, expr) self.yaml_dict["constants"].update(data["constants"]) if data.get("string_constants") is not None: for name, str_const in data["string_constants"].items(): self.handle_string(name, str_const) self.yaml_dict["string_constants"].update(data["string_constants"]) if data.get("aliases") is not None: for alias, ftype in data["aliases"].items(): self.handle_alias(alias, ftype) self.yaml_dict["aliases"].update(data["aliases"]) if data.get("host_ids") is not None: for name, value in data["host_ids"].items(): self.handle_host_id(name, value) self.yaml_dict["host_ids"].update(data["host_ids"]) if data.get("module_ids") is not None: for name, value in data["module_ids"].items(): self.handle_module_id(name, value) self.yaml_dict["module_ids"].update(data["module_ids"]) if data.get("struct_defs") is not None: for name, sdf in data["struct_defs"].items(): self.handle_struct(name, sdf) self.yaml_dict["struct_defs"].update(data["struct_defs"]) if data.get("message_defs") is not None: for name, mdf in data["message_defs"].items(): self.handle_message_def(name, mdf) self.yaml_dict["message_defs"].update(data["message_defs"]) def check_key_value_separation(self, text: str): for n, line in enumerate(text.splitlines(), start=1): # strip comments if "#" in line: idx = line.index("#") line = line[:idx] if ":" in line: idx = line.index(":") # check first : if idx + 1 < len(line) and not line[idx + 1].isspace(): raise RTMASyntaxError( f"Key-Value pairs must be separated with a ':' followed by a space. Add a space after the colon.\n{self.current_file}: line {n}\n{line}" ) def parse_compiler_options( self, msgdefs_file: os.PathLike ) -> Dict[str, CompilerOptions]: # Get the current pwd cwd = pathlib.Path.cwd() try: # first parse compiler options defs_path = pathlib.Path(msgdefs_file) self.root_path = defs_path.parent.resolve() self.parse_options(defs_path) except Exception as e: self.clear() raise finally: os.chdir(str(cwd.absolute())) return self.compiler_options def parse(self, msgdefs_file: os.PathLike): # Get the current pwd cwd = pathlib.Path.cwd() try: if self.import_coredefs: # Parse the core_defs.yaml file first pkg_dir = pathlib.Path(os.path.realpath(__file__)).parent core_defs = pkg_dir / "core_defs/core_defs.yaml" self.root_path = pkg_dir self.parse_file(core_defs.absolute()) defs_path = pathlib.Path(msgdefs_file) self.root_path = defs_path.parent.resolve() self.parse_file(defs_path) except Exception as e: self.clear() raise finally: os.chdir(str(cwd.absolute())) def parse_file(self, msgdefs_file: pathlib.Path): # Change the working directory cwd = pathlib.Path.cwd() msgdefs_path = (cwd / msgdefs_file).resolve() os.chdir(str(msgdefs_path.parent.absolute())) # check previously included files if any((msgdefs_path == f for f in self.included_files)): self.logger.debug( f"Skipping -> {msgdefs_path.absolute()} already parsed..." ) os.chdir(str(cwd.absolute())) return self.logger.info(f"Parsing -> {msgdefs_path.absolute()}") prev_file = self.current_file self.current_file = msgdefs_path.absolute() self.included_files.append(msgdefs_path) try: with open(msgdefs_path, "rt") as f: text = f.read() except FileNotFoundError as e: alt_ext = ".yaml" if msgdefs_path.suffix == ".yml" else ".yml" alt_path = msgdefs_path.parent / (msgdefs_path.stem + alt_ext) if alt_path.is_file() and alt_path.exists(): detail = f"\n\nCheck file extension -> Did you mean {alt_path}?" else: detail = "" e.args = tuple([*e.args, msgdefs_path.absolute(), detail]) raise e self.parse_text(text) self.current_file = prev_file os.chdir(str(cwd.absolute())) def parse_options(self, msgdefs_file: pathlib.Path): # Change the working directory cwd = pathlib.Path.cwd() msgdefs_path = (cwd / msgdefs_file).resolve() os.chdir(str(msgdefs_path.parent.absolute())) self.logger.info(f"Parsing Compiler Options-> {msgdefs_path.absolute()}") prev_file = self.current_file self.current_file = msgdefs_path.absolute() try: with open(msgdefs_path, "rt") as f: text = f.read() except FileNotFoundError as e: alt_ext = ".yaml" if msgdefs_path.suffix == ".yml" else ".yml" alt_path = msgdefs_path.parent / (msgdefs_path.stem + alt_ext) if alt_path.is_file() and alt_path.exists(): detail = f"\n\nCheck file extension -> Did you mean {alt_path}?" else: detail = "" e.args = tuple([*e.args, msgdefs_path.absolute(), detail]) raise e self.parse_options_text(text) self.current_file = prev_file os.chdir(str(cwd.absolute())) def trim_root(self, p: pathlib.Path) -> pathlib.Path: return pathlib.Path(os.path.relpath(p, self.root_path)) def to_json(self): d = dict( imports=self.imports, constants=self.constants, string_constants=self.string_constants, host_ids=self.host_ids, module_ids=self.module_ids, aliases=self.aliases, struct_defs=self.struct_defs, message_defs=self.message_defs, ) return json.dumps(d, indent=2, cls=CustomEncoder)
[docs] class CustomEncoder(json.JSONEncoder):
[docs] def default(self, o: Any) -> Any: if is_dataclass(o): # this is true for instance OR type if TYPE_CHECKING: o = cast(DataclassInstance, o) if not isinstance(o, type): return asdict(o) if isinstance(o, pathlib.Path): return str(o.absolute()) return super().default(o)