Source code for tests.test_controller

import time
import numpy as np
import pytest
import random

from pvp import prefs
prefs.init()

from pvp.common import values
from pvp.common.message import ControlSetting
from pvp.alarm import Alarm, AlarmType, AlarmSeverity
from pvp.common.values import ValueName
from pvp.controller.control_module import get_control_module



######################################################################
#########################   TEST 1  ##################################
######################################################################
#
#   Make sure the controller remembers settings, and can be started
#   and stopped repeatedly a couple of times, and performs resets as intended
#

[docs]@pytest.mark.parametrize("control_setting_name", values.CONTROL.keys()) def test_control_settings(control_setting_name): ''' This test set_controls and get_controls Set and read a all five variables, make sure that they are identical. ''' Controller = get_control_module(sim_mode=True) Controller.start() time.sleep(0.1) Controller.start() time.sleep(0.1) Controller.stop() time.sleep(0.1) Controller.stop() v = random.randint(10, 100) v_min = random.randint(10, 100) v_max = random.randint(10, 100) t = time.time() c_set = ControlSetting(name=control_setting_name, value=np.random.random(), min_value = np.random.random(), max_value = np.random.random()) # c_set = ControlSetting(name=control_setting_name, value=v) Controller.set_control(c_set) c_get = Controller.get_control(control_setting_name) print(str(c_set.name)) if not str(control_setting_name) == 'ValueName.IE_RATIO': assert c_get.name == c_set.name assert c_get.value == c_set.value assert (c_get.timestamp - c_set.timestamp)**2 < 0.01**2 #both should be executed close to one another
[docs]def test_restart_controller(): ''' This tests whether the controller can be started and stopped 10 times without problems ''' Controller = get_control_module(sim_mode=True) for counter in range(10): time.sleep(0.1) Controller.start() vals_start = Controller.get_sensors() time.sleep(0.3) Controller.stop() vals_stop = Controller.get_sensors() assert vals_stop.loop_counter > vals_start.loop_counter
[docs]def test_reset_controller(): """ Tests the reset functionality """ Controller = get_control_module(sim_mode=True, simulator_dt=1.5) # dt>1 tests the physics_reset Controller.start() time.sleep(1) Controller._control_reset() assert np.abs(Controller._cycle_start - time.time()) < 0.05 # tests control_reset Controller.stop()
###################################################################### ######################### TEST 2 ################################## ###################################################################### # # Make sure the controller controlls, and the controll values look # good. (i.e. close to target within narrow margins). #
[docs]def test_control_dynamical(): ''' This tests whether the controller is controlling pressure as intended. Start controller, set control values, measure whether actually there. ''' Controller = get_control_module(sim_mode=True, simulator_dt=0.01) Controller._LOOP_UPDATE_TIME = 0.01 Controller.set_breath_detection(False) assert Controller.get_breath_detection() == False Controller.set_breath_detection(True) assert Controller.get_breath_detection() == True vals_start = Controller.get_sensors() v_peep = 5 command = ControlSetting(name=ValueName.PEEP, value=v_peep) Controller.set_control(command) v_pip = random.randint(15, 30) command = ControlSetting(name=ValueName.PIP, value=v_pip) Controller.set_control(command) v_bpm = random.randint(6, 20) command = ControlSetting(name=ValueName.BREATHS_PER_MINUTE, value=v_bpm) Controller.set_control(command) v_iphase = (0.3 + np.random.random()*0.5) * 60/v_bpm command = ControlSetting(name=ValueName.INSPIRATION_TIME_SEC, value=v_iphase) Controller.set_control(command) Controller.start() time.sleep(0.1) vals_start = Controller.get_sensors() temp_vals = Controller.get_sensors() while temp_vals.breath_count < 5: time.sleep(0.1) temp_vals = Controller.get_sensors() assert Controller.is_running() == True Controller.stop() # consecutive stops should be ignored Controller.stop() Controller.stop() vals_stop = Controller.get_sensors() alarms_stop = Controller.get_alarms() # Test whether get_sensors() return the right values COPY_peep = Controller.COPY_sensor_values.PEEP COPY_pip = Controller.COPY_sensor_values.PIP COPY_fio2 = Controller.COPY_sensor_values.FIO2 COPY_pressure = Controller.COPY_sensor_values.PRESSURE COPY_vte = Controller.COPY_sensor_values.VTE COPY_bpm = Controller.COPY_sensor_values.BREATHS_PER_MINUTE COPY_Iinsp = Controller.COPY_sensor_values.INSPIRATION_TIME_SEC COPY_tt = Controller.COPY_sensor_values.timestamp COPY_lc = Controller.COPY_sensor_values.loop_counter assert COPY_peep == vals_stop.PEEP assert COPY_pip == vals_stop.PIP assert COPY_fio2 == vals_stop.FIO2 assert COPY_pressure == vals_stop.PRESSURE assert COPY_vte == vals_stop.VTE assert COPY_bpm == vals_stop.BREATHS_PER_MINUTE assert COPY_Iinsp == vals_stop.INSPIRATION_TIME_SEC assert COPY_tt == vals_stop.timestamp assert COPY_lc == vals_stop.loop_counter print(v_peep) print(v_pip) print(v_bpm) print(v_iphase) assert (vals_stop.loop_counter - vals_start.loop_counter) > 100 # In 20s, this program should go through a good couple of loops assert np.abs(vals_stop.PEEP - v_peep) < 5 # PEEP error correct within 5 cmH2O assert np.abs(vals_stop.PIP - v_pip) < 5 # PIP error correct within 5 cmH2O assert np.abs(vals_stop.BREATHS_PER_MINUTE - v_bpm) < 1 # Breaths per minute correct within 5 bpm assert np.abs(vals_stop.INSPIRATION_TIME_SEC - v_iphase) < 0.2 # Inspiration time, correct within 20% hb1 = Controller.get_heartbeat() assert hb1 > 0 # Test the heartbeat assert np.abs(hb1 - COPY_lc) < Controller._NUMBER_CONTROLL_LOOPS_UNTIL_UPDATE + 2 # true heart-beat should be close to the sensor loop counter archive = Controller.get_past_waveforms() # Test archive assert type(archive) == list for i in range(len(archive)): print(i) assert type(archive[i]) == np.ndarray columns, rows = archive[0].shape assert rows == 3
[docs]def test_missed_heartbeat_alarm(): Controller = get_control_module(sim_mode=True, simulator_dt=0.01) Controller.start() Controller._critical_time = 4.5 # Make sure that we get a warning after 5 seconds t0 = time.time() time.sleep(5) assert np.abs(Controller._time_last_contact - t0 ) < 0.01 assert np.abs(Controller._time_last_contact - time.time() + 5 ) < 0.01 Controller.stop() t1 = time.time() assert np.abs(Controller._time_last_contact - t1) < 0.01 a = Controller.get_alarms()[0][0] assert a.alarm_type == AlarmType.MISSED_HEARTBEAT
###################################################################### ######################### TEST 3 ################################## ###################################################################### # # Make sure that after timeout, software starts with MockHAL to display # technical alert # Make sure the Controller is not broken by random, or stuck HAL values # #
[docs]def test_random_HAL(): """ Simulates a broken HAL, providing (physiologically unreasonable) random numbers to infinity """ Controller = get_control_module(sim_mode=False, simulator_dt=0.01) Controller._LOOP_UPDATE_TIME = 0.01 pressures = [] oxygens = [] flows = [] Controller.start() time.sleep(0.1) command = ControlSetting(name=ValueName.PIP, value=30) # Test of set_control for hal Controller.set_control(command) time.sleep(0.1) temp_vals = Controller.get_sensors() while temp_vals.breath_count < 5: # Random HAL Controller.HAL.pressure = 1000*np.random.random()-50 #gat a nice stream of HAPA alerts Controller.HAL.flow_ex = 100*np.random.random()-50 Controller.HAL.setpoint_in = 100*np.random.random()-50 Controller.HAL.setpoint_ex = 100*np.random.random()-50 Controller.HAL.oxygen = 100*np.random.random()-50 time.sleep(0.1) temp_vals = Controller.get_sensors() pressures = np.append(pressures, temp_vals.PRESSURE) oxygens = np.append(oxygens, temp_vals.FIO2) flows = np.append(flows, temp_vals.FLOWOUT) while temp_vals.breath_count < 10: # Stuck HAL Controller.HAL.pressure = 0 Controller.HAL.flow_ex = 0 Controller.HAL.oxygen = -10 time.sleep(0.1) temp_vals = Controller.get_sensors() time.sleep(0.1) Controller._maxdt = 0 time.sleep(0.5) Controller.stop() assert np.isfinite( np.mean(pressures) ) assert np.isfinite( np.mean(oxygens) ) assert np.isfinite( np.mean(flows) ) for alarms in Controller.get_alarms(): assert type(alarms[0]) == Alarm assert temp_vals.breath_count == 10
[docs]def test_nan_HAL(): """ nan should work to, make PEEP et al. nan """ Controller = get_control_module(sim_mode=True, simulator_dt=0.01) Controller.start() command = ControlSetting(name=ValueName.PIP, value=20) Controller.set_control(command) command = ControlSetting(name=ValueName.PEEP, value=5) Controller.set_control(command) temp_vals = Controller.get_sensors() while temp_vals.breath_count < 2: # NAN HAL time.sleep(0.1) temp_vals = Controller.get_sensors() Controller._DATA_PRESSURE_LIST.append(np.nan) #inject a nan into pressures Controller.stop() Controller.stop() assert np.isnan(Controller._DATA_PEEP) assert np.isnan(Controller._DATA_PIP_PLATEAU) assert np.isnan(Controller._DATA_PIP) assert np.isnan(Controller._DATA_PIP_TIME) assert np.isnan(Controller._DATA_PEEP_TIME) assert np.isnan(Controller._DATA_I_PHASE) # While we're at it: make sure all alarms are returned correctly Controller.HAPA = None Controller.TECHA = [] assert Controller.get_alarms() == None time.sleep(0.1) Controller.HAPA = Alarm(AlarmType.HIGH_PRESSURE, AlarmSeverity.HIGH, time.time(), value=100) Controller.TECHA = [Alarm( AlarmType.SENSORS_STUCK, AlarmSeverity.TECHNICAL, )] assert len(Controller.get_alarms()) == 2 time.sleep(0.1) Controller.HAPA = None Controller.TECHA = [Alarm( AlarmType.SENSORS_STUCK, AlarmSeverity.TECHNICAL, )] assert len(Controller.get_alarms()) == 1 a = Controller.get_alarms()[0][0] assert(a.alarm_type == AlarmType.SENSORS_STUCK) time.sleep(0.1) Controller.HAPA = Alarm(AlarmType.HIGH_PRESSURE, AlarmSeverity.HIGH, time.time(), value=100) Controller.TECHA = [] assert len(Controller.get_alarms()) == 1 a = Controller.get_alarms()[0] assert(a.alarm_type == AlarmType.HIGH_PRESSURE)
###################################################################### ######################### TEST 4 ################################## ###################################################################### # # More involved test, randomized waittimes and make sure the system works #
[docs]def test_erratic_dt(): ''' This is a function to test whether the controller works with random update times ''' Controller = get_control_module(sim_mode=True) command = ControlSetting(name=ValueName.PEEP, value=5) Controller.set_control(command) command = ControlSetting(name=ValueName.PIP, value=20) Controller.set_control(command) command = ControlSetting(name=ValueName.PIP_TIME, value=1) Controller.set_control(command) Controller.start() ls = [] test_loops = 500 for t in range(test_loops): Controller._LOOP_UPDATE_TIME = np.random.randint(100)/1000 # updates anywhere between 0ms and 500ms time.sleep(0.05) vals = Controller.get_sensors() ls.append(vals) Controller.stop() cc = Controller.get_control(control_setting_name = ValueName.PEEP) target_peep = cc.value cc = Controller.get_control(control_setting_name = ValueName.PIP) target_pip = cc.value peeps = np.unique([np.abs(s.PEEP - target_peep) for s in ls if s.PEEP is not None]) pips = np.unique([np.abs(s.PIP - target_pip) for s in ls if s.PIP is not None]) print(target_peep) print(target_pip) assert np.mean(peeps) < 8 assert np.mean(pips) < 8