coordinator module

Submodules

coordinator

Classes

CoordinatorBase([sim_mode])

CoordinatorLocal([sim_mode])

CoordinatorRemote([sim_mode])

Functions

get_coordinator([single_process, sim_mode])

class vent.coordinator.coordinator.CoordinatorBase(sim_mode=False)

Bases: object

Methods

clear_logged_alarms()

get_active_alarms()

get_control(control_setting_name)

get_logged_alarms()

get_sensors()

is_running()

set_control(control_setting)

start()

stop()

get_sensors() → vent.common.message.SensorValues
get_active_alarms() → Dict[str, vent.common.message.Alarm]
get_logged_alarms() → List[vent.common.message.Alarm]
clear_logged_alarms()
set_control(control_setting: vent.common.message.ControlSetting)
get_control(control_setting_name: vent.common.values.ValueName) → vent.common.message.ControlSetting
start()
is_running() → bool
stop()
class vent.coordinator.coordinator.CoordinatorLocal(sim_mode=False)

Bases: vent.coordinator.coordinator.CoordinatorBase

Parameters

sim_mode

Methods

__init__([sim_mode])

clear_logged_alarms()

get_active_alarms()

get_control(control_setting_name)

get_logged_alarms()

get_sensors()

is_running()

Test whether the whole system is running.

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

_is_running

Set (via .set()) when the thread should stop.

Type

threading.Event

__init__(sim_mode=False)
Parameters

sim_mode

get_sensors() → vent.common.message.SensorValues
get_active_alarms() → Dict[str, vent.common.message.Alarm]
get_logged_alarms() → List[vent.common.message.Alarm]
clear_logged_alarms()
set_control(control_setting: vent.common.message.ControlSetting)
get_control(control_setting_name: vent.common.values.ValueName) → vent.common.message.ControlSetting
start()

Start the coordinator. This is a soft start: no process is allocated.

is_running() → bool

Test whether the whole system is running.

stop()

Stop the coordinator. This is a soft stop: no process is killed.
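
The soft start and stop documented for CoordinatorLocal can be pictured as flipping a flag rather than creating or killing a process. The following is a minimal sketch under that reading, not the vent implementation: SoftLifecycle is a hypothetical stand-in class, and the polarity of its event (set while running) is an assumption made for the example.

```python
import threading


class SoftLifecycle:
    """Hypothetical stand-in for a coordinator lifecycle; not vent's code."""

    def __init__(self, sim_mode=False):
        self.sim_mode = sim_mode
        # Assumption: the event is set while the system counts as running.
        self._running = threading.Event()

    def start(self):
        # Soft start: only flips the flag, no process is allocated.
        self._running.set()

    def is_running(self):
        # Report the current state of the flag.
        return self._running.is_set()

    def stop(self):
        # Soft stop: only clears the flag, no process is killed.
        self._running.clear()


coordinator = SoftLifecycle(sim_mode=True)
coordinator.start()
running_after_start = coordinator.is_running()
coordinator.stop()
running_after_stop = coordinator.is_running()
```

Because start() and stop() only touch the flag, both are cheap and can be called repeatedly without side effects on any underlying process.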

class vent.coordinator.coordinator.CoordinatorRemote(sim_mode=False)

Bases: vent.coordinator.coordinator.CoordinatorBase

Methods

clear_logged_alarms()

get_active_alarms()

get_control(control_setting_name)

get_logged_alarms()

get_sensors()

is_running()

Test whether the whole system is running.

set_control(control_setting)

start()

Start the coordinator.

stop()

Stop the coordinator.

get_sensors() → vent.common.message.SensorValues
get_active_alarms() → Dict[str, vent.common.message.Alarm]
get_logged_alarms() → List[vent.common.message.Alarm]
clear_logged_alarms()
set_control(control_setting: vent.common.message.ControlSetting)
get_control(control_setting_name: vent.common.values.ValueName) → vent.common.message.ControlSetting
start()

Start the coordinator. This is a soft start: no process is allocated.

is_running() → bool

Test whether the whole system is running.

stop()

Stop the coordinator. This is a soft stop: no process is killed.

vent.coordinator.coordinator.get_coordinator(single_process=False, sim_mode=False) → vent.coordinator.coordinator.CoordinatorBase
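
The signature of get_coordinator suggests a factory that hides the choice of concrete coordinator behind the CoordinatorBase type. The sketch below illustrates that pattern only. The classes BaseSketch, LocalSketch and RemoteSketch and the function get_coordinator_sketch are hypothetical, and the mapping of single_process=True to the local variant is an assumption that this reference does not confirm.

```python
class BaseSketch:
    """Hypothetical common base; stands in for a coordinator base class."""

    def __init__(self, sim_mode=False):
        self.sim_mode = sim_mode


class LocalSketch(BaseSketch):
    """Hypothetical in-process variant."""


class RemoteSketch(BaseSketch):
    """Hypothetical variant that would talk to a separate process."""


def get_coordinator_sketch(single_process=False, sim_mode=False) -> BaseSketch:
    # Assumption: single_process selects the in-process variant.
    if single_process:
        return LocalSketch(sim_mode)
    return RemoteSketch(sim_mode)


local = get_coordinator_sketch(single_process=True, sim_mode=True)
remote = get_coordinator_sketch()
```

Callers written against the base type do not need to know which variant they received, which is the usual reason for routing construction through a factory function.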

ipc

process_manager

Classes

ProcessManager(sim_mode[, startCommandLine, …])

class vent.coordinator.process_manager.ProcessManager(sim_mode, startCommandLine=None, maxHeartbeatInterval=None)

Bases: object

Methods

heartbeat(timestamp)

restart_process()

start_process()

try_stop_process()

start_process()
try_stop_process()
restart_process()
heartbeat(timestamp)
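
The combination of heartbeat(timestamp), restart_process() and a maxHeartbeatInterval parameter points at a heartbeat watchdog. This reference does not describe how ProcessManager uses them, so the following is a generic sketch of that technique, not its implementation. HeartbeatWatchdog and its check method are hypothetical, and timestamps are assumed to be seconds as floats.

```python
class HeartbeatWatchdog:
    """Hypothetical watchdog; illustrates the pattern, not ProcessManager."""

    def __init__(self, max_heartbeat_interval):
        self.max_heartbeat_interval = max_heartbeat_interval
        self.last_heartbeat = None
        self.restarts = 0

    def heartbeat(self, timestamp):
        # Record the most recent sign of life from the monitored process.
        self.last_heartbeat = timestamp

    def check(self, now):
        # Return True if the heartbeat is stale and a restart was counted.
        if self.last_heartbeat is None:
            return False
        if now - self.last_heartbeat > self.max_heartbeat_interval:
            self.restarts += 1
            # Reset the reference point so one gap triggers one restart.
            self.last_heartbeat = now
            return True
        return False


watchdog = HeartbeatWatchdog(max_heartbeat_interval=2.0)
watchdog.heartbeat(10.0)
fresh = watchdog.check(11.0)
stale = watchdog.check(13.5)
```

In a real manager the branch that counts a restart would be the place to stop and relaunch the monitored process.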