coordinator module

The coordinator provides an interface between the process threads, and facilitates inter-process communication. It is a wrapper around xml-rpc, which allowed us to use defined data-structures such as SensorValues.

Submodules

coordinator

Classes:

Alarm(alarm_type, severity, start_time, …)

Representation of alarm status and parameters

ControlSetting(name, value, min_value, …)

Message containing ventilation control parameters.

CoordinatorBase([sim_mode])

CoordinatorLocal([sim_mode])

param sim_mode

CoordinatorRemote([sim_mode])

ProcessManager(sim_mode[, startCommandLine, …])

SensorValues([timestamp, loop_counter, …])

Structured class for communicating sensor readings throughout PVP.

ValueName(value)

Canonical names of all values used in PVP.

Data:

Dict

The central part of internal API.

List

The central part of internal API.

Functions:

get_coordinator([single_process, sim_mode])

get_rpc_client()

init_logger(module_name[, log_level, …])

Initialize a logger for logging events.

class pvp.coordinator.coordinator.CoordinatorBase(sim_mode=False)[source]

Bases: object

Methods:

get_alarms()

get_breath_detection()

get_control(control_setting_name)

get_sensors()

is_running()

kill()

set_breath_detection(breath_detection)

set_control(control_setting)

start()

stop()

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_breath_detection()bool[source]
start()[source]
is_running()bool[source]
kill()[source]
stop()[source]
class pvp.coordinator.coordinator.CoordinatorLocal(sim_mode=False)[source]

Bases: pvp.coordinator.coordinator.CoordinatorBase

Parameters

sim_mode

_is_running

.set() when thread should stop

Type

threading.Event

Methods:

__init__([sim_mode])

param sim_mode

get_alarms()

get_breath_detection()

get_control(control_setting_name)

get_sensors()

is_running()

Test whether the whole system is running

kill()

set_breath_detection(breath_detection)

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

__init__(sim_mode=False)[source]
Parameters

sim_mode

_is_running

.set() when thread should stop

Type

threading.Event

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_breath_detection()bool[source]
start()[source]

Start the coordinator. This does a soft start (not allocating a process).

is_running()bool[source]

Test whether the whole system is running

stop()[source]

Stop the coordinator. This does a soft stop (not kill a process)

kill()[source]
class pvp.coordinator.coordinator.CoordinatorRemote(sim_mode=False)[source]

Bases: pvp.coordinator.coordinator.CoordinatorBase

Methods:

get_alarms()

get_breath_detection()

get_control(control_setting_name)

get_sensors()

is_running()

Test whether the whole system is running

kill()

Stop the coordinator and end the whole program

set_breath_detection(breath_detection)

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

get_sensors()pvp.common.message.SensorValues[source]
get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]
set_control(control_setting: pvp.common.message.ControlSetting)[source]
get_control(control_setting_name: pvp.common.values.ValueName)pvp.common.message.ControlSetting[source]
set_breath_detection(breath_detection: bool)[source]
get_breath_detection()bool[source]
start()[source]

Start the coordinator. This does a soft start (not allocating a process).

is_running()bool[source]

Test whether the whole system is running

stop()[source]

Stop the coordinator. This does a soft stop (not kill a process)

kill()[source]

Stop the coordinator and end the whole program

pvp.coordinator.coordinator.get_coordinator(single_process=False, sim_mode=False)pvp.coordinator.coordinator.CoordinatorBase[source]

ipc

Classes:

SimpleXMLRPCServer(addr[, requestHandler, …])

Simple XML-RPC server.

Functions:

get_alarms()

get_breath_detection()

get_control(control_setting_name)

get_rpc_client()

get_sensors()

init_logger(module_name[, log_level, …])

Initialize a logger for logging events.

rpc_server_main(sim_mode, serve_event[, …])

set_breath_detection(breath_detection)

set_control(control_setting)

pvp.coordinator.rpc.get_sensors()[source]
pvp.coordinator.rpc.get_alarms()[source]
pvp.coordinator.rpc.set_control(control_setting)[source]
pvp.coordinator.rpc.get_control(control_setting_name)[source]
pvp.coordinator.rpc.set_breath_detection(breath_detection)[source]
pvp.coordinator.rpc.get_breath_detection()[source]
pvp.coordinator.rpc.rpc_server_main(sim_mode, serve_event, addr='localhost', port=9533)[source]
pvp.coordinator.rpc.get_rpc_client()[source]

process_manager

Classes:

ProcessManager(sim_mode[, startCommandLine, …])

class pvp.coordinator.process_manager.ProcessManager(sim_mode, startCommandLine=None, maxHeartbeatInterval=None)[source]

Bases: object

Methods:

heartbeat(timestamp)

restart_process()

start_process()

try_stop_process()

start_process()[source]
try_stop_process()[source]
restart_process()[source]
heartbeat(timestamp)[source]