import operator
import types
import importlib
from pvp.alarm import AlarmType, AlarmSeverity
from pvp.common.message import SensorValues
from pvp.common.values import ValueName
[docs]def get_alarm_manager():
try:
return Alarm_Manager()
except:
manager_module = importlib.import_module('pvp.alarm.alarm_manager')
globals()['Alarm_Manager'] = getattr(manager_module, 'Alarm_Manager')
return Alarm_Manager()
[docs]class Condition(object):
"""
Base class for specifying alarm test conditions
Need to be able to condition alarms based on
* value ranges
* value ranges & durations
* levels of other alarms
Attributes:
manager (:class:`pvp.alarm.alarm_manager.Alarm_Manager`): alarm manager, used to get status of alarms
_child (:class:`Condition`): if another condition is added to this one, store a reference to it
"""
[docs] def __init__(self, depends: dict = None, *args, **kwargs):
"""
Args:
depends (list, dict): a list of, or a single dict::
{'value_name':ValueName,
'value_attr': attr in ControlMessage,
'condition_attr',
optional: transformation: callable)
that declare what values are needed to update
*args:
**kwargs:
"""
self._manager = None
self._child = None
self._check = None
self.depends = depends
@property
def manager(self):
if self._manager is None:
self._manager = get_alarm_manager()
return self._manager
[docs] def check(self, sensor_values):
raise NotImplementedError("Every condition needs to override check!!")
[docs] def reset(self):
"""
If a condition is stateful, need to provide some method of resetting the state
"""
raise NotImplementedError("every condition needs to override reset!")
def __add__(self, other):
"""
Add another :class:`Condition` object to check in series.
Conditions are evaluated left-to-right, and return if any along the sequence is False
Args:
other (:class:`Condition`)
"""
# can't just add any ole apples n oranges
assert(issubclass(type(other), Condition))
if self._child is None:
# if something hasn't been added to us yet...
# claim our child
self._child = other
# override our check method so we check recursively
# make a quick backup first tho yno
self._check = self.check
def new_check(self, sensor_values):
if not self._check(sensor_values):
# if our stashed condition check is false,
# return immediately
return False
else:
# otherwise call check (potentially recursively)
return self._child.check(sensor_values)
# use python types to programmatically reassign method
self.check = types.MethodType(new_check, self)
else:
# if we have already had something added to us,
# add it to our child instead, (also potentially recursively)
self._child = self._child + other
return self
[docs]class ValueCondition(Condition):
"""
value is greater or lesser than some max/min
"""
[docs] def __init__(self,
value_name: ValueName,
limit: (int, float),
mode: str,
*args, **kwargs):
"""
Args:
value_name (ValueName): Which value to check
limit (int, float): value to check against
mode ('min', 'max'): whether the limit is a minimum or maximum
*args:
**kwargs:
"""
super(ValueCondition, self).__init__(*args, **kwargs)
# self.arguments = [value_name]
self.value_name = value_name
self.limit = limit
self._mode = None
self.operator = None
self.mode = mode
@property
def mode(self):
return self._mode
@mode.setter
def mode(self, mode):
assert(mode in ('min', 'max'))
if mode == 'min':
# if we're a minimum, True (raise alarm) if value is less than limit
self.operator = operator.lt
elif mode == 'max':
self.operator = operator.gt
else:
raise ValueError('needs to be max or min')
self._mode = mode
[docs] def check(self, sensor_values):
assert(isinstance(sensor_values, SensorValues))
return self.operator(sensor_values[self.value_name], self.limit)
[docs] def reset(self):
"""
not stateful, do nothing.
"""
pass
[docs]class CycleValueCondition(ValueCondition):
"""
value goes out of range for a specific number of breath cycles
Attributes:
_start_cycle (int): The breath cycle where the
_mid_check (bool): whether a value has left the acceptable range and we are counting consecutive breath cycles
"""
def __init__(self, n_cycles, *args, **kwargs):
super(CycleValueCondition, self).__init__(*args, **kwargs)
self._n_cycles = None
self.n_cycles = n_cycles
self._start_cycle = 0
self._mid_check = False
@property
def n_cycles(self):
return self._n_cycles
@n_cycles.setter
def n_cycles(self, n_cycles):
if not isinstance(n_cycles, int):
n_cycles = int(round(n_cycles))
assert(n_cycles>0)
self._n_cycles = n_cycles
[docs] def check(self, sensor_values):
# first check if we are outside of the range
if super(CycleValueCondition, self).check(sensor_values):
breath_cycle = sensor_values.breath_count
# if we're currently in a consecutive set of out-of-range alarms..
# note: doing it this way because we *dont* want to alarm if there are
# in-range values seen in the waiting period, but we *do* want to
# alarm if we miss a value from a breath cycle but haven't seen any
# in-range values.
if self._mid_check:
# if we have progressed the required number of cycles...
if breath_cycle >= self._start_cycle + self.n_cycles:
return True
else:
return False
else:
# otherwise, this is the first time we've gone out of bounds
self._mid_check = True
self._start_cycle = breath_cycle
# don't check yet, n_cycles must > 0
return False
else:
# if we're not outside the range, false.
# reset the flag that says we're inside a check
self._mid_check = False
return False
[docs] def reset(self):
self._mid_check = False
self._start_cycle = 0
[docs]class TimeValueCondition(ValueCondition):
"""
value goes out of range for specific amount of time
"""
[docs] def __init__(self, time, *args, **kwargs):
"""
Args:
time (float): number of seconds value must be out of range
*args:
**kwargs:
"""
super(TimeValueCondition, self).__init__(*args, **kwargs)
self.time = time
[docs] def check(self, sensor_values):
pass
[docs]class AlarmSeverityCondition(Condition):
[docs] def __init__(self,
alarm_type: AlarmType,
severity: AlarmSeverity,
mode: str = 'min',
*args, **kwargs):
"""
Args:
alarm_type:
severity:
mode (str): one of 'min', 'equals', or 'max'.
'min' returns true if the alarm is at least this value
(note the difference from ValueCondition which returns true if the alarm is less than..)
and vice versa for 'max'.
.. note::
'min' and 'max' use >= and <= rather than > and <
*args:
**kwargs:
"""
super(AlarmSeverityCondition, self).__init__(*args, **kwargs)
self.alarm_type = alarm_type
self.severity = severity
self._mode = None
self.operator = None
self.mode = mode
@property
def mode(self):
return self._mode
@mode.setter
def mode(self, mode):
assert(mode in ('min', 'eq', 'max'))
if mode == 'min':
# if we're a minimum, True (raise alarm) if value is less than limit
self.operator = operator.ge
elif mode == 'eq':
self.operator = operator.eq
elif mode == 'max':
self.operator = operator.le
else:
raise ValueError(f'needs to be max or min, got {mode}')
self._mode = mode
[docs] def check(self, sensor_values):
alarm_severity = self.manager.get_alarm_severity(self.alarm_type)
return self.operator(alarm_severity, self.severity)
[docs]class CycleAlarmSeverityCondition(AlarmSeverityCondition):
"""
alarm goes out of range for a specific number of breath cycles
.. todo::
note that this is exactly the same as CycleValueCondition. Need to do the multiple inheritance thing
Attributes:
_start_cycle (int): The breath cycle where the
_mid_check (bool): whether a value has left the acceptable range and we are counting consecutive breath cycles
"""
def __init__(self, n_cycles, *args, **kwargs):
super(CycleAlarmSeverityCondition, self).__init__(*args, **kwargs)
self._n_cycles = None
self.n_cycles = n_cycles
self._start_cycle = 0
self._mid_check = False
@property
def n_cycles(self):
return self._n_cycles
@n_cycles.setter
def n_cycles(self, n_cycles):
if not isinstance(n_cycles, int):
n_cycles = int(round(n_cycles))
assert(n_cycles>0)
self._n_cycles = n_cycles
[docs] def check(self, sensor_values):
# first check if we are outside of the range
if super(CycleAlarmSeverityCondition, self).check(sensor_values):
breath_cycle = sensor_values.breath_count
# if we're currently in a consecutive set of out-of-range alarms..
# note: doing it this way because we *dont* want to alarm if there are
# in-range values seen in the waiting period, but we *do* want to
# alarm if we miss a value from a breath cycle but haven't seen any
# in-range values.
if self._mid_check:
# if we have progressed the required number of cycles...
if breath_cycle >= self._start_cycle + self.n_cycles:
return True
else:
return False
else:
# otherwise, this is the first time we've gone out of bounds
self._mid_check = True
self._start_cycle = breath_cycle
# don't check yet, n_cycles must > 0
return False
else:
# if we're not outside the range, false.
# reset the flag that says we're inside a check
self._mid_check = False
return False
[docs] def reset(self):
self._mid_check = False
self._start_cycle = 0