Source code for pyrtma.validators

from __future__ import annotations

import ctypes
import collections.abc as abc
import math

from typing import (
    List,
    ClassVar,
    Type,
    Generic,
    TypeVar,
    Union,
    Iterable,
    Optional,
    overload,
    Type,
    Sequence,
    Iterator,
)
from abc import abstractmethod, ABCMeta
from contextlib import contextmanager
from contextvars import ContextVar


from .message_base import MessageBase

__all__ = [
    "Int8",
    "Int16",
    "Int32",
    "Int64",
    "Byte",
    "Uint8",
    "Uint16",
    "Uint32",
    "Uint64",
    "Float",
    "Double",
    "Char",
    "Struct",
    "String",
    "ByteArray",
    "IntArray",
    "FloatArray",
    "StructArray",
    "disable_message_validation",
]

_VALIDATION_ENABLED: ContextVar[bool] = ContextVar("_VALIDATION_ENABLED", default=True)


[docs] @contextmanager def disable_message_validation(ignore=False): """Context manager function to temporarily disable message field validation Use with ``with`` keyword: Optionally pass in ignore=True to do nothing, e.g. for debugging: .. code-block:: python DEBUG = True with disable_message_validation(ignore=DEBUG): ... # disable validation unless DEBUG is True """ if not ignore: token = _VALIDATION_ENABLED.set(False) yield _VALIDATION_ENABLED.reset(token) else: yield # dummy context
_P = TypeVar("_P") # Parent _V = TypeVar("_V") # Value _C = TypeVar("_C") # CType
[docs] class FieldValidator(Generic[_P, _V], metaclass=ABCMeta): """Abstract base class for all message field validator descriptors""" def __init__(self) -> None: self._ctype: Type[ctypes._CDataType] def __set_name__(self, owner: _P, name: str): self._owner = owner self._public_name = name self._private_name = "_" + name @abstractmethod def __get__(self, obj: _P, objtype=None): ... @abstractmethod def __set__(self, obj: _P, value: _V): ... @abstractmethod def validate_one(self, value: _V): ... @abstractmethod def validate_many(self, value: Iterable[_V]): ...
[docs] class FloatValidatorBase(FieldValidator[_P, float], Generic[_P, _C], metaclass=ABCMeta): """Abstract base class for float type validators""" @abstractmethod def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_float def __get__(self, obj: _P, objtype=None) -> float: return getattr(obj, self._private_name) def __set__(self, obj: _P, value: Union[float, int, _C]): if _VALIDATION_ENABLED.get(): self.validate_one(value) setattr(obj, self._private_name, value)
[docs] def validate_one(self, value: Union[float, int, _C]): """Validate a float value Args: value (float): Float value Raises: TypeError: Wrong type ValueError: Value cannot be precisely represented with this datatype """ if isinstance(value, self._ctype): return if not isinstance(value, (float, int)): raise TypeError(f"Expected {value} to be a float") if math.isinf(self._ctype(value).value): raise ValueError( f"The {value} can not be represented as a {type(self).__name__}" )
[docs] def validate_many(self, value: Iterable[Union[float, int]]): """Validate multiple float values Args: value (Iterable[float]): Iterable of floats to validate Raises: TypeError: Wrong type ValueError: Value cannot be precisely represented with this datatype """ # Note: This may not be worth it since this is a rare overflow case. try: if math.isinf(self._ctype(max(value)).value) or math.isinf( self._ctype(min(value)).value ): raise ValueError( f"{value} contains value(s) that can not be represented as a {type(self).__name__}" ) except TypeError: raise TypeError(f"Expected {value!r} to contain all float types.")
def __repr__(self): return f"{type(self).__name__} at 0x{id(self):016X}"
[docs] class Float(FloatValidatorBase[_P, ctypes.c_float], Generic[_P]): """32-bit Float validator class""" def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_float
[docs] class Double(FloatValidatorBase[_P, ctypes.c_double], Generic[_P]): """Double (64-bit float) validator class""" def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_double
# Base Class for int validator fields
[docs] class IntValidatorBase(FieldValidator[_P, int], Generic[_P, _C], metaclass=ABCMeta): """Abstract base class for integer type validators""" _size: ClassVar[int] = 1 _unsigned: ClassVar[bool] = False _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**8 - 1 @abstractmethod def __init__(self, *args): ... @property def size(self) -> int: return self._size @property def unsigned(self) -> bool: return self._unsigned @property def min(self) -> int: return self._min @property def max(self) -> int: return self._max def __get__(self, obj: _P, objtype=None) -> int: return getattr(obj, self._private_name) def __set__(self, obj: _P, value: Union[int, _C]): if _VALIDATION_ENABLED.get(): self.validate_one(value) setattr(obj, self._private_name, value)
[docs] def validate_one(self, value: Union[int, _C]): """Validate an integer value Args: value (int): Integer to validate Raises: TypeError: Wrong type ValueError: Integer out of range for this datatype """ if isinstance(value, self._ctype): return if not isinstance(value, int): raise TypeError(f"Expected {value} to be an int") if not (self._min <= int(value) <= self._max): raise ValueError( f"Expected {value} to be in range of {self._min} to {self._max}" )
[docs] def validate_many(self, value: Iterable[int]): """Validate multiple integer values Args: value (Iterable[int]): Iterable of integers to validate Raises: TypeError: Wrong type ValueError: Integer out of range for this datatype """ # Check all values if any(not isinstance(v, int) for v in value): raise TypeError(f"Expected {value} to contain all int types.") if (int(max(value)) > self._max) or (min(value) < self._min): raise ValueError( f"Expected {value} to be in range of {self._min} to {self._max}." )
def __repr__(self): return f"{type(self).__name__}(size={self.size}, unsigned={self.unsigned}) at 0x{id(self):016X}"
[docs] class Int8(IntValidatorBase[_P, ctypes.c_int8], Generic[_P]): """Validator for 8-bit integers""" _size: ClassVar[int] = 1 _unsigned: ClassVar[bool] = False _min: ClassVar[int] = -(2**7) _max: ClassVar[int] = 2**7 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_int8
[docs] class Int16(IntValidatorBase[_P, ctypes.c_int16], Generic[_P]): """Validator for 16-bit integers""" _size: ClassVar[int] = 2 _unsigned: ClassVar[bool] = False _min: ClassVar[int] = -(2**15) _max: ClassVar[int] = 2**15 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_int16
[docs] class Int32(IntValidatorBase[_P, ctypes.c_int32], Generic[_P]): """Validator for 32-bit integers""" _size: ClassVar[int] = 4 _unsigned: ClassVar[bool] = False _min: ClassVar[int] = -(2**31) _max: ClassVar[int] = 2**31 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_int32
[docs] class Int64(IntValidatorBase[_P, ctypes.c_int64], Generic[_P]): """Validator for 64-bit integers""" _size: ClassVar[int] = 8 _unsigned: ClassVar[bool] = False _min: ClassVar[int] = -(2**63) _max: ClassVar[int] = 2**63 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_int64
[docs] class Uint8(IntValidatorBase[_P, ctypes.c_uint8], Generic[_P]): """Validator for unsigned 8-bit integers""" _size: ClassVar[int] = 1 _unsigned: ClassVar[bool] = True _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**8 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_uint8
[docs] class Uint16(IntValidatorBase[_P, ctypes.c_uint16], Generic[_P]): """Validator for unsigned 16-bit integers""" _size: ClassVar[int] = 2 _unsigned: ClassVar[bool] = True _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**16 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_uint16
[docs] class Uint32(IntValidatorBase[_P, ctypes.c_uint32], Generic[_P]): """Validator for unsigned 32-bit integers""" _size: ClassVar[int] = 4 _unsigned: ClassVar[bool] = True _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**32 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_uint32
[docs] class Uint64(IntValidatorBase[_P, ctypes.c_uint64], Generic[_P]): """Validator for unsigned 64-bit integers""" _size: ClassVar[int] = 8 _unsigned: ClassVar[bool] = True _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**64 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_uint64
[docs] class Byte(FieldValidator[_P, int], Generic[_P]): """Validator for single byte values""" _size: ClassVar[int] = 1 _unsigned: ClassVar[bool] = True _min: ClassVar[int] = 0 _max: ClassVar[int] = 2**8 - 1 def __init__(self, *args) -> None: self._ctype: Type[ctypes._SimpleCData] = ctypes.c_ubyte @property def size(self) -> int: return self._size @property def unsigned(self) -> bool: return self._unsigned @property def min(self) -> int: return self._min @property def max(self) -> int: return self._max def __get__(self, obj: _P, objtype=None) -> int: return getattr(obj, self._private_name) def __set__(self, obj: _P, value: Union[int, bytes, bytearray, ctypes.c_ubyte]): if _VALIDATION_ENABLED.get(): self.validate_one(value) if isinstance(value, (bytes, bytearray)): int_value = int.from_bytes(value, "little") setattr(obj, self._private_name, int_value) return setattr(obj, self._private_name, value)
[docs] def validate_one(self, value: Union[int, bytes, bytearray, ctypes.c_ubyte]): """validate a single byte value Args: value (Union[int, bytes, bytearray]): Byte value to validate Raises: ValueError: Value out of range TypeError: Wrong type """ if isinstance(value, self._ctype): return if isinstance(value, int): if not self._min <= value <= self._max: raise ValueError( f"Expected {value} to be in range of {self._min} to {self._max}" ) return if not isinstance(value, (bytes, bytearray)): raise TypeError(f"Expected {value!r} to be bytes or int") if len(value) != 1: raise ValueError(f"Expected {value!r} to be no longer than 1")
[docs] def validate_many(self, value: Union[Iterable[int], bytes, bytearray]): """Validate multiple byte values Args: value (Union[Iterable[int], bytes, bytearray]): Byte values to validate Raises: TypeError: Wrong type ValueError: Value out of range """ if isinstance(value, (bytes, bytearray)): return # Commented this check to help performance if any(not isinstance(v, int) for v in value): raise TypeError( f"Expected {value} to be an int sequence, bytes, or bytearray." ) if (max(value) > self._max) or (min(value) < self._min): raise ValueError( f"Expected {value} to be in range of {self._min} to {self._max}." )
def __repr__(self): return f"Byte(len=1) at 0x{id(self):016X}"
[docs] class String(FieldValidator[_P, str], Generic[_P]): """Validator for strings (char arrays)""" def __init__(self, len: int): assert ( len > 1 ), "Char Arrays must have a length > 1, the last being a null terminator" self.len = len self._ctype = ctypes.c_char * len def __get__(self, obj: _P, objtype=None) -> str: return getattr(obj, self._private_name).decode("ascii") def __set__(self, obj: _P, value: str): if isinstance(value, self._ctype): setattr(obj, self._private_name, value) return if _VALIDATION_ENABLED.get(): self.validate_one(value) setattr(obj, self._private_name, value.encode("ascii"))
[docs] def validate_one(self, value: str): """Validate a string value Args: value (str): String value Raises: TypeError: Wrong type ValueError: String exceeds max length """ if not isinstance(value, str): raise TypeError(f"Expected {value} to be a str") if len(value) > (self.len - 1): raise ValueError( f'Expected "{value}" to be no longer than {self.len - 1}. Note: Last index is reserved for null terminator.' ) if not value.isascii(): raise TypeError(f"Expected {value} to only contain valid ascii points")
[docs] def validate_many(self, value): """Validate multiple strings Not implemented Raises: NotImplementedError """ raise NotImplementedError
def __repr__(self): return f"String(len={self.len}) at 0x{id(self):016X}"
_FV = TypeVar("_FV", bound=FieldValidator)
[docs] class Char(String[_P], Generic[_P]): """Validator for scalar char values""" def __init__(self) -> None: self._ctype = ctypes.c_char self.len = 1 def __set__(self, obj: _P, value: Union[str, ctypes.c_char]): if isinstance(value, ctypes.c_char): setattr(obj, self._private_name, value) else: super().__set__(obj, value)
[docs] def validate_one(self, value: Union[str, ctypes.c_char]): """Validate a char value Args: value (str): String value Raises: TypeError: Wrong type ValueError: String exceeds max length """ if isinstance(value, self._ctype): return if not isinstance(value, str): raise TypeError(f"Expected {value} to be a str") if len(value) > self.len: raise ValueError(f'Expected "{value}" to be no longer than {self.len}') if not value.isascii(): raise TypeError(f"Expected {value} to only contain valid ascii points")
def __repr__(self): return f"Char() at 0x{id(self):016X}"
[docs] class ArrayField(FieldValidator, abc.Sequence, Generic[_FV]): """Array field validator base class""" def __init__(self, validator: Type[_FV], len: int): """Array field validator base class Args: validator (Type[_FV]): Field validator class for datatype len (int): Field length """ self._validator: FieldValidator = validator() self._len = len self._bound_obj: Optional[MessageBase] = None self._ctype = self._validator._ctype * len @classmethod def _bound(cls, obj: ArrayField[_FV], bound_obj: MessageBase) -> ArrayField[_FV]: new_obj = super().__new__(cls) new_obj._bound_obj = bound_obj new_obj._validator = obj._validator new_obj._len = obj._len new_obj._ctype = obj._ctype new_obj._owner = obj._owner new_obj._public_name = obj._public_name new_obj._private_name = obj._private_name return new_obj def __get__(self, obj: MessageBase, objtype=None) -> ArrayField[_FV]: """Return an Array bound to a message obj instance.""" return ArrayField._bound(self, obj) def __set__(self, obj: MessageBase, value: Union[ArrayField[_FV], Sequence]): if isinstance(value, ArrayField): if _VALIDATION_ENABLED.get(): self.validate_array(value) setattr( obj, self._private_name, getattr(value._bound_obj, value._private_name) ) else: self.__get__(obj).__setitem__(slice(None), value) def __getitem__(self, key): if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") return getattr(self._bound_obj, self._private_name)[key] def __iter__(self) -> Iterator: return iter(self[:]) def __setitem__(self, key, value): if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") if _VALIDATION_ENABLED.get(): if isinstance(value, abc.Iterable) or hasattr(value, "__getitem__"): self.validate_many(value) else: self.validate_one(value) getattr(self._bound_obj, self._private_name)[key] = value def __len__(self) -> int: return self._len def __repr__(self) -> str: return f"ArrayField({type(self._validator).__name__}, len={self._len}) at 0x{id(self):016X}" def __str__(self): if self._bound_obj: return str(getattr(self._bound_obj, self._private_name)[:]) else: return self.__repr__() def __eq__(self, value) -> bool: if not isinstance(value, abc.Sequence): return False if len(value) != self._len: return False for self_val, comp_val in zip(self, value): if self_val != comp_val: return False return True
[docs] def validate_one(self, value): """Validate one value Args: value: Value to validate """ self._validator.validate_one(value)
[docs] def validate_many(self, value): """Validate multiple values Args: value: Values to validate """ self._validator.validate_many(value)
[docs] def validate_array(self, value: ArrayField): """Validate array Args: value (ArrayField): Array value to validate Raises: TypeError: Wrong type """ if not isinstance(value, type(self)): raise TypeError( f"Expected a {self.__class__.__name__}({type(self._validator).__name__}). Got {type(value).__name__}." ) if not isinstance(value._validator, type(self._validator)): raise TypeError( f"Expected an {self.__class__.__name__}({type(self._validator).__name__}, {len(self)}). Got {type(value).__name__}({type(value._validator).__name__}, {len(value)})." ) if len(value) != len(self): raise ValueError( f"Array size mismatch. Expected an {self.__class__.__name__}({type(self._validator).__name__}, {len(self)}). Got {type(value).__name__}({type(value._validator).__name__}, {len(value)})." ) if value._bound_obj is None: raise ValueError( f"The instance of {type(value).__name__}({type(value._validator).__name__}, {len(value)}) is not bound to a MessageBase object." ) return
_IV = TypeVar("_IV", bound=IntValidatorBase)
[docs] class IntArray(ArrayField[_IV], Generic[_IV]): """IntArray validator class""" def __init__(self, validator: Type[_IV], len: int): """IntArray validator class Args: validator (Type[IV]): Field validator class for Int type len (int): Field length """ self._validator = validator() self._len = len self._bound_obj: Optional[MessageBase] = None self._ctype = self._validator._ctype * len @classmethod def _bound(cls, obj: ArrayField[_IV], bound_obj: MessageBase) -> IntArray[_IV]: new_obj = super().__new__(cls) new_obj._bound_obj = bound_obj new_obj._validator = obj._validator new_obj._len = obj._len new_obj._ctype = obj._ctype new_obj._owner = obj._owner new_obj._public_name = obj._public_name new_obj._private_name = obj._private_name return new_obj def __get__(self, obj: MessageBase, objtype=None) -> IntArray[_IV]: """Return an Array bound to a message obj instance.""" return IntArray._bound(self, obj) def __set__( self, obj: MessageBase, value: Union[ArrayField[_IV], Sequence[int], ctypes.Array], ): if isinstance(value, ArrayField): if _VALIDATION_ENABLED.get(): self.validate_array(value) setattr( obj, self._private_name, getattr(value._bound_obj, value._private_name) ) else: self.__get__(obj).__setitem__(slice(None), value) @overload def __getitem__(self, key: int) -> int: ... @overload def __getitem__(self, key: slice) -> List[int]: ... def __getitem__(self, key) -> Union[int, List[int]]: if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") return getattr(self._bound_obj, self._private_name)[key] def __iter__(self) -> Iterator[int]: return iter(self[:]) def __repr__(self) -> str: return f"IntArray({type(self._validator).__name__}, len={self._len}) at 0x{id(self):016X}"
_FPV = TypeVar("_FPV", bound=FloatValidatorBase)
[docs] class ByteArray(ArrayField[Byte]): """Validator class for Bytes arrays""" def __init__(self, len: int): """Validator class for Bytes arrays Args: len (int): Byte array length """ assert len >= 1 self._validator = Byte() self._len = len self._bound_obj: Optional[MessageBase] = None self._ctype = ctypes.c_ubyte * len @classmethod def _bound(cls, obj: ArrayField[Byte], bound_obj: MessageBase) -> ByteArray: new_obj = super().__new__(cls) new_obj._bound_obj = bound_obj new_obj._validator = obj._validator new_obj._len = obj._len new_obj._ctype = obj._ctype new_obj._owner = obj._owner new_obj._public_name = obj._public_name new_obj._private_name = obj._private_name return new_obj def __get__(self, obj: MessageBase, objtype=None) -> ByteArray: return ByteArray._bound(self, obj) def __set__( self, obj: MessageBase, value: Union[ArrayField[Byte], Sequence[int], bytes, bytearray, ctypes.Array], ): if isinstance(value, ArrayField): if _VALIDATION_ENABLED.get(): self.validate_array(value) setattr( obj, self._private_name, getattr(value._bound_obj, value._private_name) ) else: self.__get__(obj).__setitem__(slice(None), value) @overload def __getitem__(self, key: int) -> int: ... @overload def __getitem__(self, key: slice) -> bytearray: ... def __getitem__(self, key) -> bytearray | int: if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") value = getattr(self._bound_obj, self._private_name)[key] if isinstance(value, int): return value else: return bytearray(value) def __iter__(self) -> Iterator[int]: # Generator[_S, None, None]: for i in range(self._len): yield self.__getitem__(i) def __setitem__(self, key, value): if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") if _VALIDATION_ENABLED.get(): if isinstance(value, abc.Iterable) or hasattr(value, "__getitem__"): self.validate_many(value) else: self.validate_one(value) if isinstance(value, (bytes, bytearray)): if len(value) == 1 and isinstance(key, int): value = int.from_bytes(value, "little") getattr(self._bound_obj, self._private_name)[key] = value def __repr__(self) -> str: return f"ByteArray(len={self._len}) at 0x{id(self):016X}"
[docs] class FloatArray(ArrayField[_FPV], Generic[_FPV]): """Validator class for float arrays""" def __init__(self, validator: Type[_FPV], len: int): """Validator class for float arrays Args: validator (Type[_FPV]): Float type validator len (int): Array length """ self._validator = validator() self._len = len self._bound_obj: Optional[MessageBase] = None self._ctype = self._validator._ctype * len @classmethod def _bound(cls, obj: ArrayField[_FPV], bound_obj: MessageBase) -> FloatArray[_FPV]: new_obj = super().__new__(cls) new_obj._bound_obj = bound_obj new_obj._validator = obj._validator new_obj._len = obj._len new_obj._ctype = obj._ctype new_obj._owner = obj._owner new_obj._public_name = obj._public_name new_obj._private_name = obj._private_name return new_obj def __get__(self, obj: MessageBase, objtype=None) -> FloatArray[_FPV]: """Return an Array bound to a message obj instance.""" return FloatArray._bound(self, obj) def __set__( self, obj: MessageBase, value: Union[ArrayField[_FPV], Sequence[float], ctypes.Array], ): if isinstance(value, ArrayField): if _VALIDATION_ENABLED.get(): self.validate_array(value) setattr( obj, self._private_name, getattr(value._bound_obj, value._private_name) ) else: self.__get__(obj).__setitem__(slice(None), value) @overload def __getitem__(self, key: int) -> float: ... @overload def __getitem__(self, key: slice) -> List[float]: ... def __getitem__(self, key) -> Union[float, List[float]]: if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") return getattr(self._bound_obj, self._private_name)[key] def __iter__(self) -> Iterator[float]: return iter(self[:]) def __repr__(self) -> str: return f"FloatArray({type(self._validator).__name__}, len={self._len}) at 0x{id(self):016X}"
_S = TypeVar("_S", bound=MessageBase)
[docs] class Struct(FieldValidator, Generic[_S]): """Validator class for Structures""" def __init__(self, _ctype: Type[_S]): self._ctype = _ctype def __get__(self, obj, objtype=None) -> _S: return getattr(obj, self._private_name) @overload def __set__(self, obj: MessageBase, value: _S): ... @overload def __set__(self, obj: StructArray, value: Struct[_S]): ... def __set__(self, obj, value): if _VALIDATION_ENABLED.get(): self.validate_one(value) # Note: ctypes already copies the data here setattr(obj, self._private_name, value)
[docs] def validate_one(self, value: _S): """Validate a structure Args: value (_S): Structure value to validate Raises: TypeError: Wrong type """ if not isinstance(value, self._ctype): raise TypeError(f"Expected {self._ctype.__name__}")
[docs] def validate_many(self, value: Iterable[_S]): """Validate multiple structures Args: value (Iterable[_S]): Iterable of structures to validate Raises: TypeError: Wrong type """ if any(not isinstance(v, self._ctype) for v in value): raise TypeError(f"Expected {value} to be an {self._ctype.__name__}.")
def __repr__(self) -> str: return f"Struct({self._ctype.__name__}) at 0x{id(self):016X}"
[docs] class StructArray(FieldValidator, abc.Sequence, Generic[_S]): """Validator for structure arrays""" def __init__(self, msg_struct: Type[_S], len: int): """Validator for structure arrays Args: msg_struct (Type[_S]): Structure class len (int): Array length """ self._validator = Struct(msg_struct) self._len = len self._bound_obj: Optional[MessageBase] = None self._ctype = self._validator._ctype * len @classmethod def _bound(cls, obj: StructArray[_S], bound_obj: MessageBase) -> StructArray[_S]: new_obj: StructArray[_S] = super().__new__(cls) new_obj._bound_obj = bound_obj new_obj._validator = obj._validator new_obj._len = obj._len new_obj._ctype = obj._ctype new_obj._owner = obj._owner new_obj._public_name = obj._public_name new_obj._private_name = obj._private_name return new_obj def __get__(self, obj: MessageBase, objtype=None) -> StructArray[_S]: """Return an StructArray bound to a message obj instance.""" return StructArray._bound(self, obj) def __set__(self, obj, value: Union[StructArray, Sequence[_S], ctypes.Array[_S]]): if isinstance(value, StructArray): if _VALIDATION_ENABLED.get(): self.validate_array(value) setattr( obj, self._private_name, getattr(value._bound_obj, value._private_name) ) else: self.__get__(obj).__setitem__(slice(None), value) @overload def __getitem__(self, key: int) -> _S: ... @overload def __getitem__(self, key: slice) -> List[_S]: ... def __getitem__(self, key) -> Union[_S, List[_S]]: if self._bound_obj is None: raise AttributeError("Array descriptor is not bound to an instance object.") return getattr(self._bound_obj, self._private_name)[key] def __iter__(self) -> Iterator[_S]: return iter(self[:]) def __setitem__(self, key, value): if self._bound_obj is None: raise AttributeError( "StructArray descriptor is not bound to an instance object." ) if _VALIDATION_ENABLED.get(): if isinstance(value, abc.Iterable) or hasattr(value, "__getitem__"): self.validate_many(value) else: self.validate_one(value) getattr(self._bound_obj, self._private_name)[key] = value def __len__(self) -> int: return self._len def __repr__(self) -> str: return f"StructArray({self._validator._ctype.__name__}, len={self._len}) at 0x{id(self):016X}" def __str__(self): return self.pretty_print()
[docs] def pretty_print(self, add_tabs=0): """Generate formatted string for structure array Args: add_tabs (int, optional): Indent level. Defaults to 0. Returns: str: Formatted string """ if self._bound_obj: max_len = 5 val = getattr(self._bound_obj, self._private_name) tab = "\t" * add_tabs if self._len <= max_len: return ( f"{tab}[\n" + "\n".join(x.pretty_print(add_tabs) for x in val) + f"\n{tab}]" ) else: return ( f"{tab}[\n" + "\n".join( f"{val[i].pretty_print(add_tabs)}" for i in range(max_len) ) + f"\n{tab}...]" ) else: return self.__repr__()
def __eq__(self, value) -> bool: if not isinstance(value, abc.Sequence): return False if len(value) != self._len: return False for self_val, comp_val in zip(self, value): if self_val != comp_val: return False return True
[docs] def validate_one(self, value: _S): """Validate a structure Args: value (_S): Structure value to validate """ self._validator.validate_one(value)
[docs] def validate_many(self, value: Iterable[_S]): """Validate multiple structures Args: value (Iterable[_S]): Structure values to validate """ self._validator.validate_many(value)
[docs] def validate_array(self, value: StructArray[_S]): """Validate structure array Args: value (StructArray[_S]): StructArray to validate Raises: TypeError: Wrong type """ if not isinstance(value, StructArray): raise TypeError( f"Expected a StructArray({self._validator._ctype.__name__}, {len(self)}). Got {type(value).__name__}." ) if value._validator._ctype is not self._validator._ctype: raise TypeError( f"Expected a StructArray({self._validator._ctype.__name__}, {len(self)}). Got {type(value).__name__}({value._validator._ctype.__name__}, {len(value)})." ) if len(value) != len(self): raise ValueError( f"Array size mismatch. Expected an {self.__class__.__name__}({self._validator._ctype.__name__}, {len(self)}). Got {type(value).__name__}({value._validator._ctype.__name__}, {len(value)})." ) if value._bound_obj is None: raise ValueError( f"The instance of {type(value).__name__}({value._validator._ctype.__name__}, {len(value)}) is not bound to a MessageBase object." ) return