Source code for pyrtma.data_logger.data_logger

"""DataLogger class

Run with python -m pyrtma.data_logger
"""

import time

import pyrtma
import pyrtma.core_defs as cd

from pyrtma.exceptions import UnknownMessageType

from .dataset_writer import DatasetWriter
from .exceptions import *

from typing import cast


[docs] class DataLogger: MAX_DATASETS = 6 ALL_SETS = ("*", "all") def __init__(self, rtma_server_ip: str, log_level: int): self.mm_ip = rtma_server_ip self.client = pyrtma.Client(module_id=cd.MID_DATA_LOGGER, name="data_logger") self.client.logger.set_all_levels(log_level) self.client.connect(rtma_server_ip, logger_status=True) self.logger = self.client.logger self.ctrl_msg_types = [ cd.MT_DATASET_START, cd.MT_DATASET_STOP, cd.MT_DATASET_PAUSE, cd.MT_DATASET_RESUME, cd.MT_DATASET_ADD, cd.MT_DATASET_REMOVE, cd.MT_DATASET_STATUS_REQUEST, cd.MT_DATA_LOGGER_CONFIG_REQUEST, cd.MT_DATA_LOGGER_RESET, cd.MT_EXIT, cd.MT_LM_EXIT, ] # self.client.subscribe(self.ctrl_msg_types) self.client.subscribe([cd.ALL_MESSAGE_TYPES]) self.client.send_module_ready() self.client.info("DataLogger connected and waiting for configuration.") self.datasets: dict[str, DatasetWriter] = {} def update(self, msg: pyrtma.Message | None): dead = [] for name, ds in self.datasets.items(): if ds.dead: dead.append(name) else: ds.update(msg) if ds.file_saved.is_set(): self.client.send_message(ds.save_msg) ds.file_saved.clear() self.logger.info(f"Saved file: {ds.save_msg.filepath}") elif ds.error_event.is_set(): self.client.send_message(ds.error) self.client.error(ds.error) # Remove the dataset after stoppping for name in dead: self.rm_dataset(name) def rm_dataset(self, name: str, dest_mod_id: int = 0): ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") if ds.recording: raise DatasetInProgress( name, f"Recording in progresss for dataset '{ds.name}'" ) self.datasets.pop(name) reply = cd.MDF_DATASET_REMOVED() reply.name = name self.client.send_message(reply, dest_mod_id=dest_mod_id) self.logger.info(f"Removed dataset: '{name}'") def send_status(self, name: str = "all", dest_mod_id: int = 0): msg = cd.MDF_DATASET_STATUS() msg.timestamp = time.time() if name in DataLogger.ALL_SETS: for ds in self.datasets.values(): msg.name = ds.name msg.elapsed_time = ds.elapsed_time msg.is_recording = ds.recording msg.is_paused = ds.paused self.client.send_message(msg, dest_mod_id=dest_mod_id) else: ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") msg.name = ds.name msg.elapsed_time = ds.elapsed_time msg.is_recording = ds.recording msg.is_paused = ds.paused self.client.send_message(msg, dest_mod_id=dest_mod_id) def send_config(self, dest_mod_id: int = 0): msg = cd.MDF_DATA_LOGGER_CONFIG() msg.num_datasets = len(self.datasets) for i, ds in enumerate(self.datasets.values()): d = msg.datasets[i] d.name = ds.name d.save_path = str(ds.save_path) d.filename = ds.filename d.formatter = ds.formatter_cls.name d.msg_types[: len(ds.msg_types)] = ds.msg_types d.subdivide_interval = ( 0 if isinstance(ds.subdivide_interval, float) else ds.subdivide_interval ) self.client.send_message(msg, dest_mod_id=dest_mod_id) def start_logging(self, name: str, dest_mod_id: int = 0): if name in DataLogger.ALL_SETS: for ds in self.datasets.values(): ds.start() self.logger.info(f"Starting dataset: '{ds.name}'") self.logger.info(f"Saving Dataset:{ds.name} to {ds.file_path}") start_msg = cd.MDF_DATASET_STARTED() start_msg.name = ds.name self.client.send_message(start_msg) else: ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") if ds.recording: raise DatasetInProgress( name, f"Recording in progresss for dataset '{ds.name}'" ) ds.start() self.logger.info(f"Starting dataset: '{ds.name}'") self.logger.info(f"Saving Dataset:{ds.name} to {ds.file_path}") start_msg = cd.MDF_DATASET_STARTED() start_msg.name = ds.name self.client.send_message(start_msg) self.send_status(dest_mod_id=dest_mod_id) def stop_logging(self, name: str, dest_mod_id: int = 0): rm = [] if name in DataLogger.ALL_SETS: for ds in self.datasets.values(): if not ds.recording: self.logger.warning(f"Dataset {ds.name} is not recording") continue ds.stop() self.logger.info(f"Stopping dataset: '{ds.name}'") rm.append(ds.name) stop_msg = cd.MDF_DATASET_STOPPED() stop_msg.name = ds.name self.client.send_message(stop_msg) else: ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") if not ds.recording: self.logger.warning(f"Dataset {ds.name} is not recording") return rm.append(ds.name) ds.stop() self.logger.info(f"Stopping dataset: '{ds.name}'") stop_msg = cd.MDF_DATASET_STOPPED() stop_msg.name = ds.name self.client.send_message(stop_msg) self.send_status(dest_mod_id=dest_mod_id) def pause_logging(self, name: str, dest_mod_id: int = 0): if name in DataLogger.ALL_SETS: for ds in self.datasets.values(): ds.pause() else: ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") if ds.paused: self.logger.warning(f"Dataset {ds.name} is already paused") return ds.pause() self.logger.info(f"Pausing dataset: '{ds.name}'") self.send_status(dest_mod_id=dest_mod_id) def resume_logging(self, name: str, dest_mod_id: int = 0): if name in DataLogger.ALL_SETS: for ds in self.datasets.values(): ds.resume() else: ds = self.datasets.get(name) if ds is None: raise DatasetNotFound(name, f"Dataset named '{name}' not found.") if not ds.paused: self.logger.warning(f"Dataset {ds.name} is not paused") return ds.resume() self.logger.info(f"Resuming dataset: {ds.name}") self.send_status(dest_mod_id=dest_mod_id) def add_dataset(self, msg: cd.MDF_DATASET_ADD, dest_mod_id: int = 0): dataset = DatasetWriter( name=msg.dataset.name, save_path=msg.dataset.save_path, filename=msg.dataset.filename, msg_types=msg.dataset.msg_types[:], formatter=msg.dataset.formatter, subdivide_interval=msg.dataset.subdivide_interval, ) if len(self.datasets) == DataLogger.MAX_DATASETS: raise DataLoggerFullError( dataset.name, "Logger has maximum allowed datasets configured." ) if dataset.name in self.datasets.keys(): raise DatasetExistsError( dataset.name, f"A dataset with name '{dataset.name}' already exists" ) self.datasets[dataset.name] = dataset reply = cd.MDF_DATASET_ADDED() reply.name = dataset.name self.client.send_message(reply, dest_mod_id=dest_mod_id) self.logger.info(f"Added dataset: '{dataset.name}'") def reset(self, dest_mod_id: int = 0): self.client.info(f"Reset DataLogger. All Datasets will be cleared") rm = [] for ds in self.datasets.values(): ds.stop() rm.append(ds.name) for name in rm: self.rm_dataset(name, dest_mod_id=dest_mod_id) def send_error(self, exc: DataLoggerError): err = cd.MDF_DATA_LOGGER_ERROR() err.dataset_name = exc.dataset err.exc_type = type(exc).__name__ err.msg = exc.msg self.client.send_message(err) def run(self): try: self._running = True while self._running: try: msg = self.client.read_message(0.100) except UnknownMessageType as e: self.client.warning(f"UnknownMessageType: {e.args[0]}") continue if msg is not None: try: dest_mod_id = msg.header.dest_mod_id match (msg.data.type_id): case cd.MT_DATASET_START: self.start_logging( cast(str, msg.data.name), dest_mod_id=dest_mod_id ) case cd.MT_DATASET_STOP: self.stop_logging( cast(str, msg.data.name), dest_mod_id=dest_mod_id ) case cd.MT_DATASET_PAUSE: self.pause_logging( cast(str, msg.data.name), dest_mod_id=dest_mod_id ) case cd.MT_DATASET_RESUME: self.resume_logging( cast(str, msg.data.name), dest_mod_id=dest_mod_id ) case cd.MT_DATASET_ADD: self.add_dataset( cast(cd.MDF_DATASET_ADD, msg.data), dest_mod_id=dest_mod_id, ) case cd.MT_DATASET_REMOVE: self.rm_dataset( cast(str, msg.data.name), dest_mod_id=dest_mod_id ) case cd.MT_DATA_LOGGER_RESET: self.reset(dest_mod_id=0) case cd.MT_DATASET_STATUS_REQUEST: self.send_status( cast(str, msg.data.name), dest_mod_id=msg.header.src_mod_id, ) case cd.MT_DATA_LOGGER_CONFIG_REQUEST: self.send_config(dest_mod_id=dest_mod_id) case cd.MT_EXIT: if msg.header.dest_mod_id == self.client.module_id: self._running = False self.client.info( "Received EXIT request. Closing..." ) case cd.MT_LM_EXIT: self._running = False self.client.info("Received LM_EXIT request. Closing...") except DataLoggerError as e: self.client.error(e.msg) self.send_error(e) self.update(msg) except KeyboardInterrupt: pass finally: for ds in self.datasets.values(): ds.stop() if self.client.connected: self.client.disconnect() self.client.info("DataLogger is exiting...")