Source code for pvp.coordinator.coordinator

import pickle
import threading
from typing import List, Dict
import typing

import pvp
import pvp.controller.control_module
from pvp.common.message import ControlSetting
from pvp.alarm import Alarm
from pvp.common.message import SensorValues
from pvp.common.values import ValueName
from pvp.common.loggers import init_logger
from pvp.coordinator.process_manager import ProcessManager
from pvp.coordinator.rpc import get_rpc_client



class CoordinatorBase:
    """Common interface shared by the coordinator implementations.

    Apart from ``__init__``, which sets up a logger, every method here is a
    no-op placeholder that returns ``None``; subclasses provide the behavior.
    """

    def __init__(self, sim_mode=False):
        """Create the coordinator logger and record that init happened.

        Args:
            sim_mode: accepted for signature compatibility with subclasses;
                not used by this base implementation.
        """
        # get_ui_control_module handles single_process flag
        log = init_logger(__name__)
        self.logger = log
        log.info('coordinator init')

    # TODO: do we still need a ``get_msg_timestamp`` accessor (lock-protected
    # read of the last message timestamp)? It is currently disabled.

    def get_sensors(self) -> SensorValues:  # pragma: no cover
        """Placeholder: return the current sensor values."""

    def get_alarms(self) -> typing.Union[None, typing.Tuple[Alarm]]:  # pragma: no cover
        """Placeholder: return the current alarms, if any."""

    def set_control(self, control_setting: ControlSetting):  # pragma: no cover
        """Placeholder: apply ``control_setting``."""

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:  # pragma: no cover
        """Placeholder: return the setting identified by ``control_setting_name``."""

    def set_breath_detection(self, breath_detection: bool):  # pragma: no cover
        """Placeholder: set the breath detection flag."""

    def get_breath_detection(self) -> bool:  # pragma: no cover
        """Placeholder: return the breath detection flag."""

    def start(self):  # pragma: no cover
        """Placeholder: start the coordinator."""

    def is_running(self) -> bool:  # pragma: no cover
        """Placeholder: report whether the system is running."""

    def kill(self):  # pragma: no cover
        """Placeholder: kill the coordinator."""

    def stop(self):  # pragma: no cover
        """Placeholder: stop the coordinator."""
class CoordinatorLocal(CoordinatorBase):
    """Coordinator that talks to a control module in the current process.

    Every call is forwarded directly to the object returned by
    ``pvp.controller.control_module.get_control_module``.
    """

    def __init__(self, sim_mode=False):
        """
        Args:
            sim_mode: forwarded to the base class and to ``get_control_module``.

        Attributes:
            control_module: the object returned by ``get_control_module(sim_mode)``;
                all other methods delegate to it.
        """
        super().__init__(sim_mode=sim_mode)
        self.control_module = pvp.controller.control_module.get_control_module(sim_mode)

    def get_sensors(self) -> SensorValues:
        """Return ``control_module.get_sensors()``."""
        return self.control_module.get_sensors()

    def get_alarms(self) -> typing.Union[None, typing.Tuple[Alarm]]:
        """Return ``control_module.get_alarms()``."""
        return self.control_module.get_alarms()

    def set_control(self, control_setting: ControlSetting):
        """Pass ``control_setting`` to ``control_module.set_control``."""
        self.control_module.set_control(control_setting)

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:
        """Return ``control_module.get_control(control_setting_name)``."""
        return self.control_module.get_control(control_setting_name)

    def set_breath_detection(self, breath_detection: bool):
        """Pass ``breath_detection`` to ``control_module.set_breath_detection``."""
        self.control_module.set_breath_detection(breath_detection)

    def get_breath_detection(self) -> bool:
        """Return ``control_module.get_breath_detection()``."""
        return self.control_module.get_breath_detection()

    def start(self):
        """
        Start the coordinator.
        This does a soft start (not allocating a process).
        """
        self.control_module.start()

    def is_running(self) -> bool:
        """
        Test whether the whole system is running

        Returns:
            bool: ``True`` when the control module's ``_running`` state is set.
        """
        running = self.control_module._running
        # NOTE(review): ``_running`` looks like a ``threading.Event`` -- confirm
        # against the control module. An Event object is always truthy, so it
        # has to be queried through ``is_set()``; anything without ``is_set``
        # is treated as a plain flag and coerced to bool.
        is_set = getattr(running, 'is_set', None)
        if callable(is_set):
            return bool(is_set())
        return bool(running)

    def stop(self):
        """
        Stop the coordinator.
        This does a soft stop (not kill a process)
        """
        self.control_module.stop()

    def kill(self):  # pragma: no cover
        """No-op: there is no separate process to terminate."""
        # dont need to do anything since should just go away on its own
        pass
class CoordinatorRemote(CoordinatorBase):
    """Coordinator that reaches the controller through an RPC client.

    A :class:`ProcessManager` is created alongside the RPC client; arguments
    and results that are not plain RPC values are exchanged as pickles.
    """

    def __init__(self, sim_mode=False):
        """
        Args:
            sim_mode: forwarded to the base class and to ``ProcessManager``.

        Attributes:
            process_manager: ``ProcessManager(sim_mode)``
            rpc_client: the object returned by ``get_rpc_client()``
        """
        super().__init__(sim_mode=sim_mode)
        # TODO: according to documentation, pass max_heartbeat_interval?
        self.process_manager = ProcessManager(sim_mode)
        self.rpc_client = get_rpc_client()
        # TODO: make sure the ipc connection is setup. There should be a clever method

    # SECURITY NOTE: the getters below call ``pickle.loads`` on bytes received
    # over RPC. Unpickling can execute arbitrary code, so this is only safe
    # while the RPC peer is trusted -- NOTE(review): confirm the RPC endpoint
    # is not reachable by untrusted parties.

    def get_sensors(self) -> SensorValues:
        """Fetch sensor values over RPC and unpickle the ``.data`` payload."""
        sensor_values = pickle.loads(self.rpc_client.get_sensors().data)
        return sensor_values

    def get_alarms(self) -> typing.Union[None, typing.Tuple[Alarm]]:
        """Fetch alarms over RPC and unpickle the ``.data`` payload."""
        controller_alarms = pickle.loads(self.rpc_client.get_alarms().data)
        return controller_alarms

    def set_control(self, control_setting: ControlSetting):
        """Pickle ``control_setting`` and send it via ``rpc_client.set_control``."""
        pickled_args = pickle.dumps(control_setting)
        self.rpc_client.set_control(pickled_args)

    def get_control(self, control_setting_name: ValueName) -> ControlSetting:
        """Request a setting by (pickled) name and unpickle the reply's ``.data``."""
        pickled_args = pickle.dumps(control_setting_name)
        pickled_res = self.rpc_client.get_control(pickled_args).data
        return pickle.loads(pickled_res)

    def set_breath_detection(self, breath_detection: bool):
        """Pickle ``breath_detection`` and send it via the RPC client."""
        pickled_args = pickle.dumps(breath_detection)
        self.rpc_client.set_breath_detection(pickled_args)

    def get_breath_detection(self) -> bool:
        """Fetch the breath detection flag over RPC and unpickle it."""
        return pickle.loads(self.rpc_client.get_breath_detection().data)

    def start(self):
        """
        Start the coordinator.
        This does a soft start (not allocating a process).
        """
        self.rpc_client.start()

    def is_running(self) -> bool:
        """
        Test whether the whole system is running
        """
        return self.rpc_client.is_running()

    def stop(self):
        """
        Stop the coordinator.
        This does a soft stop (not kill a process)
        """
        try:
            self.rpc_client.stop()
        except ConnectionRefusedError:  # pragma: no cover
            # best effort: a refused connection is deliberately ignored
            pass

    def kill(self):
        """
        Stop the coordinator and end the whole program

        Raises:
            Whatever :meth:`stop` raises (other than ``ConnectionRefusedError``);
            the managed process is still asked to stop before the error
            propagates.
        """
        try:
            self.stop()
        finally:
            # Always attempt to stop the process, even when the soft stop
            # failed, so the managed process is not left behind.
            self.process_manager.try_stop_process()

    def __del__(self):
        # ``__init__`` may have raised before these attributes were bound;
        # only clean up what actually exists.
        process_manager = getattr(self, 'process_manager', None)
        if process_manager is None:
            return
        if getattr(self, 'rpc_client', None) is None:
            # No RPC client to soft-stop through; just stop the process.
            process_manager.try_stop_process()
            return
        self.kill()
def get_coordinator(single_process=False, sim_mode=False) -> CoordinatorBase:
    """Build the coordinator matching the requested process layout.

    Args:
        single_process: when truthy a :class:`CoordinatorLocal` is created,
            otherwise a :class:`CoordinatorRemote`.
        sim_mode: passed positionally to the chosen coordinator's constructor.

    Returns:
        The newly constructed coordinator instance.
    """
    coordinator_cls = CoordinatorLocal if single_process else CoordinatorRemote
    return coordinator_cls(sim_mode)