Source code for vent.coordinator.coordinator

import pickle
import threading
from typing import List, Dict

import vent
import vent.controller.control_module
from vent.common.message import ControlSetting, Alarm
from vent.common.message import SensorValues
from vent.common.values import ValueName
from vent.coordinator.process_manager import ProcessManager
from vent.coordinator.rpc import get_rpc_client


class CoordinatorBase:
    """
    Interface shared by the coordinators defined in this module.

    Every method here is a no-op that returns ``None``; ``CoordinatorLocal``
    and ``CoordinatorRemote`` override them with real behaviour.
    """

    def __init__(self, sim_mode=False):
        """
        Args:
            sim_mode: accepted so subclasses can forward it; not used here.
        """
        # get_ui_control_module handles single_process flag
        # self.lock = threading.Lock()

        # TODO: do we still need this
        # def get_msg_timestamp(self):
        #     # return timestamp of last message
        #     with self.lock:
        #         last_message_timestamp = self.last_message_timestamp
        #     return last_message_timestamp
        return None

    def get_sensors(self) -> SensorValues:
        """Return the current sensor values (no-op in the base class)."""

    def get_active_alarms(self) -> Dict[str, Alarm]:
        """Return the active alarms (no-op in the base class)."""

    def get_logged_alarms(self) -> List[Alarm]:
        """Return the logged alarms (no-op in the base class)."""

    def clear_logged_alarms(self):
        """Clear the logged alarms (no-op in the base class)."""

    def set_control(self, control_setting: ControlSetting):
        """Apply ``control_setting`` (no-op in the base class)."""

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:
        """Return the setting named ``control_setting_name`` (no-op in the base class)."""

    def start(self):
        """Start the coordinator (no-op in the base class)."""

    def is_running(self) -> bool:
        """Report whether the system is running (no-op in the base class)."""

    def stop(self):
        """Stop the coordinator (no-op in the base class)."""
class CoordinatorLocal(CoordinatorBase):
    """Coordinator that forwards every call to a control module held in-process."""

    def __init__(self, sim_mode=False):
        """
        Args:
            sim_mode: forwarded to the base initializer and to
                ``vent.controller.control_module.get_control_module``.

        Attributes:
            control_module: the object returned by ``get_control_module(sim_mode)``;
                all other methods delegate to it.
        """
        super().__init__(sim_mode=sim_mode)
        make_control_module = vent.controller.control_module.get_control_module
        self.control_module = make_control_module(sim_mode)

    def get_sensors(self) -> SensorValues:
        """Return whatever the control module reports as its sensor values."""
        readings = self.control_module.get_sensors()
        return readings

    def get_active_alarms(self) -> Dict[str, Alarm]:
        """Return the control module's active alarms."""
        active = self.control_module.get_active_alarms()
        return active

    def get_logged_alarms(self) -> List[Alarm]:
        """Return the control module's logged alarms."""
        logged = self.control_module.get_logged_alarms()
        return logged

    def clear_logged_alarms(self):
        """
        Not available yet.

        Raises:
            NotImplementedError: always.
        """
        # TODO: implement this
        raise NotImplementedError

    def set_control(self, control_setting: ControlSetting):
        """Hand ``control_setting`` to the control module."""
        module = self.control_module
        module.set_control(control_setting)

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:
        """Look up ``control_setting_name`` on the control module and return the result."""
        setting = self.control_module.get_control(control_setting_name)
        return setting

    def start(self):
        """
        Start the coordinator.
        This does a soft start (not allocating a process).
        """
        module = self.control_module
        module.start()

    def is_running(self) -> bool:
        """
        Test whether the whole system is running
        """
        # NOTE(review): this hands back the control module's private
        # ``_running`` attribute unchanged -- confirm it is a plain bool and
        # not e.g. an Event object, which would always be truthy.
        running_flag = self.control_module._running
        return running_flag

    def stop(self):
        """
        Stop the coordinator.
        This does a soft stop (not kill a process)
        """
        module = self.control_module
        module.stop()
class CoordinatorRemote(CoordinatorBase):
    """
    Coordinator that reaches the controller through an RPC client.

    Arguments and results cross the RPC boundary as pickled bytes; responses
    are read from the ``.data`` attribute of the RPC return value.

    NOTE(review): ``pickle.loads`` can execute arbitrary code when fed a
    hostile payload. This assumes the RPC peer is the trusted controller
    process started by ``ProcessManager`` -- confirm before exposing the RPC
    endpoint to anything else.
    """

    def __init__(self, sim_mode=False):
        """
        Args:
            sim_mode: forwarded to the base initializer and to ``ProcessManager``.

        Attributes:
            process_manager: ``ProcessManager(sim_mode)``
            rpc_client: the object returned by ``get_rpc_client()``
        """
        super().__init__(sim_mode=sim_mode)
        # TODO: according to documentation, pass max_heartbeat_interval?
        # Order matters: __del__ relies on process_manager being assigned
        # before rpc_client to recognise a half-built instance.
        self.process_manager = ProcessManager(sim_mode)
        self.rpc_client = get_rpc_client()
        # TODO: make sure the ipc connection is setup. There should be a clever method

    def get_sensors(self) -> SensorValues:
        """Fetch the sensor values over RPC and unpickle them."""
        sensor_values = pickle.loads(self.rpc_client.get_sensors().data)
        return sensor_values

    def get_active_alarms(self) -> Dict[str, Alarm]:
        """Fetch the active alarms over RPC and unpickle them."""
        pickled_res = self.rpc_client.get_active_alarms().data
        return pickle.loads(pickled_res)

    def get_logged_alarms(self) -> List[Alarm]:
        """Fetch the logged alarms over RPC and unpickle them."""
        pickled_res = self.rpc_client.get_logged_alarms().data
        return pickle.loads(pickled_res)

    def clear_logged_alarms(self):
        """
        Not available yet.

        Raises:
            NotImplementedError: always.
        """
        # TODO: implement this
        raise NotImplementedError

    def set_control(self, control_setting: ControlSetting):
        """Pickle ``control_setting`` and send it through the RPC client."""
        pickled_args = pickle.dumps(control_setting)
        self.rpc_client.set_control(pickled_args)

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:
        """Send the pickled setting name over RPC and unpickle the reply."""
        pickled_args = pickle.dumps(control_setting_name)
        pickled_res = self.rpc_client.get_control(pickled_args).data
        return pickle.loads(pickled_res)

    def start(self):
        """
        Start the coordinator.
        This does a soft start (not allocating a process).
        """
        self.rpc_client.start()

    def is_running(self) -> bool:
        """
        Test whether the whole system is running
        """
        return self.rpc_client.is_running()

    def stop(self):
        """
        Stop the coordinator.

        Asks the RPC peer to stop, ignoring ``ConnectionRefusedError`` (the
        peer may already be unreachable), then calls
        ``process_manager.try_stop_process()``. Any other exception from the
        RPC call propagates unchanged.
        """
        try:
            self.rpc_client.stop()
        except ConnectionRefusedError:
            pass
        self.process_manager.try_stop_process()

    def __del__(self):
        """
        Best-effort shutdown when the instance is garbage collected.

        ``__del__`` also runs for instances whose ``__init__`` raised, so the
        attributes assigned there may be missing. A fully built instance goes
        through :meth:`stop`; a half-built one only releases what it managed
        to create instead of failing with ``AttributeError`` before the
        process manager is reached.
        """
        if hasattr(self, "rpc_client") and hasattr(self, "process_manager"):
            self.stop()
            return

        # __init__ failed part-way: there is no RPC client to notify, but a
        # process manager may already exist and still needs stopping.
        process_manager = getattr(self, "process_manager", None)
        if process_manager is not None:
            process_manager.try_stop_process()
def get_coordinator(single_process=False, sim_mode=False) -> CoordinatorBase:
    """
    Build the coordinator matching the requested process layout.

    Args:
        single_process: when truthy a ``CoordinatorLocal`` is created,
            otherwise a ``CoordinatorRemote``.
        sim_mode: passed positionally to the chosen coordinator class.

    Returns:
        The newly constructed coordinator.
    """
    coordinator_cls = CoordinatorLocal if single_process else CoordinatorRemote
    return coordinator_cls(sim_mode)