controller module

Classes

Balloon_Simulator(peep_valve)

Physics simulator for inflating a balloon with an attached PEEP valve.

ControlModuleBase(save_logs, flush_every)

Abstract controller class for simulation/hardware.

ControlModuleDevice([save_logs, …])

Uses ControlModuleBase to control the hardware.

ControlModuleSimulator([simulator_dt, …])

Controlling Simulation.

Functions

get_control_module([sim_mode, simulator_dt])

Generates control module.

class pvp.controller.control_module.ControlModuleBase(save_logs: bool = False, flush_every: int = 10)[source]

Bases: object

Abstract controller class for simulation/hardware.

1. General notes: All internal variables fall into three classes, denoted by the prefix of the variable name:

Methods

__analyze_last_waveform()

This goes through the last waveform, and updates the internal variables: VTE, PEEP, PIP, PIP_TIME, I_PHASE, FIRST_PEEP and BPM.

__calculate_control_signal_in(dt)

Calculates the PID control signal by: - Combining the three gain parameters.

__get_PID_error(ytarget, yis, dt, RC)

Calculates the three terms for PID control.

__save_values()

Helper function to reorganize key parameters in the main PID control loop, into a SensorValues object, that can be stored in the logfile, using a method from the DataLogger.

__start_new_breathcycle()

Some housekeeping.

__test_for_alarms()

Implements tests that are to be executed in the main control loop: - Test for HAPA - Test for Technical Alert, making sure sensor values are plausible - Test for Technical Alert, making sure contact is continuous. Currently: Alarms are time.time() of first occurrence.

_PID_update(dt)

This instantiates the PID control algorithms.

__init__(save_logs, flush_every)

Initializes the ControlModuleBase class.

_control_reset()

Resets the internal controller cycle to zero, i.e.

_controls_from_COPY()

_get_control_signal_in()

Produces the INSPIRATORY control-signal that has been calculated in __calculate_control_signal_in(dt)

_get_control_signal_out()

Produces the EXPIRATORY control-signal for the different states, i.e.

_initialize_set_to_COPY()

Makes a copy of internal variables.

_sensor_to_COPY()

_start_mainloop()

Prototype method to start main PID loop.

get_alarms()

A method callable from the outside to get a copy of the alarms, that the controller checks: High airway pressure, and technical alarms.

get_control(control_setting_name)

A method callable from the outside to get current control settings.

get_heartbeat()

Returns an independent heart-beat of the controller, i.e.

get_past_waveforms()

Public method to return a list of past waveforms from __cycle_waveform_archive.

get_sensors()

A method callable from the outside to get a copy of sensorValues

interrupt()

If the controller seems stuck, this generates a new thread, and starts the main loop.

is_running()

Public Method to assess whether the main loop thread is running.

set_breath_detection(breath_detection)

set_control(control_setting)

A method callable from the outside to set control settings.

start()

Method to start _start_mainloop as a thread.

stop()

Method to stop the main loop thread, and close the logfile.

  • COPY_varname: These are copies (for safe threading purposes) that are regularly sync’ed with internal variables.

  • __varname: These are variables only used in the ControlModuleBase-Class

  • _varname: These are variables used in derived classes.

2. Set and get values. Internal variables should only be accessed through the set_ and get_ functions.

These functions act on COPIES of internal variables (__ and _), that are sync’d every few iterations. How often this is done is adjusted by the variable self._NUMBER_CONTROLL_LOOPS_UNTIL_UPDATE. To avoid multiple threads manipulating the same variables at the same time, every manipulation of COPY_ is surrounded by a thread lock.
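The copy-and-lock scheme described above can be sketched as follows. This is a minimal illustration, not the pvp implementation: the class CopySyncSketch, its attribute names, and the update interval of 10 are hypothetical and chosen only for the example.

```python
import threading


class CopySyncSketch:
    """Hypothetical illustration of syncing internal state to a locked copy."""

    LOOPS_UNTIL_UPDATE = 10  # assumed value, for illustration only

    def __init__(self):
        self._lock = threading.Lock()
        self._pressure = 0.0       # internal variable, touched only by the loop
        self.COPY_pressure = 0.0   # copy exposed to other threads
        self._loop_counter = 0

    def loop_once(self, new_pressure):
        """One iteration of the control loop."""
        self._pressure = new_pressure
        self._loop_counter += 1
        # Sync the copy only every few iterations, always under the lock.
        if self._loop_counter % self.LOOPS_UNTIL_UPDATE == 0:
            with self._lock:
                self.COPY_pressure = self._pressure

    def get_pressure(self):
        """Getter used from outside; reads the copy under the lock."""
        with self._lock:
            return self.COPY_pressure
```

A caller on another thread only ever sees the copy, so a value read through the getter may lag the internal variable by up to LOOPS_UNTIL_UPDATE iterations.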

Public Methods:
  • get_sensors(): Returns a copy of the current sensor values.

  • get_alarms(): Returns a List of all alarms, active and logged

  • get_control(control_setting_name): Returns a control setting. The returned value is updated at the latest within self._NUMBER_CONTROLL_LOOPS_UNTIL_UPDATE loop iterations.

  • get_past_waveforms(): Returns a list of waveforms of pressure and volume during the last N breath cycles, N < self._RINGBUFFER_SIZE, and clears this archive.

  • start(): Starts the main-loop of the controller

  • stop(): Stops the main-loop of the controller

  • set_control(control_setting): Sets a control setting.

  • interrupt(): Interrupt the controller, and re-spawns the thread. Used to restart a stuck controller

  • is_running(): Returns a bool whether the main-thread is running

  • get_heartbeat(): Returns a heartbeat, more specifically, the continuously increasing iteration-number of the main control loop.

Initializes the ControlModuleBase class.

Parameters
  • save_logs (bool, optional) – Should sensor data and controls be saved with the DataLogger? Defaults to False.

  • flush_every (int, optional) – Flush and rotate logs every n breath cycles. Defaults to 10.

Raises

alert – [description]

__init__(save_logs: bool = False, flush_every: int = 10)[source]

Initializes the ControlModuleBase class.

Parameters
  • save_logs (bool, optional) – Should sensor data and controls be saved with the DataLogger? Defaults to False.

  • flush_every (int, optional) – Flush and rotate logs every n breath cycles. Defaults to 10.

Raises

alert – [description]

_initialize_set_to_COPY()[source]

Makes a copy of internal variables. This is used to facilitate threading

_sensor_to_COPY()[source]

_controls_from_COPY()[source]

__analyze_last_waveform()

This goes through the last waveform, and updates the internal variables: VTE, PEEP, PIP, PIP_TIME, I_PHASE, FIRST_PEEP and BPM.

get_sensors() → pvp.common.message.SensorValues[source]

A method callable from the outside to get a copy of sensorValues

Returns

A set of current sensor values, handled by the controller.

Return type

SensorValues

get_alarms() → Union[None, Tuple[pvp.alarm.alarm.Alarm]][source]

A method callable from the outside to get a copy of the alarms, that the controller checks: High airway pressure, and technical alarms.

Returns

A tuple of alarms

Return type

typing.Union[None, typing.Tuple[Alarm]]

set_control(control_setting: pvp.common.message.ControlSetting)[source]

A method callable from the outside to set control settings. This updates the entries of COPY with new control values.

Parameters

control_setting (ControlSetting) – [description]

get_control(control_setting_name: pvp.common.values.ValueName) → pvp.common.message.ControlSetting[source]

A method callable from the outside to get current control settings. This returns values of COPY to the outside world.

Parameters

control_setting_name (ValueName) – The specific control asked for

Returns

ControlSettings-Object that contains relevant data

Return type

ControlSetting

set_breath_detection(breath_detection: bool)[source]

__get_PID_error(ytarget, yis, dt, RC)

Calculates the three terms for PID control. Also takes a timestep “dt” on which the integral-term is smoothed.

Parameters
  • ytarget (float) – target value of pressure

  • yis (float) – current value of pressure

  • dt (float) – timestep

  • RC (float) – time constant for calculation of integral term.
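One way the three terms could be computed is sketched below. This is an illustration under stated assumptions, not the pvp code: the function name pid_error_terms, the prev_p and prev_i arguments, and in particular the exponential smoothing factor dt / (dt + RC) for the integral term are assumptions about how the time constant enters.

```python
def pid_error_terms(ytarget, yis, dt, RC, prev_p=0.0, prev_i=0.0):
    """Return (P, I, D) error terms for one time step (hypothetical sketch)."""
    error = ytarget - yis
    # Assumed smoothing factor in (0, 1): small dt or large RC -> slow update.
    s = dt / (dt + RC)
    p_term = error
    # Integral term as a low-pass filtered error with time constant RC.
    i_term = prev_i + s * (error - prev_i)
    # Derivative term as the change in error since the previous step.
    d_term = error - prev_p
    return p_term, i_term, d_term
```

The previous P and I terms are passed in explicitly here to keep the function free of hidden state; a controller class would more likely hold them as attributes.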

__calculate_control_signal_in(dt)

Calculates the PID control signal by:
  • Combining the three gain parameters.

  • And smoothing the control signal with a moving window of three frames (~10ms)

Parameters

dt (float) – timestep
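The two steps above (weighting the three terms, then smoothing over three frames) can be sketched as follows. The class ControlSignalSketch and the gain names kp, ki, kd are hypothetical; only the three-frame moving window comes from the description.

```python
from collections import deque


class ControlSignalSketch:
    """Hypothetical combiner: weighted PID sum plus a moving average."""

    def __init__(self, kp, ki, kd, window=3):
        self.kp, self.ki, self.kd = kp, ki, kd
        # Holds at most `window` of the most recent raw control signals.
        self._recent = deque(maxlen=window)

    def update(self, p_term, i_term, d_term):
        """Return the smoothed control signal for this frame."""
        raw = self.kp * p_term + self.ki * i_term + self.kd * d_term
        self._recent.append(raw)
        # Average over the frames seen so far (fewer than `window` at start).
        return sum(self._recent) / len(self._recent)
```

A deque with maxlen drops the oldest frame automatically, so no index bookkeeping is needed for the window.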

_get_control_signal_in()[source]

Produces the INSPIRATORY control-signal that has been calculated in __calculate_control_signal_in(dt)

Returns

the numerical control signal for the inspiratory prop valve

Return type

float

_get_control_signal_out()[source]

Produces the EXPIRATORY control-signal for the different states, i.e. open/close

Returns

numerical control signal for expiratory side: open (1) close (0)

Return type

float

_control_reset()[source]

Resets the internal controller cycle to zero, i.e. restarts the breath cycle. Used for autonomous breath detection.

__test_for_alarms()

Implements tests that are to be executed in the main control loop:
  • Test for HAPA

  • Test for Technical Alert, making sure sensor values are plausible

  • Test for Technical Alert, making sure contact is continuous

Currently: Alarms are time.time() of first occurrence.
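Storing an alarm as the time of its first occurrence could look like the sketch below. The function check_hapa, the dictionary layout, and the behaviour of dropping the entry once the pressure is back under the limit are assumptions made for illustration; the source only states that alarms are the time.time() of first occurrence.

```python
import time


def check_hapa(pressure, limit, alarms, now=None):
    """Record when a high-pressure condition was first seen (hypothetical)."""
    now = time.time() if now is None else now
    if pressure > limit:
        # setdefault keeps the earliest timestamp on repeated violations.
        alarms.setdefault("HAPA", now)
    else:
        # Assumed behaviour: forget the alarm once the condition clears.
        alarms.pop("HAPA", None)
    return alarms
```

The optional now argument exists only so the timestamp can be controlled in tests.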

__start_new_breathcycle()

Some housekeeping. This has to be executed when the next breath cycle starts:
  • starts new breathcycle

  • initializes new __cycle_waveform

  • analyzes last breath waveform for PIP, PEEP etc. with __analyze_last_waveform()

  • flushes the logfile

_PID_update(dt)[source]

This instantiates the PID control algorithms. During the breathing cycle, it goes through the four states:

  1. Rise to PIP, speed is controlled by flow (variable: __SET_PIP_GAIN)

  2. Sustain PIP pressure

  3. Quick fall to PEEP

  4. Sustain PEEP pressure

Once the cycle is complete, it checks the cycle for any alarms, and starts a new one. A record of pressure/volume waveforms is kept and saved.

Parameters

dt (float) – timestep since last update
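The four states can be pictured as a function of the time elapsed in the current breath cycle. The sketch below is a simplification under stated assumptions: the function breath_phase and its timing parameters are hypothetical, and the phase boundaries are purely time-based, whereas the description says the speed of the rise is controlled by flow.

```python
def breath_phase(t, t_rise, t_inspiration, t_fall, cycle_duration):
    """Map time since cycle start (seconds) to one of the four phases."""
    t = t % cycle_duration  # wrap around into the current cycle
    if t < t_rise:
        return "rise_to_PIP"
    if t < t_inspiration:
        return "sustain_PIP"
    if t < t_inspiration + t_fall:
        return "fall_to_PEEP"
    return "sustain_PEEP"
```

For example, with t_rise=0.3, t_inspiration=1.0, t_fall=0.2 and a 3-second cycle, t=0.5 falls into the sustain-PIP phase and t=2.0 into the sustain-PEEP phase.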

__save_values()

Helper function to reorganize key parameters in the main PID control loop, into a SensorValues object, that can be stored in the logfile, using a method from the DataLogger.

get_past_waveforms()[source]

Public method to return a list of past waveforms from __cycle_waveform_archive. Note: After calling this function, the archive is emptied! The format is:

  • Returns a list of [Nx3] waveforms, of [time, pressure, volume]

  • Most recent entry is waveform_list[-1]

Returns

[Nx3] waveforms, of [time, pressure, volume]

Return type

list
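The read-and-clear behaviour can be sketched with a bounded buffer. The class WaveformArchiveSketch, its add method, and the default buffer size are hypothetical; the sketch only mirrors the documented contract that the most recent waveform is last and that the archive is empty after the call.

```python
import threading
from collections import deque


class WaveformArchiveSketch:
    """Hypothetical ring buffer of waveforms that is emptied when read."""

    def __init__(self, ringbuffer_size=10):
        self._lock = threading.Lock()
        # Oldest waveforms are dropped once the buffer is full.
        self._archive = deque(maxlen=ringbuffer_size)

    def add(self, waveform):
        """Store one finished waveform, e.g. a list of [time, pressure, volume] rows."""
        with self._lock:
            self._archive.append(waveform)

    def get_past_waveforms(self):
        """Return all stored waveforms (most recent last) and clear the archive."""
        with self._lock:
            waveforms = list(self._archive)
            self._archive.clear()
        return waveforms
```

Copying and clearing inside the same lock means a waveform added by the control loop in between cannot be lost or returned twice.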

_start_mainloop()[source]

Prototype method to start main PID loop. Will depend on simulation or device, specified below.

start()[source]

Method to start _start_mainloop as a thread.

stop()[source]

Method to stop the main loop thread, and close the logfile.

interrupt()[source]

If the controller seems stuck, this generates a new thread, and starts the main loop. No parameters have changed.

is_running()[source]

Public Method to assess whether the main loop thread is running.

Returns

Return true if and only if the main thread of controller is running.

Return type

bool

get_heartbeat()[source]

Returns an independent heart-beat of the controller, i.e. the internal loop counter incremented in _start_mainloop.

Returns

exact value of self._loop_counter

Return type

int

class pvp.controller.control_module.ControlModuleDevice(save_logs=True, flush_every=10, config_file=None)[source]

Bases: pvp.controller.control_module.ControlModuleBase

Uses ControlModuleBase to control the hardware.

Initializes the ControlModule for the physical system. Inherits methods from ControlModuleBase

Parameters
  • save_logs (bool, optional) – Should logs be kept? Defaults to True.

  • flush_every (int, optional) – How often are log-files to be flushed, in units of main-loop iterations? Defaults to 10.

  • config_file (str, optional) – Path to device config file, e.g. ‘pvp/io/config/dinky-devices.ini’. Defaults to None.

Methods

__init__([save_logs, flush_every, config_file])

Initializes the ControlModule for the physical system.

_get_HAL()

Get sensor values from HAL, decorated with timeout.

_sensor_to_COPY()

Copies the current measurements to `COPY_sensor_values`, so that it can be queried from the outside.

_set_HAL(valve_open_in, valve_open_out)

Set Controls with HAL, decorated with a timeout.

_start_mainloop()

This is the main loop.

set_valves_standby()

This returns valves back to normal setting (in: closed, out: open)

__init__(save_logs=True, flush_every=10, config_file=None)[source]

Initializes the ControlModule for the physical system. Inherits methods from ControlModuleBase

Parameters
  • save_logs (bool, optional) – Should logs be kept? Defaults to True.

  • flush_every (int, optional) – How often are log-files to be flushed, in units of main-loop iterations? Defaults to 10.

  • config_file (str, optional) – Path to device config file, e.g. ‘pvp/io/config/dinky-devices.ini’. Defaults to None.

_sensor_to_COPY()[source]

Copies the current measurements to `COPY_sensor_values`, so that it can be queried from the outside.

_set_HAL(valve_open_in, valve_open_out)[source]

Set Controls with HAL, decorated with a timeout.

As hardware communication is the speed bottleneck, this code is slightly optimized insofar as only changes are sent to hardware.

Parameters
  • valve_open_in (float) – setting of the inspiratory valve; should be in range [0,100]

  • valve_open_out (float) – setting of the expiratory valve; should be 1/0 (open and close)
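Sending only changed values to the hardware can be sketched as below. The class ValveWriterSketch and the send callable are hypothetical stand-ins for the hardware layer, and clamping the inspiratory setting to [0, 100] is an assumption; the description only says the value should be in that range.

```python
class ValveWriterSketch:
    """Hypothetical writer that skips hardware calls when nothing changed."""

    def __init__(self, send):
        self._send = send   # callable(name, value), stands in for the hardware
        self._last = {}     # last value actually sent, per valve

    def set_valves(self, valve_open_in, valve_open_out):
        # Assumed behaviour: clamp the inspiratory setting to [0, 100].
        valve_open_in = min(max(valve_open_in, 0.0), 100.0)
        for name, value in (("in", valve_open_in), ("out", valve_open_out)):
            if self._last.get(name) != value:
                self._send(name, value)   # only changes reach the hardware
                self._last[name] = value
```

Repeating a call with unchanged settings therefore costs no hardware communication at all.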

_get_HAL()[source]

Get sensor values from HAL, decorated with timeout. As hardware communication is the speed bottleneck, this code is slightly optimized insofar as some sensors are queried only in certain phases of the breath cycle. This is done to run the primary PID loop as fast as possible:

  • pressure is always queried

  • Flow is queried only outside of inspiration

  • In addition, oxygen is only read every 5 seconds.
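The selective polling listed above can be sketched as follows. The class SensorPollerSketch and its three read callables are hypothetical stand-ins for the hardware reads; the 5-second oxygen interval and the rule that flow is skipped during inspiration come from the description.

```python
class SensorPollerSketch:
    """Hypothetical poller that skips slow reads when they are not needed."""

    OXYGEN_INTERVAL = 5.0  # seconds between oxygen reads

    def __init__(self, read_pressure, read_flow, read_oxygen):
        # The three callables stand in for hardware reads (assumed interface).
        self._read_pressure = read_pressure
        self._read_flow = read_flow
        self._read_oxygen = read_oxygen
        self._last_oxygen_time = None
        self.pressure = self.flow = self.oxygen = None

    def poll(self, now, in_inspiration):
        """Refresh readings; `now` is a timestamp in seconds."""
        self.pressure = self._read_pressure()        # always queried
        if not in_inspiration:
            self.flow = self._read_flow()            # only outside inspiration
        due = (self._last_oxygen_time is None
               or now - self._last_oxygen_time >= self.OXYGEN_INTERVAL)
        if due:
            self.oxygen = self._read_oxygen()        # at most every 5 seconds
            self._last_oxygen_time = now
```

Readings that are skipped simply keep their previous value, so consumers always see the most recent measurement available.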

set_valves_standby()[source]

This returns valves back to normal setting (in: closed, out: open)

_start_mainloop()[source]

This is the main loop. This method should be run as a thread (see the start() method in ControlModuleBase)

class pvp.controller.control_module.Balloon_Simulator(peep_valve)[source]

Bases: object

Physics simulator for inflating a balloon with an attached PEEP valve. For math, see https://en.wikipedia.org/wiki/Two-balloon_experiment

Methods

OUupdate(variable, dt, mu, sigma, tau)

This is a simple function to produce an OU process on variable.

_reset()

Resets Balloon to default settings.

get_pressure()

get_volume()

set_flow_in(Qin, dt)

set_flow_out(Qout, dt)

update(dt)

get_pressure()[source]

get_volume()[source]

set_flow_in(Qin, dt)[source]

set_flow_out(Qout, dt)[source]

update(dt)[source]

OUupdate(variable, dt, mu, sigma, tau)[source]

This is a simple function to produce an OU process on variable. It is used as model for fluctuations in measurement variables.

Parameters
  • variable (float) – value of a variable at previous time step

  • dt (float) – timestep

  • mu (float) – mean

  • sigma (float) – noise amplitude

  • tau (float) – time scale

Returns

value of “variable” at next time step

Return type

float
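A single step of an Ornstein-Uhlenbeck (OU) process can be written as an Euler-Maruyama update, sketched below. The function ou_update and its rng argument are hypothetical, and the noise scaling sigma * sqrt(2 * dt / tau), which makes sigma the stationary standard deviation, is an assumption about the intended convention.

```python
import math
import random


def ou_update(variable, dt, mu, sigma, tau, rng=random):
    """Advance an OU process by one time step (hypothetical sketch)."""
    # Deterministic pull towards the mean on time scale tau.
    drift = dt * (mu - variable) / tau
    # Gaussian kick; assumed scaling so that sigma is the stationary std.
    noise = sigma * math.sqrt(2.0 * dt / tau) * rng.gauss(0.0, 1.0)
    return variable + drift + noise
```

With sigma set to 0 the update reduces to an exponential relaxation towards mu, which is a convenient way to check the drift term on its own.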

_reset()[source]

Resets Balloon to default settings.

class pvp.controller.control_module.ControlModuleSimulator(simulator_dt=None, peep_valve_setting=5)[source]

Bases: pvp.controller.control_module.ControlModuleBase

Controlling Simulation.

Initializes the ControlModuleBase with the simple simulation (for testing/dev).

Parameters
  • simulator_dt (float, optional) – timestep between updates. Defaults to None.

  • peep_valve_setting (int, optional) – Simulates action of a PEEP valve. Pressure cannot fall below this setting. Defaults to 5.

Methods

__SimulatedPropValve(x)

This simulates the action of a proportional valve.

__SimulatedSolenoid(x)

This simulates the action of a two-state Solenoid valve.

__init__([simulator_dt, peep_valve_setting])

Initializes the ControlModuleBase with the simple simulation (for testing/dev).

_sensor_to_COPY()

Make the sensor value object from current (simulated) measurements

_start_mainloop()

This is the main loop.

__init__(simulator_dt=None, peep_valve_setting=5)[source]

Initializes the ControlModuleBase with the simple simulation (for testing/dev).

Parameters
  • simulator_dt (float, optional) – timestep between updates. Defaults to None.

  • peep_valve_setting (int, optional) – Simulates action of a PEEP valve. Pressure cannot fall below this setting. Defaults to 5.

__SimulatedPropValve(x)

This simulates the action of a proportional valve. Flow-current-curve eye-balled from generic prop valve with logistic activation.

Parameters

x (float) – A control variable [like pulse-width-duty cycle or mA]

Returns

flow through the valve

Return type

float
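A logistic flow curve of the kind described could look like the sketch below. The function simulated_prop_valve and all three constants (x_half, steepness, max_flow) are hypothetical placeholders, not the values used in pvp.

```python
import math


def simulated_prop_valve(x, x_half=50.0, steepness=0.1, max_flow=1.0):
    """Flow through a proportional valve as a logistic function of x."""
    # x_half: control value at which flow reaches half of max_flow (assumed).
    # steepness: how sharply the valve opens around x_half (assumed).
    return max_flow / (1.0 + math.exp(-steepness * (x - x_half)))
```

The curve is close to zero for small control values, rises steeply around x_half, and saturates at max_flow.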

__SimulatedSolenoid(x)

This simulates the action of a two-state Solenoid valve.

Parameters

x (float) – If x==0: valve closed; x>0: flow set to “1”

Returns

current flow

Return type

float
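The two-state behaviour is small enough to write out directly. The function name simulated_solenoid is hypothetical, and treating negative inputs as closed is an assumption, since the description only covers x == 0 and x > 0.

```python
def simulated_solenoid(x):
    """Return 1.0 (open) for any positive control value, else 0.0 (closed)."""
    # Assumed: negative inputs are treated like 0, i.e. valve closed.
    return 1.0 if x > 0 else 0.0
```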

_sensor_to_COPY()[source]

Make the sensor value object from current (simulated) measurements

_start_mainloop()[source]

This is the main loop. This method should be run as a thread (see the start() method in ControlModuleBase)

pvp.controller.control_module.get_control_module(sim_mode=False, simulator_dt=None)[source]

Generates control module.

Parameters
  • sim_mode (bool, optional) – if true: returns simulation, else returns hardware. Defaults to False.

  • simulator_dt (float, optional) – a timescale for the simulation. Defaults to None.

Returns

Either configured for simulation, or physical device.

Return type

ControlModule-Object