import operator
import types
import importlib
import copy
from pvp.alarm import AlarmType, AlarmSeverity
from pvp.common.message import SensorValues
from pvp.common.values import ValueName
def get_alarm_manager():
    """
    Instantiate and return the ``Alarm_Manager`` class, importing it on first use.

    ``Alarm_Manager`` is not imported at module load time. The first call
    imports it from ``pvp.alarm.alarm_manager`` and caches the class in this
    module's globals, so later calls skip the import.
    (NOTE(review): the deferred import presumably avoids a circular import
    between the two modules -- confirm.)

    Only the "class not loaded yet" case triggers the import. Exceptions raised
    by the ``Alarm_Manager`` constructor itself propagate to the caller instead
    of being swallowed and retried.

    Returns:
        The object returned by calling ``Alarm_Manager()``
    """
    manager_cls = globals().get('Alarm_Manager')
    if manager_cls is None:
        # first use: resolve the class lazily and cache it under the same
        # global name the rest of this module would look up
        manager_module = importlib.import_module('pvp.alarm.alarm_manager')
        manager_cls = getattr(manager_module, 'Alarm_Manager')
        globals()['Alarm_Manager'] = manager_cls
    return manager_cls()
class Condition(object):
    """
    Base class for specifying alarm test conditions

    Subclasses must define :meth:`.Condition.check` and :meth:`.Condition.reset`

    Condition objects can be added together with ``+`` to create compound
    conditions that are evaluated in series.

    Attributes:
        depends (dict): dependency declaration passed to the constructor, stored as given
        _manager: cached alarm manager, fetched on first access of :attr:`.manager`
        _child (:class:`Condition`): if another condition is added to this one, store a reference to it
        _check: after an addition, the original ``check`` method of this condition
    """

    def __init__(self, depends: dict = None, *args, **kwargs):
        """
        Args:
            depends (list, dict): a list of, or a single dict::

                {'value_name': ValueName,
                 'value_attr': attr in ControlMessage,
                 'condition_attr',
                 optional: transformation: callable}

                that declare what values are needed to update.
                The constructor only stores this value on ``self.depends``.
            *args: unused by this class
            **kwargs: unused by this class
        """
        self._manager = None
        self._child = None
        self._check = None
        self.depends = depends

    @property
    def manager(self):
        """
        The active alarm manager, used to get status of alarms

        Fetched with :func:`get_alarm_manager` the first time it is needed and
        cached afterwards.

        Returns:
            :class:`pvp.alarm.alarm_manager.Alarm_Manager`
        """
        cached = self._manager
        if cached is None:
            cached = self._manager = get_alarm_manager()
        return cached

    def check(self, sensor_values) -> bool:
        """
        Evaluate the condition. Must be overridden by every subclass.

        Args:
            sensor_values ( :class:`.SensorValues` ): SensorValues used to compute alarm status

        Returns:
            bool

        Raises:
            NotImplementedError: always, in the base class
        """
        raise NotImplementedError("Every condition needs to override check!!")

    def reset(self):
        """
        Clear any state held by a stateful condition. Must be overridden by every subclass.

        Raises:
            NotImplementedError: always, in the base class
        """
        raise NotImplementedError("every condition needs to override reset!")

    def __add__(self, other: 'Condition'):
        """
        Chain another :class:`Condition` after this one.

        The result is a deep copy of ``self``; ``self`` is left untouched, while
        ``other`` is stored by reference. Checking the result evaluates the
        conditions left-to-right and stops at the first one that is falsy.
        Resetting the result resets this condition and then its child.

        Args:
            other (:class:`Condition`): condition to evaluate after this one

        Returns:
            :class:`Condition`: the compound condition
        """
        # only Conditions can be chained
        assert(issubclass(type(other), Condition))

        combined = copy.deepcopy(self)

        if combined._child is not None:
            # already a compound: push the new condition down the chain
            # (the child handles this recursively)
            combined._child = combined._child + other
            return combined

        # first addition: adopt the child and stash our own check/reset
        # so the chained wrappers below can still reach them
        combined._child = other
        combined._check = combined.check
        combined._reset = combined.reset

        def chained_check(self, sensor_values):
            # short-circuit on our own (stashed) check, then defer to the child
            if self._check(sensor_values):
                return self._child.check(sensor_values)
            return False

        def chained_reset(self):
            self._reset()
            self._child.reset()

        # rebind the wrappers as instance-level methods on the copy
        combined.check = types.MethodType(chained_check, combined)
        combined.reset = types.MethodType(chained_reset, combined)
        return combined
class ValueCondition(Condition):
    """
    Value is greater or lesser than some max/min
    """

    def __init__(self,
                 value_name: ValueName,
                 limit: (int, float),
                 mode: str,
                 *args, **kwargs):
        """
        Args:
            value_name (ValueName): Which value to check
            limit (int, float): value to check against
            mode ('min', 'max'): whether the limit is a minimum or maximum
            *args: passed on to :class:`Condition`
            **kwargs: passed on to :class:`Condition`

        Attributes:
            operator (callable): ``operator.lt`` when mode is ``'min'``,
                ``operator.gt`` when mode is ``'max'``
        """
        super().__init__(*args, **kwargs)

        self.value_name = value_name
        self.limit = limit

        # placeholders; both are filled in by the ``mode`` setter below
        self._mode = None
        self.operator = None
        self.mode = mode

    @property
    def mode(self):
        """
        One of 'min' or 'max', defines how the incoming sensor values are compared to the set value

        Setting the mode also selects the comparison stored in ``self.operator``.

        Returns:
            str: the current mode
        """
        return self._mode

    @mode.setter
    def mode(self, mode):
        assert(mode in ('min', 'max'))

        if mode == 'max':
            # a maximum is violated when the value is greater than the limit
            comparison = operator.gt
        elif mode == 'min':
            # a minimum is violated when the value is less than the limit
            comparison = operator.lt
        else:  # pragma: no cover
            raise ValueError('needs to be max or min')

        self.operator = comparison
        self._mode = mode

    def check(self, sensor_values):
        """
        Compare the value named by ``value_name`` against ``limit`` using ``operator``.

        Args:
            sensor_values ( :class:`.SensorValues` ): indexed with ``self.value_name``

        Returns:
            the result of ``self.operator(value, self.limit)``: truthy when the
            value is below the limit in ``'min'`` mode or above it in ``'max'`` mode
        """
        assert(isinstance(sensor_values, SensorValues))
        observed = sensor_values[self.value_name]
        return self.operator(observed, self.limit)

    def reset(self):  # pragma: no cover
        """
        This condition keeps no state, so there is nothing to reset.
        """
        return None
class CycleValueCondition(ValueCondition):
    """
    Value goes out of range for a specific number of breath cycles

    Args:
        n_cycles (int): number of cycles required

    Attributes:
        _start_cycle (int): The breath count recorded when the value first went out of range
        _mid_check (bool): whether a value has left the acceptable range and we are counting consecutive breath cycles
    """

    def __init__(self, n_cycles: int, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._n_cycles = None
        self.n_cycles = n_cycles  # validated by the property setter

        self._start_cycle = 0
        self._mid_check = False

    @property
    def n_cycles(self) -> int:
        """Number of cycles required"""
        return self._n_cycles

    @n_cycles.setter
    def n_cycles(self, n_cycles: int):
        # non-integers are rounded to the nearest integer
        if not isinstance(n_cycles, int):  # pragma: no cover
            n_cycles = int(round(n_cycles))
        assert(n_cycles>0)
        self._n_cycles = n_cycles

    def check(self, sensor_values) -> bool:
        """
        Check if outside of range, and then check if number of breath cycles have elapsed

        The first out-of-range reading only records ``sensor_values.breath_count``
        and returns ``False``. Later out-of-range readings return ``True`` once
        the breath count has advanced by at least ``n_cycles`` from that
        recorded value. Any in-range reading clears the pending state.

        Args:
            sensor_values (): passed to :meth:`ValueCondition.check`; its
                ``breath_count`` attribute is read when the value is out of range

        Returns:
            bool
        """
        out_of_range = super().check(sensor_values)

        if not out_of_range:
            # back in range: abandon any count in progress
            self._mid_check = False
            return False

        current_cycle = sensor_values.breath_count

        if not self._mid_check:
            # first out-of-range reading: start counting from this breath.
            # n_cycles is always > 0, so it can never be satisfied yet
            self._mid_check = True
            self._start_cycle = current_cycle
            return False

        # Already counting. Comparing against the start cycle (rather than
        # counting readings) means a breath with a missing value still counts,
        # as long as no in-range value was seen in between.
        if current_cycle >= self._start_cycle + self.n_cycles:
            return True
        return False

    def reset(self):
        """
        Reset check status and start cycle
        """
        self._start_cycle = 0
        self._mid_check = False
class TimeValueCondition(ValueCondition):  # pragma: no cover
    """
    value goes out of range for specific amount of time

    .. warning::

        Not implemented! The constructor always raises ``NotImplementedError``.
    """

    def __init__(self, time, *args, **kwargs):
        """
        Args:
            time (float): number of seconds value must be out of range
            *args: passed on to :class:`ValueCondition`
            **kwargs: passed on to :class:`ValueCondition`

        Raises:
            NotImplementedError: always, after the parent constructor has run
        """
        super().__init__(*args, **kwargs)
        self.time = time
        raise NotImplementedError('Time condition has not been implemented!')

    def check(self, sensor_values):
        """Placeholder: does nothing and returns ``None``."""
        return None
class AlarmSeverityCondition(Condition):
    """
    Alarm is above, below, or equal to a certain severity.
    """

    def __init__(self,
                 alarm_type: AlarmType,
                 severity: AlarmSeverity,
                 mode: str = 'min',
                 *args, **kwargs):
        """
        Alarm is above or below a certain severity.

        Get alarm severity status from :meth:`.Alarm_Manager.get_alarm_severity` .

        Args:
            alarm_type ( :class:`.AlarmType` ): Alarm type to check
            severity ( :class:`.AlarmSeverity` ): Alarm severity to check against
            mode (str): one of 'min', 'eq', or 'max'.

                'min' returns true if the alarm is at least this value
                (note the difference from ValueCondition which returns true if the alarm is less than..),
                'eq' returns true if it is equal to this value,
                and 'max' returns true if it is at most this value.

                .. note::

                    'min' and 'max' use >= and <= rather than > and <

            *args: passed on to :class:`Condition`
            **kwargs: passed on to :class:`Condition`
        """
        super().__init__(*args, **kwargs)

        self.alarm_type = alarm_type
        self.severity = severity

        # placeholders; both are filled in by the ``mode`` setter below
        self._mode = None
        self.operator = None
        self.mode = mode

    @property
    def mode(self) -> str:  # pragma: no cover
        """
        'min' returns true if the alarm is at least this value
        (note the difference from ValueCondition which returns true if the alarm is less than..),
        'eq' returns true if it is equal to this value,
        and 'max' returns true if it is at most this value.

        .. note::

            'min' and 'max' use >= and <= rather than > and <

        Returns:
            str: one of 'min', 'eq', or 'max'.
        """
        return self._mode

    @mode.setter
    def mode(self, mode: str):
        assert(mode in ('min', 'eq', 'max'))

        if mode == 'max':
            # true when the current severity is at most the reference severity
            comparison = operator.le
        elif mode == 'eq':
            comparison = operator.eq
        elif mode == 'min':
            # true when the current severity is at least the reference severity
            comparison = operator.ge
        else:  # pragma: no cover
            raise ValueError(f'needs to be max or min, got {mode}')

        self.operator = comparison
        self._mode = mode

    def check(self, sensor_values=None):
        """
        Compare the manager's current severity for ``alarm_type`` against ``severity``.

        Args:
            sensor_values: not used by this condition

        Returns:
            the result of ``self.operator(current_severity, self.severity)``
        """
        current_severity = self.manager.get_alarm_severity(self.alarm_type)
        return self.operator(current_severity, self.severity)

    def reset(self):  # pragma: no cover
        """
        This condition keeps no state, so there is nothing to reset.
        """
        return None
class CycleAlarmSeverityCondition(AlarmSeverityCondition):
    """
    alarm goes out of range for a specific number of breath cycles

    .. todo::

        note that this is exactly the same as CycleValueCondition. Need to do the multiple inheritance thing

    Attributes:
        _start_cycle (int): The breath count recorded when the severity condition first became true
        _mid_check (bool): whether a value has left the acceptable range and we are counting consecutive breath cycles
    """

    def __init__(self, n_cycles, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._n_cycles = None
        self.n_cycles = n_cycles  # validated by the property setter

        self._start_cycle = 0
        self._mid_check = False

    @property
    def n_cycles(self):
        """Number of breath cycles the severity condition must persist for"""
        return self._n_cycles

    @n_cycles.setter
    def n_cycles(self, n_cycles):
        # non-integers are rounded to the nearest integer
        if not isinstance(n_cycles, int):  # pragma: no cover
            n_cycles = int(round(n_cycles))
        assert(n_cycles>0)
        self._n_cycles = n_cycles

    def check(self, sensor_values):
        """
        Check the severity condition, then check whether enough breath cycles have elapsed

        The first reading for which the severity condition is true only records
        ``sensor_values.breath_count`` and returns ``False``. Later true
        readings return ``True`` once the breath count has advanced by at least
        ``n_cycles`` from that recorded value. Any false reading clears the
        pending state.

        Args:
            sensor_values: its ``breath_count`` attribute is read when the
                severity condition is true

        Returns:
            bool
        """
        triggered = super().check(sensor_values)

        if not triggered:  # pragma: no cover - usually this comes after a check for this that should return false, so we never reach here.
            # condition no longer holds: abandon any count in progress
            self._mid_check = False
            return False

        current_cycle = sensor_values.breath_count

        if not self._mid_check:
            # first triggered reading: start counting from this breath.
            # n_cycles is always > 0, so it can never be satisfied yet
            self._mid_check = True
            self._start_cycle = current_cycle
            return False

        # Already counting. Comparing against the start cycle (rather than
        # counting readings) means a breath with a missing value still counts,
        # as long as the condition never read false in between.
        if current_cycle >= self._start_cycle + self.n_cycles:
            return True
        return False

    def reset(self):
        """
        Reset check status and start cycle
        """
        self._start_cycle = 0
        self._mid_check = False