Source code for pyrtma.data_logger.dataset_writer

"""DataSet class used internally by data_logger"""

import pathlib
import io
import math
import time
import threading
import pyrtma

import pyrtma.core_defs as cd
from ..message import Message
from ..core_defs import ALL_MESSAGE_TYPES
from .data_formatter import get_formatter
from typing import Type, Optional, IO, Any, List

from .data_formatter import DataFormatter
from .exceptions import (
    DatasetError,
    InvalidFormatter,
    DatasetExistsError,
    DatasetThreadError,
    DataLoggerError,
    DatasetWriterError,
)


[docs] class DatasetWriter: MIN_INTERVAL = 30 MAX_INTERVAL = 600 CONTINUOUS = math.inf WRITE_PERIOD = 15.0 def __init__( self, name: str, save_path: str, filename: str, formatter: str, subdivide_interval: int, msg_types: List[int], ): self._dead = False self._recording = False self._paused = False self._close = False self._elapsed_time = 0.0 self.name = name self.client = pyrtma.Client(0, name=f"dataset.{name}") # Note: We don't connect. Just want to use the RTMALogger # self.client.connect(mm_ip) self.logger = self.client.logger self.save_path = pathlib.Path(save_path) self.formatter_name = formatter formatter_cls = get_formatter(formatter) if formatter_cls is None: raise InvalidFormatter(self.name, f"No DataFormatter class named {name}") else: self.formatter_cls = formatter_cls # Append the formatter extension self.filename = filename + self.formatter_cls.ext self.base_file_name = filename if subdivide_interval <= 0: self.subdivide_interval = DatasetWriter.CONTINUOUS elif subdivide_interval < DatasetWriter.MIN_INTERVAL: self.subdivide_interval = DatasetWriter.MIN_INTERVAL elif subdivide_interval > DatasetWriter.MAX_INTERVAL: self.subdivide_interval = DatasetWriter.MAX_INTERVAL else: self.subdivide_interval = subdivide_interval # Placeholder for type checking purposes if "b" in self.formatter_cls.mode: self.formatter = self.formatter_cls(io.BytesIO()) else: self.formatter = self.formatter_cls(io.StringIO()) self.rbuf: List[Message] = [] self.wbuf: List[Message] = [] self.fd: Optional[IO[Any]] = None self.write_to_disk = threading.Event() self.write_finished = threading.Event() self.stop_flag = threading.Event() self.use_thread = True self.write_thread = threading.Thread(target=self.write) self.msg_types = [m for m in msg_types if m > 0] if ALL_MESSAGE_TYPES in msg_types: self.all_sub = True else: self.all_sub = False self.ref_time = -1 self.start_time = -1 self.subdivide_flag = False self.next_subdivide = math.inf self.sub_index = 0 self.error_event = threading.Event() self.error = cd.MDF_DATA_LOGGER_ERROR() # Save location try: self.save_path.mkdir(parents=True, exist_ok=True) except FileNotFoundError as e: raise DatasetError(self.name, f"Save path not found: {self.save_path}") except PermissionError as e: raise DatasetError(self.name, f"File Permission Error: Access is denied.") self.file_path = self.save_path / self.filename # List of all the files saved self.saved_files: list[pathlib.Path] = [] self.file_saved = threading.Event() self.save_msg = cd.MDF_DATASET_SAVED() if self.file_path.exists(): raise DatasetExistsError(self.name, f"{self.file_path} already exists.") def __del__(self): if self._dead: return else: if self.recording: self.stop() def pause(self): elapsed = self.elapsed_time self._elapsed_time = elapsed self._paused = True def resume(self): self._paused = False self.ref_time = time.time() @property def elapsed_time(self) -> float: if self.stopped: return 0 if self.paused: return self._elapsed_time return self._elapsed_time + (time.time() - self.ref_time) @property def total_elapsed_time(self) -> float: return time.time() - self.start_time @property def paused(self) -> bool: return self._paused @property def recording(self) -> bool: return self._recording @property def stopped(self) -> bool: return not self._recording @property def dead(self) -> bool: return self._dead def start(self): # Reset some tracking vars self.sub_index = 0 self.subdivide_flag = False # Start the write thread self.write_thread.start() self.fd = open(self.file_path, self.formatter_cls.mode) self.formatter = self.formatter_cls(self.fd) self.next_subdivide = self.subdivide_interval self.start_time = time.time() self.ref_time = self.start_time self.next_write = DatasetWriter.WRITE_PERIOD self._recording = True self._paused = False def update(self, msg: Optional[Message]): if self._paused or not self._recording: return if not self.write_thread.is_alive(): raise DatasetThreadError( self.name, "Data collection write thread is no longer active. Reset the logger.", ) if msg: if self.all_sub or msg.type_id in self.msg_types: self.rbuf.append(msg) elapsed = self.elapsed_time if elapsed > self.next_subdivide: self.next_subdivide = elapsed + self.subdivide_interval self.subdivide_flag = True write = True elif elapsed > self.next_write: write = True else: write = False if write: if self.write_to_disk.is_set(): # TODO: What is the right behavior here? raise DatasetWriterError("Unable to write fast enough.") else: self.next_write = elapsed + DatasetWriter.WRITE_PERIOD self.stage_for_write() def store_saved(self): if not self.file_saved.is_set(): self.save_msg = cd.MDF_DATASET_SAVED() self.save_msg.name = self.name self.save_msg.filepath = str(self.file_path) self.file_saved.set() self.saved_files.append(self.file_path) self.last_saved = self.file_path def subdivide(self): self.sub_index += 1 self.formatter.finalize(self.wbuf) if self.fd: self.fd.close() self.store_saved() new_filename = ( f"{self.base_file_name}_{self.sub_index:04d}{self.formatter_cls.ext}" ) self.file_path = self.file_path.parent / new_filename if self.file_path.exists(): raise DatasetExistsError(self.name, f"{self.file_path} already exists.") self.logger.debug(f"Sub-dividing dataset: {self.file_path}") self.fd = open(self.file_path, self.formatter_cls.mode) self.formatter = self.formatter_cls(self.fd) def stop(self): if self.recording: self.stop_flag.set() self.start_time = -1 self.ref_time = -1 self._recording = False self._paused = False self.next_write = -1.0 def send_error(self, exc: DataLoggerError): self.error.dataset_name = exc.dataset self.error.exc_type = type(exc).__name__ self.error.msg = exc.msg self.error_event.set()
[docs] def stage_for_write(self): """Set the write buffer data for the data collection write thread""" self.wbuf = self.rbuf self.rbuf = [] self.write_finished.clear() self.write_to_disk.set() self.logger.debug("Writing dataset buffers to disk.")
[docs] def write(self): """Write Threads main function""" try: while True: if self.write_to_disk.wait(0.5): self.formatter.write(self.wbuf) self.wbuf.clear() if not self.stopped and self.subdivide_flag: self.subdivide_flag = False self.subdivide() self.write_to_disk.clear() self.write_finished.set() elif self.stop_flag.is_set(): self.stage_for_write() self.formatter.finalize(self.wbuf) if self.fd is not None: self.fd.close() self.store_saved() self._dead = True break except KeyboardInterrupt: pass except DataLoggerError as e: self.client.error(e.msg) self.send_error(e) finally: self._dead = True self.logger.debug("Collection write thread exited.")
[docs] def blocking_write(self): """Blocking write without bg thread""" if self.write_to_disk.wait(0.5): self.write() self.write_to_disk.clear() self.write_finished.set()