coordinator module

Submodules

coordinator

Classes

CoordinatorBase([sim_mode])

CoordinatorLocal([sim_mode])

param sim_mode

CoordinatorRemote([sim_mode])

Functions

get_coordinator([single_process, sim_mode])

class pvp.coordinator.coordinator.CoordinatorBase(sim_mode=False)[source]

Bases: object

Methods

get_alarms()

get_control(control_setting_name)

get_sensors()

get_target_waveform()

is_running()

kill()

set_breath_detection(breath_detection)

set_control(control_setting)

start()

stop()

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_target_waveform()[source]
start()[source]
is_running()bool[source]
kill()[source]
stop()[source]
class pvp.coordinator.coordinator.CoordinatorLocal(sim_mode=False)[source]

Bases: pvp.coordinator.coordinator.CoordinatorBase

Parameters

sim_mode

Methods

__init__([sim_mode])

param sim_mode

get_alarms()

get_control(control_setting_name)

get_sensors()

get_target_waveform()

is_running()

Test whether the whole system is running

kill()

set_breath_detection(breath_detection)

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

_is_running

.set() when thread should stop

Type

threading.Event

__init__(sim_mode=False)[source]
Parameters

sim_mode

_is_running

.set() when thread should stop

Type

threading.Event

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_target_waveform()[source]
start()[source]

Start the coordinator. This does a soft start (not allocating a process).

is_running()bool[source]

Test whether the whole system is running

stop()[source]

Stop the coordinator. This does a soft stop (not kill a process)

kill()[source]
class pvp.coordinator.coordinator.CoordinatorRemote(sim_mode=False)[source]

Bases: pvp.coordinator.coordinator.CoordinatorBase

Methods

get_alarms()

get_control(control_setting_name)

get_sensors()

get_target_waveform()

is_running()

Test whether the whole system is running

kill()

Stop the coordinator and end the whole program

set_breath_detection(breath_detection)

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_target_waveform()[source]
start()[source]

Start the coordinator. This does a soft start (not allocating a process).

is_running()bool[source]

Test whether the whole system is running

stop()[source]

Stop the coordinator. This does a soft stop (not kill a process)

kill()[source]

Stop the coordinator and end the whole program

pvp.coordinator.coordinator.get_coordinator(single_process=False, sim_mode=False)pvp.coordinator.coordinator.CoordinatorBase[source]

ipc

Functions

get_alarms()

get_control(control_setting_name)

get_rpc_client()

get_sensors()

get_target_waveform()

rpc_server_main(sim_mode, serve_event[, …])

set_breath_detection(breath_detection)

set_control(control_setting)

pvp.coordinator.rpc.get_sensors()[source]
pvp.coordinator.rpc.get_alarms()[source]
pvp.coordinator.rpc.set_control(control_setting)[source]
pvp.coordinator.rpc.get_control(control_setting_name)[source]
pvp.coordinator.rpc.set_breath_detection(breath_detection)[source]
pvp.coordinator.rpc.get_target_waveform()[source]
pvp.coordinator.rpc.rpc_server_main(sim_mode, serve_event, addr='localhost', port=9533)[source]
pvp.coordinator.rpc.get_rpc_client()[source]

process_manager

Classes

ProcessManager(sim_mode[, startCommandLine, …])

class pvp.coordinator.process_manager.ProcessManager(sim_mode, startCommandLine=None, maxHeartbeatInterval=None)[source]

Bases: object

Methods

heartbeat(timestamp)

restart_process()

start_process()

try_stop_process()

start_process()[source]
try_stop_process()[source]
restart_process()[source]
heartbeat(timestamp)[source]