# Source code for pyrtma.data_logger.dataset

"""pyrtma.data_logger.dataset"""

from __future__ import annotations

import time
import textwrap
import pyrtma
import pyrtma.core_defs as cd
from pyrtma.data_logger.exceptions import NoClientError
from pyrtma.exceptions import UnknownMessageType
from pathlib import Path
from typing import TYPE_CHECKING, List, Dict, Any, ClassVar, cast

if TYPE_CHECKING:
    from _typeshed import StrPath

__all__ = ["Dataset"]


[docs] class Dataset: DATALOGGER_TYPES: ClassVar[tuple[int, ...]] = ( cd.MT_DATASET_STATUS, cd.MT_DATASET_ADDED, cd.MT_DATASET_REMOVED, cd.MT_DATASET_STARTED, cd.MT_DATASET_STOPPED, cd.MT_DATASET_SAVED, cd.MT_DATA_LOGGER_CONFIG, cd.MT_DATA_LOGGER_ERROR, ) def __init__( self, name: str, save_path: StrPath, filename: str, formatter: str, subdivide_interval: int = 0, msg_types: List[int] | None = None, mm_ip: str = "127.0.0.1:7111", status_interval: float = 5.0, create_client: bool = True, ): if create_client: self._client = pyrtma.Client(0, name=name) self._client.connect(mm_ip) self._client.subscribe(Dataset.DATALOGGER_TYPES) self._managed_client = True else: # User needs to register a client self._client = None self._name = name self._save_path = Path(save_path) self._filename = filename self._formatter = formatter self._subdivide_interval = subdivide_interval self._status = cd.MDF_DATASET_STATUS() if msg_types is None: self._msg_types = tuple([cd.ALL_MESSAGE_TYPES]) else: self._msg_types = tuple(msg_types) self._started = False self._added = False self._removed = False self._stopped = False self._saved: list[str] = [] self.status_interval = status_interval self._last_request = time.time() def close(self): if self._managed_client and self._client: self._client.disconnect() def __del__(self): self.close def __str__(self) -> str: s = f"""\ name = {self.name} filename = {self.filename} formatter = {self.formatter} subdivide = {self.subdivide_interval} msg_types = {self.msg_names} added = {self.added} started = {self.started} stopped = {self.stopped} removed = {self.removed} is_recording = {self.is_recording} is_paused = {self.is_paused} elapsed_time = {self.elapsed_time} """ return textwrap.dedent(s)
[docs] def register_client(self, client: pyrtma.Client): """Register a pyrtma client with this dataset Args: client (pyrtma.Client): client to register """ if self._managed_client and self._client: self._client.disconnect() self._client = client self._managed_client = False
def unregister_client(self): if not self._managed_client: self._client = None @property def name(self) -> str: return self._name @property def save_path(self) -> Path: return self._save_path @property def filename(self) -> str: return self._filename @property def formatter(self) -> str: return self._formatter @property def subdivide_interval(self) -> int: return self._subdivide_interval @property def msg_types(self) -> tuple[int, ...]: return self._msg_types @property def msg_names(self) -> tuple[str, ...]: names = [] for msg_type in self.msg_types: if msg_type < 1: continue if msg_type == cd.ALL_MESSAGE_TYPES: names.append("ALL_MESSAGE_TYPES") else: try: names.append(pyrtma.get_msg_cls(msg_type).type_name) except UnknownMessageType: names.append(msg_type) return tuple(names) @property def added(self) -> bool: return self._started @property def removed(self) -> bool: return self._started @property def started(self) -> bool: return self._started @property def stopped(self) -> bool: return self._stopped @property def saved(self) -> tuple[str, ...]: return tuple(self._saved) @property def elapsed_time(self) -> float: return self._status.elapsed_time @property def is_recording(self) -> bool: return bool(self._status.is_recording) @property def is_paused(self) -> bool: return bool(self._status.is_paused) def poll(self, timeout: float = 0) -> cd.MDF_DATA_LOGGER_ERROR | None: if not self._managed_client: raise NotImplementedError( "User must implement a custom 'poll' function after registering an external pyrtma.Client to a Dataset" ) if self._client is None: raise NoClientError if (now := time.time()) - self._last_request > self.status_interval: req = cd.MDF_DATASET_STATUS_REQUEST() req.name = self.name self._client.send_message(req) self._last_request = now msg = self._client.read_message(timeout) if msg: return self.process_msg(msg) def process_msg(self, msg: pyrtma.Message) -> cd.MDF_DATA_LOGGER_ERROR | None: if msg.type_id in ( cd.MT_DATASET_STATUS, 
cd.MT_DATASET_ADDED, cd.MT_DATASET_STARTED, cd.MT_DATASET_STOPPED, cd.MT_DATASET_REMOVED, cd.MT_DATASET_SAVED, cd.MT_DATA_LOGGER_ERROR, ): if msg.data.name != self._name: return match (msg.type_id): case cd.MT_DATASET_STATUS: self._status = cast(cd.MDF_DATASET_STATUS, msg.data) self._last_status = time.time() case cd.MT_DATASET_STARTED: self._started = True case cd.MT_DATASET_STOPPED: self._stopped = True case cd.MT_DATASET_ADDED: self._added = True case cd.MT_DATASET_REMOVED: self._removed = True case cd.MT_DATASET_SAVED: self._saved.append(cast(cd.MDF_DATASET_SAVED, msg.data).filepath) case cd.MT_DATA_LOGGER_ERROR: return cast(cd.MDF_DATA_LOGGER_ERROR, msg.data) case cd.MT_DATA_LOGGER_CONFIG: self.datalogger_config = self.process_data_logger_config_msg( cast(cd.MDF_DATA_LOGGER_CONFIG, msg.data) ) def add(self) -> cd.MDF_DATASET_ADD: msg = cd.MDF_DATASET_ADD() msg.dataset.name = self.name msg.dataset.save_path = str(self.save_path) msg.dataset.filename = self.filename msg.dataset.formatter = self.formatter msg.dataset.msg_types[: len(self.msg_types)] = self.msg_types msg.dataset.subdivide_interval = self.subdivide_interval if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def remove(self) -> cd.MDF_DATASET_REMOVE: """Remove dataset config from data_logger""" msg = cd.MDF_DATASET_REMOVE() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def start(self) -> cd.MDF_DATASET_START: """Start dataset recording in data_logger""" msg = cd.MDF_DATASET_START() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def stop(self) -> cd.MDF_DATASET_STOP: """Stop dataset recording in data_logger""" msg = cd.MDF_DATASET_STOP() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def pause(self) -> cd.MDF_DATASET_PAUSE: """Pause dataset recording in data_logger""" msg = cd.MDF_DATASET_PAUSE() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def resume(self) -> cd.MDF_DATASET_RESUME: """Resume dataset recording in data_logger""" msg = cd.MDF_DATASET_RESUME() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def status_request(self) -> cd.MDF_DATASET_STATUS_REQUEST: """Request dataset status in data_logger""" msg = cd.MDF_DATASET_STATUS_REQUEST() msg.name = self.name if self._client: self._client.send_message(msg) else: raise NoClientError return msg
[docs] def request_data_logger_config(self): """Request data_logger config""" if self._client: self._client.send_signal(cd.MT_DATA_LOGGER_CONFIG_REQUEST) else: raise NoClientError
[docs] def reset_data_logger(self): """Reset data_logger (stop and remove all)""" if self._client: self._client.send_signal(cd.MT_DATA_LOGGER_RESET) else: raise NoClientError
[docs] @staticmethod def process_data_logger_config_msg( msg_data: cd.MDF_DATA_LOGGER_CONFIG, ) -> Dict[str, Any]: """Process a data_logger config message Args: msg_data (cd.MDF_DATA_LOGGER_CONFIG): unprocessed message data Returns: Dict[str, Any]: data_logger config dict """ d = msg_data.to_dict() for ds in d["datasets"]: names = [] for msg_type in ds["msg_types"]: if msg_type < 1: continue if msg_type == cd.ALL_MESSAGE_TYPES: names.append("ALL_MESSAGE_TYPES") else: try: names.append(pyrtma.get_msg_cls(msg_type).type_name) except UnknownMessageType: names.append(msg_type) ds["msg_types"] = names d["datasets"] = d["datasets"][: d["num_datasets"]] return d