"""pyrtma.manager module
Contains :py:class:`~MessageManager` class
"""
import socket
import select
import argparse
import logging
import time
import random
import ctypes
import os
import typing
from .client_logging import RTMALogger, ClientLike
from .validators import disable_message_validation
from .message import Message
from .header import MessageHeader, get_header_cls
from .message_data import MessageData
from .context import _get_core_defs
from .core_defs import ALL_MESSAGE_TYPES
from . import core_defs as cd
from typing import Dict, List, Tuple, Set, Type, Union, Optional
from itertools import chain
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from contextlib import contextmanager
from contextvars import ContextVar
[docs]
@dataclass
class Module:
"""Module dataclass
Used internally by MessageManager to manage connections to each client module.
"""
uid: int
conn: socket.socket
address: Tuple[str, int]
header_cls: Type[MessageHeader]
name: str = ""
mod_id: int = 0
pid: int = 0
subs: Set[int] = field(default_factory=set)
connected: bool = False
is_logger: bool = False
is_daemon: bool = False
unique: bool = True
drops: int = 0
msg_count: int = 0
@property
def ipaddr(self) -> str:
return f"{self.address[0]}:{self.address[1]}"
@property
def addr(self) -> str:
return self.address[0]
@property
def port(self) -> int:
return self.address[1]
@property
def sub_all(self) -> bool:
return ALL_MESSAGE_TYPES in self.subs
[docs]
def send_message(self, header: MessageHeader, payload: Union[bytes, MessageData]):
"""Send a message
Args:
header (MessageHeader): Message header
payload (Union[bytes, MessageData]): Message data
"""
self.msg_count += 1
header.msg_count = self.msg_count
self.conn.sendall(header)
self.conn.sendall(payload)
[docs]
def send_ack(self):
"""Send ACKNOWLEDGE signal header"""
# Just send a header
header = self.header_cls()
header.msg_type = cd.MT_ACKNOWLEDGE
header.send_time = time.perf_counter()
header.src_mod_id = cd.MID_MESSAGE_MANAGER
header.dest_mod_id = self.mod_id
header.num_data_bytes = 0
self.msg_count += 1
header.msg_count = self.msg_count
self.conn.sendall(header)
[docs]
def close(self):
"""Close connection"""
self.conn.close()
self.connected = False
def __str__(self):
return f"{self.name or 'ID'}({self.mod_id}) @ {self.ipaddr}"
def __hash__(self):
return self.conn.__hash__()
[docs]
class MessageManager(ClientLike):
"""MessageManager class
RTMA Message Manager server implemented in python.
"""
INFO_INTERVAL = 5.0
def __init__(
self,
ip_address: str = "", # "" equivalent to socket.INADDR_ANY
port: int = 7111,
timecode=False,
log_level=logging.INFO,
debug=False,
send_msg_timing=True,
send_active_clients=True,
):
"""MessageManager class
RTMA Message Manager server implemented in python.
Args:
ip_address (str, optional): server IP address. Defaults to "".
port (int, optional): server port. Defaults to 7111.
timecode (bool, optional): Flag to use message header with timecode values. Defaults to False.
log_level (int, optional): logging level, defaults to logging.INFO.
debug (bool, optional): Flag for debug mode. Defaults to False.
send_msg_timing (bool, optional): Flag to send TIMING_MSG. Defaults to True.
send_active_clients (bool, optional): Flag to send ACTIVE_CLIENTS. Defaults to True.
"""
self._keep_running = False
self.ip_address = ip_address
self.port = port
self.header_cls = get_header_cls(timecode)
self.header_size = ctypes.sizeof(self.header_cls)
self.header_buffer = bytearray(self.header_size)
self.header_view = memoryview(self.header_buffer)
self.read_timeout = 0.200
self.write_timeout = 0 # c++ message manager uses timeout = 0 for all modules except logger modules, which uses -1 (blocking)
self._debug = debug
self.send_msg_timing = send_msg_timing
self.send_active_clients_msg = send_active_clients
self._logger = RTMALogger(f"message_manager", self, logging.INFO)
self.logger.set_all_levels(log_level)
if ip_address == socket.INADDR_ANY:
ip_address = "" # bind and Module require a string input, '' is treated as INADDR_ANY by bind
# Create the tcp listening socket
self.listen_socket = socket.socket(
family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP
)
self.listen_socket.bind((ip_address, port))
self.listen_socket.listen(socket.SOMAXCONN)
self.modules: Dict[socket.socket, Module] = {}
self.logger_modules: Set[Module] = set()
self.next_dynamic_mod_id_offset = 0
self.subscriptions: Dict[int, Set[Module]] = defaultdict(set)
self.sockets = [self.listen_socket]
self.start_time = time.time()
# dictionary of message type ids and message counts, reset each time timing_message is sent
self.message_counts: typing.Counter[int] = Counter()
self.t_last_message_count = time.perf_counter()
self.min_timing_message_period = 0.9
self.last_client_info: float = time.perf_counter()
# Disable Nagle Algorithm
self.listen_socket.setsockopt(
socket.getprotobyname("tcp"), socket.TCP_NODELAY, 1
)
self._uid = 0
self.wlist: List[socket.socket] = []
# Add message manager to its module list
self.mm_module = Module(
uid=0,
conn=self.listen_socket,
address=(ip_address, port),
header_cls=self.header_cls,
name="message_manager",
mod_id=0,
pid=os.getpid(),
connected=True,
is_logger=False,
)
self.modules[self.listen_socket] = self.mm_module
self.data_buffer = bytearray(1024**2)
self.data_view = memoryview(self.data_buffer)
# Address Reuse allowed for testing
if debug:
self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.logger.info("Message Manager Initialized.")
@property
def connected(self) -> bool:
return True
@property
def logger(self) -> RTMALogger:
return self._logger
def generate_uid(self) -> int:
self._uid += 1
return self._uid
[docs]
def assign_module_id(self) -> int:
"""Assign module ID dynamically to connecting module
Raises:
RuntimeError: Exceeded maximum number of allowed dynamic modules
Returns:
int: module ID
"""
current_ids = [mod.mod_id for mod in self.modules.values()]
MAX_DYN_IDS = cd.MAX_MODULES - cd.DYN_MOD_ID_START
for i in range(0, MAX_DYN_IDS):
mod_id = self.next_dynamic_mod_id_offset + cd.DYN_MOD_ID_START
self.next_dynamic_mod_id_offset += 1
if self.next_dynamic_mod_id_offset == MAX_DYN_IDS:
self.next_dynamic_mod_id_offset = 0
# check if mod id is already used, if it is, continue looping until we find an unused one
if mod_id not in current_ids:
return mod_id
# if we exit loop without returning, we failed to find a valid id
self.logger.error(
f"MessageManager::assign_module_id: All valid dynamic IDs are in use"
)
raise RuntimeError("Exceeded maximum limit of allowed modules.")
def decode_header(self) -> MessageHeader:
return self.header_cls.from_buffer(self.header_view)
[docs]
def connect_module(self, module: Module, msg: Message) -> bool:
"""Connect module
Args:
module (Module): Connecting module
msg (Message): Incoming connect message
Returns:
bool: success code
"""
# ignore v1 message that follows v2 message
if module.connected:
return False
if isinstance(msg.data, cd.MDF_CONNECT_V2):
module.mod_id = msg.data.mod_id
module.unique = msg.data.allow_multiple == 0
module.pid = msg.data.pid
module.name = msg.data.name
elif isinstance(msg.data, cd.MDF_CONNECT):
module.mod_id = msg.header.src_mod_id
else:
raise RuntimeError(
f"MessageManager::connect_module: Uknown Connect message definition"
)
# fields common to v1 and v2
module.is_logger = msg.data.logger_status == 1
module.is_daemon = msg.data.daemon_status == 1
if module.mod_id != 0:
if module.mod_id < 1 or module.mod_id > cd.DYN_MOD_ID_START:
self.logger.error(
f"Invalid Module id specified: {module.mod_id}. User assigned ids must be in range or 1 - {cd.DYN_MOD_ID_START}"
)
self.remove_module(module)
return False
for m in self.modules.values():
if m is module:
continue
if m.mod_id == module.mod_id:
if m.unique:
self.logger.error(
f"SET_ID - {module.ipaddr} - ID({module.mod_id}) - ID already in use. Closing connection."
)
self.remove_module(module)
return False
if module.unique:
self.logger.error(
f"SET_ID - {module.ipaddr} - ID({module.mod_id}) - ID already in use. Closing connection."
)
self.remove_module(module)
return False
if module.name:
if (m.unique or module.unique) and (m.name == module.name):
self.logger.error(
f"SET_NAME - {module.ipaddr} - ID({module.mod_id}) - {module.name} - Name already in use."
)
self.remove_module(module)
return False
self.logger.debug(
f"SET_NAME - {module.ipaddr} - ID({module.mod_id}) - {module.name}"
)
else:
module.mod_id = self.assign_module_id()
module.connected = True
if module.is_logger:
self.logger_modules.add(module)
return True
[docs]
def remove_module(self, module: Module):
"""Remove connected module
Args:
module (Module): Module object to remove
"""
# Drop all subscriptions for this module
for msg_type in module.subs:
self.subscriptions[msg_type].discard(module)
# Discard from logger module set if needed
self.logger_modules.discard(module)
# Drop from our module mapping
module.close()
self.send_client_close(module)
del self.modules[module.conn]
[docs]
def disconnect_module(self, src_module: Module):
"""Disconnect module
Args:
src_module (Module): Module object to disconnect
"""
self.remove_module(src_module)
[docs]
def add_subscription(self, src_module: Module, msg: Message):
"""Add message subscription
Args:
src_module (Module): Subscribing module
msg (Message): incoming SUBSCRIBE message
"""
sub = cd.MDF_SUBSCRIBE.from_buffer(msg.data)
if sub.msg_type == ALL_MESSAGE_TYPES:
self.subscriptions[sub.msg_type].add(src_module)
# Clear out the individual subs
for sub_type in src_module.subs:
self.subscriptions[sub_type].discard(src_module)
src_module.subs.clear()
src_module.subs.add(sub.msg_type)
self.logger.debug(f"SUBSCRIBE- {src_module!s} to ALL_MESSAGE_TYPES")
else:
# Ignore individual msg_types subs when subscribed to ALL_MESSAGE_TYPES
if src_module.sub_all:
return
self.subscriptions[sub.msg_type].add(src_module)
src_module.subs.add(sub.msg_type)
self.logger.debug(f"SUBSCRIBE- {src_module!s} to MT:{sub.msg_type}")
[docs]
def remove_subscription(self, src_module: Module, msg: Message):
"""Remove message subscription
Args:
src_module (Module): Unsubscribing module
msg (Message): incoming UNSUBSCRIBE message
"""
unsub = cd.MDF_UNSUBSCRIBE.from_buffer(msg.data)
if unsub.msg_type == ALL_MESSAGE_TYPES:
self.subscriptions[unsub.msg_type].discard(src_module)
# Clear out the individual subs
for sub_type in src_module.subs:
self.subscriptions[sub_type].discard(src_module)
src_module.subs.clear()
self.logger.debug(f"UNSUBSCRIBE- {src_module!s} from ALL_MESSAGE_TYPES")
else:
# Ignore individual msg_types unsubs when subscribed to ALL_MESSAGE_TYPES
if src_module.sub_all:
return
self.subscriptions[unsub.msg_type].discard(src_module)
src_module.subs.discard(unsub.msg_type)
self.logger.debug(f"UNSUBSCRIBE- {src_module!s} from MT:{unsub.msg_type}")
[docs]
def resume_subscription(self, src_module: Module, msg: Message):
"""Resume message subscription
Args:
src_module (Module): Subscribing module
msg (Message): incoming RESUME_SUBSCRIPTION message
"""
self.add_subscription(src_module, msg)
[docs]
def pause_subscription(self, src_module: Module, msg: Message):
"""Pause message subscription
Args:
src_module (Module): Subscribing module
msg (Message): incoming PAUSE_SUBSCRIPTION message
"""
self.remove_subscription(src_module, msg)
[docs]
def register_module_ready(self, src_module: Module, msg: Message):
"""Handle MODULE_READY message and register PID
Args:
src_module (Module): Module that is ready
msg (Message): Incoming MODULE_READY message
"""
mr = cd.MDF_MODULE_READY.from_buffer(msg.data)
src_module.pid = mr.pid
[docs]
def set_module_name(self, src_module: Module, msg: Message):
"""Set a module name
Args:
src_module (Module): Module that sent CLIENT_SET_NAME
msg (Message): Incoming CLIENT_SET_NAME message
"""
name_msg = cd.MDF_CLIENT_SET_NAME.from_buffer(msg.data)
src_module.name = name_msg.name or ""
self.logger.info(
f"SET_NAME - {src_module.ipaddr} - ID({src_module.mod_id}) - {src_module.name}"
)
[docs]
def read_message(self, sock: socket.socket) -> Optional[MessageHeader]:
"""Read an incoming message
Args:
sock (socket.socket): socket to read from
Returns:
MessageHeader: message header of incoming message
"""
# Read RTMA Header Section
nbytes = sock.recv_into(
self.header_buffer, self.header_size, socket.MSG_WAITALL
)
# Module that sent the message
mod = self.modules[sock]
if nbytes != self.header_size:
self.remove_module(mod)
self.logger.warning(
f"DROPPING - {mod!s} - Wrong number of header bytes returned from sock.recv_into. Got ({nbytes})."
)
return
header = self.decode_header()
# Read Data Section into Internal Buffer
data_size = header.num_data_bytes
if data_size:
if data_size > len(self.data_buffer):
self.logger.warning(
"Message Data size (data_size) exceeds buffer size. Header may be corrupted."
)
self.remove_module(mod)
return
nbytes = sock.recv_into(self.data_buffer, data_size, socket.MSG_WAITALL)
if nbytes != data_size:
self.logger.warning(
f"DROPPING - {mod!s} - Wrong number of data bytes returned from sock.recv_into. Got ({nbytes})"
)
self.remove_module(mod)
return
return header
[docs]
def forward_message(
self,
src_module: Module,
header: MessageHeader,
data: Union[bytes, MessageData],
):
"""Forward a message from other modules
The given message will be forwarded to:
- all subscribed logger modules
- if the message has a destination address, and it is subscribed to by that destination it will be forwarded only there
- if the message has no destination address, it will be forwarded to all subscribed modules or those subscribed to ALL_MESSAGE_TYPES
Args:
src_module (Module): Module where message originated from
header (MessageHeader): Message Header
data (Union[bytes, MessageData]): Message Data
"""
src_name = src_module.name or f"Module({src_module.mod_id})"
# Increment message counts
if self.send_msg_timing:
self.message_counts[header.msg_type] += 1
dest_mod_id = header.dest_mod_id
dest_host_id = header.dest_host_id
# Verify that the module & host ids are valid
if dest_mod_id < 0 or dest_mod_id > cd.MAX_MODULES:
self.logger.error(
f"MessageManager::forward_message: Got invalid dest_mod_id [{dest_mod_id}] from {src_name}"
)
return
if dest_host_id < 0 or dest_host_id > cd.MAX_HOSTS:
self.logger.error(
f"MessageManager::forward_message: Got invalid dest_host_id [{dest_host_id}] from {src_name}"
)
return
# Subscriber set for this message type
subscribers = list(
chain(
self.subscriptions[header.msg_type],
self.subscriptions[ALL_MESSAGE_TYPES],
)
)
for n in range(len(subscribers)):
module = subscribers[n]
if module.conn in self.wlist:
try:
if (
dest_mod_id == 0
or (module.mod_id == dest_mod_id)
or module.is_logger
):
module.send_message(header, data)
module.drops = 0
except ConnectionError as err:
self.remove_module(module)
self.logger.error(
f"Connection Error on write to {module!s} - {err!s}"
)
print("x", end="", flush=True)
self.send_failed_message(module, header, time.perf_counter())
elif module.is_logger:
# Block until logger is ready
select.select([], [module.conn], [], None)
try:
module.send_message(header, data)
module.drops = 0
except ConnectionError as err:
self.remove_module(module)
self.logger.error(
f"Connection Error on write to {module!s} - {err!s}"
)
print("x", end="", flush=True)
# this could result in infinite recursion,
# this is prevented by send_failed_message returning if
# failed message type is failed_message.
self.send_failed_message(module, header, time.perf_counter())
else:
module.drops += 1
print("x", end="", flush=True)
self.send_failed_message(module, header, time.perf_counter())
[docs]
def send_to_loggers(
self,
header: MessageHeader,
payload: Union[bytes, MessageData],
):
"""Forward message to registered logger modules
Args:
header (MessageHeader): Message header to send
payload (Union[bytes, MessageData]): Message data to send
"""
for module in self.logger_modules:
if module.conn not in self.wlist:
# Block until logger is ready
select.select([], [module.conn], [], None)
try:
module.send_message(header, payload)
module.drops = 0
except ConnectionError as err:
self.remove_module(module)
self.logger.error(f"Connection Error on write to {module!s} - {err!s}")
print("x", end="", flush=True)
# this could result in infinite recursion,
# this is prevented by send_failed_message returning if
# failed message type is failed_message.
self.send_failed_message(module, header, time.perf_counter())
[docs]
def send_message(
self,
msg_data: MessageData,
dest_mod_id: int = 0,
dest_host_id: int = 0,
timeout: float = 0,
):
"""Send a message originating from MessageManager module
Args:
msg_data (MessageData): Object containing the message to send
dest_mod_id (int, optional): Specific module ID to send to. Defaults to 0 (broadcast).
dest_host_id (int, optional): Specific host ID to send to. Defaults to 0 (broadcast).
timeout (float, optional): Reserved. Defaults to 0.
"""
header = self.header_cls()
header.msg_type = msg_data.type_id
header.send_time = time.perf_counter()
header.src_mod_id = cd.MID_MESSAGE_MANAGER
header.dest_mod_id = dest_mod_id
header.dest_host_id = dest_host_id
header.num_data_bytes = msg_data.type_size
self.forward_message(self.mm_module, header, msg_data)
[docs]
def send_ack(self, src_module: Module):
"""Send ACKNOWLEDGE signal header
Args:
src_module (Module): Module to send ACK to
"""
header = self.header_cls()
header.msg_type = cd.MT_ACKNOWLEDGE
header.send_time = time.perf_counter()
header.src_mod_id = cd.MID_MESSAGE_MANAGER
header.dest_mod_id = src_module.mod_id
header.num_data_bytes = 0
try:
src_module.send_message(header, b"")
except ConnectionError as err:
self.remove_module(src_module)
self.logger.error(f"Connection Error on write to {src_module!s} - {err!s}")
print("x", end="", flush=True)
self.send_failed_message(src_module, header, time.perf_counter())
# Always forward to logger modules
self.send_to_loggers(header, b"")
[docs]
def send_failed_message(
self,
dest_module: Module,
header: MessageHeader,
time_of_failure: float,
):
"""Send FAILED_MESSAGE
Args:
dest_module (Module): Intended destination
header (MessageHeader): Header of failed message
time_of_failure (float): Time of send failure
"""
# Avoid infinite recursion
if header.msg_type in (
cd.MT_FAILED_MESSAGE,
cd.MT_RTMA_LOG,
cd.MT_RTMA_LOG_CRITICAL,
cd.MT_RTMA_LOG_ERROR,
cd.MT_RTMA_LOG_WARNING,
cd.MT_RTMA_LOG_INFO,
cd.MT_RTMA_LOG_DEBUG,
):
return
out_header = self.header_cls()
data = cd.MDF_FAILED_MESSAGE()
out_header.msg_type = cd.MT_FAILED_MESSAGE
out_header.send_time = time.perf_counter()
out_header.src_mod_id = cd.MID_MESSAGE_MANAGER
out_header.num_data_bytes = ctypes.sizeof(data)
data.dest_mod_id = dest_module.mod_id
data.time_of_failure = time_of_failure
# Copy the values into the RTMA_MSG_HEADER
for fname, ftype, *_ in data.msg_header._fields_:
setattr(data.msg_header, fname, getattr(header, fname))
# send to logger modules AND modules subscribed to FAILED_MESSAGE
self.forward_message(self.mm_module, out_header, data)
[docs]
def send_timing_message(self):
"""Send TIMING_MESSAGE"""
data = cd.MDF_TIMING_MESSAGE()
for mt, count in self.message_counts.items():
data.timing[mt] = count
self.message_counts.clear()
for mod in self.modules.values():
data.ModulePID[mod.mod_id] = mod.pid
data.send_time = time.perf_counter()
[docs]
def send_client_close(self, module: Module):
"""Send CLIENT_CLOSED
Args:
module (Module): Closed module object
"""
self.logger.debug("CLIENT_CLOSE")
msg = cd.MDF_CLIENT_CLOSED()
msg.uid = module.uid
msg.pid = module.pid
msg.mod_id = module.mod_id
msg.is_logger = module.is_logger
msg.is_unique = module.unique
msg.port = module.port
msg.name = module.name
msg.addr = module.addr
self.send_message(msg)
[docs]
def send_client_info(self, module: Module):
"""Send CLIENT_INFO
Args:
module (Module): Module object to send info for
"""
self.logger.debug("CLIENT_INFO")
msg = cd.MDF_CLIENT_INFO()
msg.uid = module.uid
msg.pid = module.pid
msg.mod_id = module.mod_id
msg.is_logger = module.is_logger
msg.is_unique = module.unique
msg.port = module.port
msg.name = module.name
msg.addr = module.addr
self.send_message(msg)
[docs]
def send_active_clients(self):
"""Send ACTIVE_CLIENTS"""
self.logger.debug("ACTIVE_CLIENTS")
msg = cd.MDF_ACTIVE_CLIENTS()
msg.timestamp = time.perf_counter()
for i, (sock, module) in enumerate(self.modules.items()):
# if sock == self.listen_socket:
# continue
msg.client_mod_id[i] = module.mod_id
msg.client_pid[i] = module.pid
self.send_client_info(module)
msg.num_clients = len(self.modules) - 1
self.send_message(msg)
self.last_client_info = msg.timestamp
def decode_core_message(
self, src_module: Module, hdr: MessageHeader
) -> Union[Message, None]:
data_cls = _get_core_defs().get(hdr.msg_type)
if data_cls:
data = data_cls.from_buffer(self.data_buffer)
return Message(hdr, data)
else:
self.logger.critical(
f"Unknown core_def MT={hdr.msg_type} received from {src_module.name}"
)
self.remove_module(src_module)
return None
[docs]
def process_core_message(self, src_module: Module, header: MessageHeader):
"""Process incoming core message
Args:
src_module (Module): Message source module
header (MessageHeader): Message header of the incoming message
"""
msg_type = header.msg_type
if msg_type >= 100 or msg_type == cd.MT_DEBUG_TEXT:
# NOTE: DEBUG_TEXT is unsupported legacy STRING_DATA type
return
core_msg = self.decode_core_message(src_module, header)
if core_msg is None:
return
if msg_type == cd.MT_CONNECT or msg_type == cd.MT_CONNECT_V2:
if self.connect_module(src_module, core_msg):
self.send_ack(src_module)
self.send_client_info(src_module)
if msg_type == cd.MT_CONNECT:
self.logger.info(f"CONNECT - {src_module!s}")
else:
self.logger.info(f"CONNECT v2 - {src_module!s}")
elif msg_type == cd.MT_DISCONNECT:
self.disconnect_module(src_module)
self.logger.info(f"DISCONNECT - {src_module!s}")
elif msg_type == cd.MT_SUBSCRIBE:
self.add_subscription(src_module, core_msg)
self.send_ack(src_module)
elif msg_type == cd.MT_UNSUBSCRIBE:
self.remove_subscription(src_module, core_msg)
self.send_ack(src_module)
elif msg_type == cd.MT_PAUSE_SUBSCRIPTION:
self.pause_subscription(src_module, core_msg)
self.send_ack(src_module)
elif msg_type == cd.MT_RESUME_SUBSCRIPTION:
self.resume_subscription(src_module, core_msg)
self.send_ack(src_module)
elif msg_type == cd.MT_CLIENT_SET_NAME:
self.set_module_name(src_module, core_msg)
self.send_client_info(src_module)
elif msg_type == cd.MT_MODULE_READY:
self.register_module_ready(src_module, core_msg)
self.send_client_info(src_module)
[docs]
def process_message(self, src_module: Module, header: MessageHeader):
"""Process incoming message
Args:
src_module (Module): Message source module
header (MessageHeader): Message header of the incoming message
"""
# Handle any internal core messages that come through
self.process_core_message(src_module, header)
# Forward all messages that come through to MM
self.logger.debug(f"FORWARD - msg_type:{header.msg_type} from {src_module!s}")
data = self.data_view[: header.num_data_bytes]
self.forward_message(src_module, header, data)
[docs]
def close(self):
"""Close manager server"""
self._keep_running = False
[docs]
def run(self):
"""Start the message manager server"""
self._keep_running = True
try:
with disable_message_validation():
while self._keep_running:
rlist, _, _ = select.select(
self.modules.keys(), [], [], self.read_timeout
)
# Check for an incoming connection request
if len(rlist) > 0:
try:
rlist.remove(self.listen_socket)
conn, address = self.listen_socket.accept()
self.logger.info(
f"New connection accepted from {address[0]}:{address[1]}"
)
# Disable Nagle Algorithm
conn.setsockopt(
socket.getprotobyname("tcp"), socket.TCP_NODELAY, 1
)
self.sockets.append(conn)
self.modules[conn] = Module(
self.generate_uid(), conn, address, self.header_cls
)
except ValueError:
pass
# Randomly select the order of sockets with data.
random.shuffle(rlist)
# Check whichs clients are ready to receive data
self.wlist.clear()
if rlist:
_, self.wlist, _ = select.select(
[], self.modules.keys(), [], self.write_timeout
)
for client_socket in rlist:
# Check that module is still active
src = self.modules.get(client_socket)
if src:
try:
msg_hdr = self.read_message(client_socket)
except ConnectionError as err:
self.disconnect_module(src)
self.logger.error(
f"Connection Error on read, disconnecting {src!s} - {err!s}"
)
continue
if msg_hdr:
self.process_message(src, msg_hdr)
now = time.perf_counter()
if (
self.send_msg_timing
and (now - self.t_last_message_count)
> self.min_timing_message_period
):
self.send_timing_message()
self.t_last_message_count = now
if (
self.send_active_clients_msg
and (now - self.last_client_info) > self.INFO_INTERVAL
):
self.send_active_clients()
except KeyboardInterrupt:
self.logger.info("Stopping Message Manager")
finally:
for mod in self.modules:
mod.close()
[docs]
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-a",
"--addr",
type=str,
default="",
help="Listener address. IP address/hostname as a string. Default is '' which is evaluated as socket.INADDR_ANY.",
)
parser.add_argument(
"-p", "--port", type=int, default=7111, help="Listener port. Default is 7111."
)
parser.add_argument("-d", "--debug", action="store_true", help="Debug mode")
parser.add_argument(
"--log-level",
dest="log_level",
choices=["DEBUG", "INFO", "WARN", "ERROR"],
default="INFO",
help="Logging Level",
)
parser.add_argument(
"-t", "--timecode", action="store_true", help="Use timecode in message header"
)
parser.add_argument(
"-T",
"--disable_timing_msg",
action="store_true",
help="Disable sending of TIMING_MESSAGE",
)
parser.add_argument(
"--disable_active_clients_msg",
action="store_true",
help="Disable sending of ACTIVE_CLIENTS message periodically",
)
args = parser.parse_args()
if args.addr: # a non-empty host address was passed in.
ip_addr = args.addr
else:
ip_addr = "" # socket.INADDR_ANY
if args.log_level == "DEBUG":
level = logging.DEBUG
elif args.log_level == "INFO":
level = logging.INFO
elif args.log_level == "WARN":
level = logging.WARN
elif args.log_level == "ERROR":
level = logging.ERROR
else:
print("Unknown log level. Using INFO instead")
level = logging.INFO
with disable_message_validation():
msg_mgr = MessageManager(
ip_address=ip_addr,
port=args.port,
timecode=args.timecode,
log_level=level,
debug=args.debug,
send_msg_timing=(not args.disable_timing_msg),
send_active_clients=(not args.disable_active_clients_msg),
)
msg_mgr.run()
if __name__ == "__main__":
main()