import pytest
import pdb
import time
import copy
import numpy as np
from pvp.alarm import condition, ALARM_RULES, AlarmType, AlarmSeverity, Alarm, Alarm_Manager
from pvp.alarm.rule import Alarm_Rule
from pvp.common.values import ValueName, SENSOR
from pvp.common.message import SensorValues, ControlSetting
##########
# conditions
n_samples = 100
[docs]@pytest.fixture
def fake_sensors():
def _fake_sensor(arg=None):
# make an empty SensorValues
vals = {k:0 for k in ValueName}
vals.update({k:0 for k in SensorValues.additional_values})
# since 0 is out of range for fio2, manually set it
# FIXME: find values that by definition don't raise any of the default rules
vals[ValueName.FIO2] = 80
# update with any in kwargs
if arg:
for k, v in arg.items():
vals[k] = v
sensors = SensorValues(vals=vals)
return sensors
return _fake_sensor
[docs]@pytest.fixture
def fake_rule():
def _fake_rule(alarm_type = AlarmType.HIGH_PRESSURE,
latch = False,
conditions = None,
mode="max"):
if not conditions:
conditions = (
(
AlarmSeverity.LOW,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=1,
mode=mode,
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + 1
}
)
),
(
AlarmSeverity.MEDIUM,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=2,
mode=mode,
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + 2
}
)
),
(
AlarmSeverity.HIGH,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=3,
mode=mode,
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + 3
}
)
)
)
rule = Alarm_Rule(
name=alarm_type,
latch=latch,
conditions=conditions
)
return rule
return _fake_rule
#############################
# conditions
[docs]@pytest.mark.parametrize("test_value", [k for k in SENSOR.keys()])
def test_value_condition(fake_sensors, test_value):
for i in range(n_samples):
# test min
test_val = (np.random.rand()-0.5)*100
min_cond = condition.ValueCondition(test_value, test_val, 'min')
min_no_alarms = fake_sensors({test_value: test_val+1})
min_yes_alarms = fake_sensors({test_value: test_val - 1})
assert min_cond.check(min_no_alarms) == False
assert min_cond.check(min_yes_alarms) == True
# test max
max_cond = condition.ValueCondition(test_value, test_val, 'max')
max_no_alarms = fake_sensors({test_value: test_val - 1})
max_yes_alarms = fake_sensors({test_value: test_val + 1})
assert max_cond.check(max_no_alarms) == False
assert max_cond.check(max_yes_alarms) == True
# test that the @mode.setter works for already created objects
min_cond.mode = 'max'
max_cond.mode = 'min'
assert min_cond.check(max_no_alarms) == False
assert min_cond.check(max_yes_alarms) == True
assert max_cond.check(min_no_alarms) == False
assert max_cond.check(min_yes_alarms) == True
# test that other values don't do anything
other_cond = condition.ValueCondition(test_value, 1, 'max')
for value in ValueName:
if value == test_value:
other_sensor = fake_sensors({value: 2})
else:
other_sensor = fake_sensors({value: 2, test_value: 0})
assert other_sensor[value] == 2
if value == test_value:
assert other_cond.check(other_sensor) == True
else:
assert other_cond.check(other_sensor) == False
[docs]@pytest.mark.parametrize("test_value", [k for k in SENSOR.keys()])
def test_cyclevalue_condition(fake_sensors, test_value):
for i in range(n_samples):
n_cycles = np.random.randint(1, 100)
cond = condition.CycleValueCondition(
value_name=test_value,
limit=1,
mode='max',
n_cycles=n_cycles
)
sensors = fake_sensors()
# test that just checking alone without incrementing cycle doesn't trigger
sensors[test_value] = 2
for j in range(n_cycles * 2):
assert cond.check(sensors) == False
# test the straightforward case, goes out of range and stays out of range
for j in range(n_cycles * 2):
sensors.breath_count = j
if j < n_cycles:
assert cond.check(sensors) == False
else:
assert cond.check(sensors) == True
# test that going under the limit resets the cycle count check
sensors[test_value] = 0
sensors.breath_count += 1
assert cond.check(sensors) == False
sensors[test_value] = 2
sensors.breath_count += 1
assert cond.check(sensors) == False
# test discontinuous breath cycles
# test that discontinuous checks without conflicting info still trigger
cond = condition.CycleValueCondition(
value_name=test_value,
limit=1,
mode='max',
n_cycles=10
)
sensors = fake_sensors()
sensors[test_value] = 2
assert cond.check(sensors) == False
sensors.breath_count = 11
assert cond.check(sensors) == True
# en passant check resets
cond.reset()
assert cond.check(sensors) == False
# but that discontinuous checks with conflicting info dont trigger
cond = condition.CycleValueCondition(
value_name=test_value,
limit=1,
mode='max',
n_cycles=10
)
sensors = fake_sensors()
sensors[test_value] = 2
assert cond.check(sensors) == False
sensors.breath_count = 5
sensors[test_value] = 0
assert cond.check(sensors) == False
sensors.breath_count = 11
sensors[test_value] = 2
assert cond.check(sensors) == False
[docs]def test_alarmseverity_condition(fake_rule, fake_sensors):
manager = Alarm_Manager()
manager.reset()
manager.rules = {}
manager.dependencies = {}
rule = fake_rule()
manager.load_rule(rule)
cond_1 = condition.AlarmSeverityCondition(
AlarmType.HIGH_PRESSURE, AlarmSeverity.LOW, mode='min')
cond_2 = condition.AlarmSeverityCondition(
AlarmType.HIGH_PRESSURE, AlarmSeverity.MEDIUM, mode='max')
cond_3 = condition.AlarmSeverityCondition(
AlarmType.HIGH_PRESSURE, AlarmSeverity.HIGH, mode='eq')
assert cond_1.check() == False
assert cond_2.check() == True
assert cond_3.check() == False
# raise low alarm
sensor = fake_sensors({ValueName.PRESSURE: 1.5})
manager.update(sensor)
assert cond_1.check() == True
assert cond_2.check() == True
assert cond_3.check() == False
# raise med alarm
sensor = fake_sensors({ValueName.PRESSURE: 2.5})
manager.update(sensor)
assert cond_1.check() == True
assert cond_2.check() == True
assert cond_3.check() == False
# raise high alarm
sensor = fake_sensors({ValueName.PRESSURE: 3.5})
manager.update(sensor)
assert cond_1.check() == True
assert cond_2.check() == False
assert cond_3.check() == True
manager.rules = {}
manager.load_rules()
[docs]def test_cyclealarmseverity_condition():
# FIXME
pass
[docs]def test_condition_addition(fake_sensors):
no_alarm = fake_sensors({ValueName.PIP: 0})
yes_alarm = fake_sensors({ValueName.PIP: 5})
cond_1 = condition.ValueCondition(ValueName.PIP, 3, 'max')
assert cond_1.check(no_alarm) == False
assert cond_1.check(yes_alarm) == True
cond_2 = condition.ValueCondition(ValueName.PEEP, 1, 'min')
no_alarm_2 = fake_sensors({ValueName.PEEP: 2})
yes_alarm_2 = fake_sensors({ValueName.PEEP: 0})
assert cond_2.check(no_alarm_2) == False
assert cond_2.check(yes_alarm_2) == True
cond_3 = cond_1 + cond_2
cond_4 = cond_2 + cond_1
yes_alarm_3_1 = fake_sensors({ValueName.PIP: 4, ValueName.PEEP:0})
no_alarm_3_1 = fake_sensors({ValueName.PIP: 2, ValueName.PEEP:2})
no_alarm_3_2 = fake_sensors({ValueName.PIP: 4, ValueName.PEEP:2})
assert cond_3.check(yes_alarm_3_1) == True
assert cond_3.check(no_alarm_3_2) == False
assert cond_3.check(no_alarm_3_1) == False
assert cond_4.check(yes_alarm_3_1) == True
assert cond_4.check(no_alarm_3_2) == False
assert cond_4.check(no_alarm_3_1) == False
# test another level of addition
cond_5 = cond_3 + condition.ValueCondition(ValueName.FIO2, 5, 'min')
yes_alarm_4_1 = fake_sensors({ValueName.PIP: 4, ValueName.PEEP: 0, ValueName.FIO2 : 0})
no_alarm_4_2 = fake_sensors({ValueName.PIP: 2, ValueName.PEEP: 0, ValueName.FIO2 : 0})
no_alarm_4_3 = fake_sensors({ValueName.PIP: 4, ValueName.PEEP: 6, ValueName.FIO2 : 0})
no_alarm_4_1 = fake_sensors({ValueName.PIP: 4, ValueName.PEEP: 0, ValueName.FIO2 : 6})
assert cond_5.check(yes_alarm_4_1) == True
assert cond_5.check(no_alarm_4_2) == False
assert cond_5.check(no_alarm_4_3) == False
assert cond_5.check(no_alarm_4_1) == False
[docs]@pytest.mark.parametrize('test_mode', ['max', 'min'])
def test_condition_dependency(fake_rule, test_mode):
rule = fake_rule(mode=test_mode)
manager = Alarm_Manager()
manager.reset()
manager.rules = {}
manager.dependencies = {}
manager.depends_callbacks = []
manager.load_rule(rule)
# create callback to catch limit changes
global limits_changed
limits_changed = []
def depends_cb(control_message):
global limits_changed
limits_changed.append(control_message)
manager.add_dependency_callback(depends_cb)
assert manager.rules[rule.name].conditions[0][1].limit == 1
assert manager.rules[rule.name].conditions[1][1].limit == 2
assert manager.rules[rule.name].conditions[2][1].limit == 3
# test with blank ControlSetting
blank_message = ControlSetting(ValueName.PIP, min_value=1)
manager.update_dependencies(blank_message)
assert len(limits_changed) == 0
control_message = ControlSetting(ValueName.PIP, value=5)
manager.update_dependencies(control_message)
assert manager.rules[rule.name].conditions[0][1].limit == 6
assert manager.rules[rule.name].conditions[1][1].limit == 7
assert manager.rules[rule.name].conditions[2][1].limit == 8
assert len(limits_changed) == 3
if test_mode == "max":
assert limits_changed[0].max_value == 6
assert limits_changed[1].max_value == 7
assert limits_changed[2].max_value == 8
elif test_mode == "min":
assert limits_changed[0].min_value == 6
assert limits_changed[1].min_value == 7
assert limits_changed[2].min_value == 8
manager.rules = {}
manager.dependencies = {}
manager.depends_callbacks = []
manager.load_rules()
###############################
# rules
#@pytest.mark.parametrize("alarm_rule", [k for k in ALARM_RULES.values()])
[docs]def test_alarm_rule(fake_sensors):
"""
test the alarm rule class itself
assume the individual conditions have been tested
"""
rule = Alarm_Rule(
name=AlarmType.HIGH_PRESSURE,
latch=False,
conditions=(
(
AlarmSeverity.LOW,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=1,
mode='max',
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + (x * 0.05)
}
)
),
(
AlarmSeverity.MEDIUM,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=2,
mode='max',
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + (x * 0.10)
}
) + \
condition.CycleAlarmSeverityCondition(
alarm_type = AlarmType.HIGH_PRESSURE,
severity = AlarmSeverity.LOW,
n_cycles = 2
)
),
(
AlarmSeverity.HIGH,
condition.ValueCondition(
value_name=ValueName.PRESSURE,
limit=3,
mode='max',
depends={
'value_name': ValueName.PIP,
'value_attr': 'value',
'condition_attr': 'limit',
'transform': lambda x: x + (x * 0.15)
},
) + \
condition.CycleAlarmSeverityCondition(
alarm_type = AlarmType.HIGH_PRESSURE,
severity = AlarmSeverity.MEDIUM,
n_cycles = 2
)
),
)
)
# check that depends property works
assert rule.depends == [ValueName.PIP]
sensors = fake_sensors()
# test that initial check is off
assert rule.check(sensors) == AlarmSeverity.OFF
assert rule.severity == AlarmSeverity.OFF
# test low severity alarm
sensors.PRESSURE = 1.5
sensors.breath_count += 1
assert rule.check(sensors) == AlarmSeverity.LOW
assert rule.severity == AlarmSeverity.LOW
# register alarm manually
# (alarm should call register_alarm)
low_alarm = Alarm(
AlarmType.HIGH_PRESSURE,
AlarmSeverity.LOW,
latch=False
)
Alarm_Manager().register_alarm(low_alarm)
# test that we don't jump to medium
sensors.PRESSURE = 2.5
sensors.breath_count += 1
assert rule.check(sensors) == AlarmSeverity.LOW
assert rule.severity == AlarmSeverity.LOW
# do another check to make sure we dont increment just by calling again
assert rule.check(sensors) == AlarmSeverity.LOW
assert rule.severity == AlarmSeverity.LOW
# now check that we go to medium
sensors.breath_count += 2
assert rule.check(sensors) == AlarmSeverity.MEDIUM
assert rule.severity == AlarmSeverity.MEDIUM
med_alarm = Alarm(
AlarmType.HIGH_PRESSURE,
AlarmSeverity.MEDIUM,
latch=False
)
Alarm_Manager().register_alarm(med_alarm)
# keep at medium
sensors.PRESSURE = 3.5
sensors.breath_count += 1
assert rule.check(sensors) == AlarmSeverity.MEDIUM
assert rule.severity == AlarmSeverity.MEDIUM
sensors.breath_count += 2
assert rule.check(sensors) == AlarmSeverity.HIGH
assert rule.severity == AlarmSeverity.HIGH
# test resets
rule.reset()
assert rule.check(sensors) == AlarmSeverity.LOW
assert rule.severity == AlarmSeverity.LOW
# test alarm deactivate while we're at it
Alarm_Manager().deactivate_alarm(med_alarm)
assert med_alarm not in Alarm_Manager().active_alarms.values()
[docs]def test_rules_individual():
# test that each individual rule does what we think it does
# FIXME
pass
##############################
#
[docs]def test_alarm_manager_raise(fake_sensors):
"""
Test that the alarm manager raises a single alarm
Args:
fake_sensors:
Returns:
"""
manager = Alarm_Manager()
manager.reset()
assert len(manager.active_alarms) == 0
# check that we got all the alarm rules
for alarm_type, rule in ALARM_RULES.items():
assert alarm_type in manager.rules.keys()
# make callback to count emitted alarms
global alarms_emitted
alarms_emitted = 0
def alarm_cb(alarm):
assert isinstance(alarm, Alarm)
global alarms_emitted
alarms_emitted += 1
manager.add_callback(alarm_cb)
# take a value out of range and test that an alarm is raised an emitted
sensor = fake_sensors()
manager.update(sensor)
assert len(manager.active_alarms) == 0
assert alarms_emitted == 0
sensor.PRESSURE = ALARM_RULES[AlarmType.HIGH_PRESSURE].conditions[0][1].limit + 1
manager.update(sensor)
assert AlarmType.HIGH_PRESSURE in manager.active_alarms.keys()
assert alarms_emitted == 1
[docs]def test_alarm_manager_escalation(fake_sensors, fake_rule):
"""
For unlatched alarms, test that alarms are:
* emitted when alarm severity is raised
* decremented when alarm severity is decreased and alarm is not latched
* deactivated and deleted
"""
manager = Alarm_Manager()
manager.reset()
assert len(manager.active_alarms) == 0
#
# create alarm rule for testing
rule = fake_rule(latch=False)
manager.load_rule(rule)
assert manager.rules[rule.name] is rule
# callback to catch emitted alarms
global caught_alarm
caught_alarm = None
def catch_alarm(alarm):
global caught_alarm
caught_alarm = alarm
manager.add_callback(catch_alarm)
# raise and see that we get a new alarm
sensors = fake_sensors()
sensors.PRESSURE = 1.5
manager.update(sensors)
assert isinstance(caught_alarm, Alarm)
assert caught_alarm.severity == AlarmSeverity.LOW
assert manager.active_alarms[rule.name] is caught_alarm
# raise to medium
sensors.PRESSURE = 2.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.MEDIUM
assert manager.active_alarms[rule.name] is caught_alarm
# then high
sensors.PRESSURE = 3.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert manager.active_alarms[rule.name] is caught_alarm
# now decrement
sensors.PRESSURE = 2.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.MEDIUM
assert manager.active_alarms[rule.name] is caught_alarm
sensors.PRESSURE = 1.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.LOW
assert manager.active_alarms[rule.name] is caught_alarm
# when alarm is brought back into safe range, should be deleted from active_alarms
sensors.PRESSURE = 0.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.OFF
assert rule.name not in manager.active_alarms.keys()
assert len(manager.active_alarms) == 0
[docs]def test_alarm_manager_latch(fake_sensors, fake_rule):
"""
* not decremented when alarm severity is decreased and alarm is latched
* dismissed when latched *only when* alarm condition is OFF AND has been dismissed
Args:
fake_sensors:
fake_rule:
Returns:
"""
manager = Alarm_Manager()
manager.reset()
assert len(manager.active_alarms) == 0
# create alarm rule for testing
rule = fake_rule(latch=True)
manager.load_rule(rule)
assert manager.rules[rule.name] is rule
# callback to catch emitted alarms
global caught_alarm
global n_alarms
n_alarms = 0
caught_alarm = None
def catch_alarm(alarm):
global caught_alarm
global n_alarms
caught_alarm = alarm
n_alarms += 1
manager.add_callback(catch_alarm)
# raise and see that we get a new alarm
sensors = fake_sensors()
# raise alarm to HIGH
sensors.PRESSURE = 3.5
manager.update(sensors)
assert isinstance(caught_alarm, Alarm)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert manager.active_alarms[rule.name] is caught_alarm
assert n_alarms == 1
# raise alarm range to OFF, but don't request dismiss
sensors.PRESSURE = 0.5
manager.update(sensors)
# no alarm should have been emitted
assert caught_alarm.severity == AlarmSeverity.HIGH
assert manager.active_alarms[rule.name] is caught_alarm
assert n_alarms == 1
# raise back to HIGH
sensors.PRESSURE = 3.5
manager.update(sensors)
# dismiss but only drop to MEDIUM, so alarm should not be emitted
manager.dismiss_alarm(rule.name)
assert rule.name in manager.pending_clears
sensors.PRESSURE = 2.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert manager.active_alarms[rule.name] is caught_alarm
assert n_alarms == 1
# dropping to OFF should finally clear the alarm
sensors.PRESSURE = 0.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.OFF
assert rule.name not in manager.active_alarms.keys()
assert len(manager.active_alarms) == 0
assert len(manager.pending_clears) == 0
assert n_alarms == 2
[docs]def test_alarm_manager_dismiss(fake_sensors, fake_rule):
"""
* dismissed when not latched until alarm conditions return to off then back on
* dismissed when not latched until duration regardless of alarm condition
Args:
fake_sensors:
fake_rule:
Returns:
"""
manager = Alarm_Manager()
manager.reset()
assert len(manager.active_alarms) == 0
# create alarm rule for testing
rule = fake_rule(latch=False)
manager.load_rule(rule)
assert manager.rules[rule.name] is rule
# callback to catch emitted alarms
global caught_alarm
global n_alarms
n_alarms = 0
caught_alarm = None
def catch_alarm(alarm):
global caught_alarm
global n_alarms
caught_alarm = alarm
n_alarms += 1
manager.add_callback(catch_alarm)
# raise and see that we get a new alarm
sensors = fake_sensors()
sensors.PRESSURE = 3.5
manager.update(sensors)
assert isinstance(caught_alarm, Alarm)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert n_alarms == 1
# dismiss the alarm, make sure it's gone
manager.dismiss_alarm(rule.name)
assert rule.name in manager.cleared_alarms
assert caught_alarm.severity == AlarmSeverity.OFF
assert n_alarms == 2
# update with same sensor values, make sure alarm isn't emitted
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.OFF
assert n_alarms == 2
# take down to OFF range and back up, should get alarm
sensors.PRESSURE = 0.5
manager.update(sensors)
assert rule.name not in manager.cleared_alarms
assert n_alarms == 2
sensors.PRESSURE = 3.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert n_alarms == 3
#####################
# test timed dismissal
dismiss_time = 2
manager.dismiss_alarm(rule.name, duration = dismiss_time)
assert rule.name in manager.snoozed_alarms.keys()
assert caught_alarm.severity == AlarmSeverity.OFF
assert n_alarms ==4
# stash snooze time to check we're not going over
snooze_time = manager.snoozed_alarms[rule.name]
# make sure it's not some crazy value
assert snooze_time <= time.time() + dismiss_time
# give flip off and on again, check that alarm is not emitted
# as would be typical
sensors.PRESSURE = 0.5
manager.update(sensors)
sensors.PRESSURE = 3.5
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.OFF
assert n_alarms == 4
assert time.time() < snooze_time
while time.time() < snooze_time:
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.OFF
assert n_alarms == 4
time.sleep(0.1)
# now that time has passed, should get an alarm
assert time.time() > snooze_time
manager.update(sensors)
assert caught_alarm.severity == AlarmSeverity.HIGH
assert n_alarms == 5
[docs]def test_alarm_manager_logged_alarms(fake_sensors, fake_rule):
manager = Alarm_Manager()
manager.reset()
assert len(manager.active_alarms) == 0
# create alarm rule for testing
rule = fake_rule(latch=False)
manager.load_rule(rule)
assert manager.rules[rule.name] is rule
# callback to catch emitted alarms
global caught_alarm
global n_alarms
n_alarms = 0
caught_alarm = None
def catch_alarm(alarm):
global caught_alarm
global n_alarms
caught_alarm = alarm
n_alarms += 1
manager.add_callback(catch_alarm)
# raise and see that we get a new alarm
sensors = fake_sensors()
# raise alarm, catch it, turn alarm off, check that it's put in logged_alarms
sensors.PRESSURE = 3.5
manager.update(sensors)
assert isinstance(caught_alarm, Alarm)
assert caught_alarm.severity == AlarmSeverity.HIGH
high_alarm = caught_alarm
sensors.PRESSURE = 0.5
manager.update(sensors)
assert high_alarm in manager.logged_alarms
assert high_alarm not in manager.active_alarms.values()