Source code for pvp.io.hal

""" Module for interacting with physical and/or simulated devices installed on the ventilator.

"""

from importlib import import_module
from ast import literal_eval
from .devices.sensors import Sensor
from .devices.base import PigpioConnection

import pvp.io.devices.valves as valves
import configparser


[docs]class Hal: """ Hardware Abstraction Layer for ventilator hardware. Defines a common API for interacting with the sensors & actuators on the ventilator. The types of devices installed on the ventilator (real or simulated) are specified in a configuration file. """
[docs] def __init__(self, config_file='pvp/io/config/devices.ini'): """ Initializes HAL from config_file. For each section in config_file, imports the class <type> from module <module>, and sets attribute self.<section> = <type>(**opts), where opts is a dict containing all of the options in <section> that are not <type> or <section>. For example, upon encountering the following entry in config_file.ini: [adc] type = ADS1115 module = devices i2c_address = 0x48 i2c_bus = 1 The Hal will: 1) Import pvp.io.devices.ADS1115 (or ADS1015) as a local variable: class_ = getattr(import_module('.devices', 'pvp.io'), 'ADS1115') 2) Instantiate an ADS1115 object with the arguments defined in config_file and set it as an attribute: self._adc = class_(pig=self.-pig,address=0x48,i2c_bus=1) Note: RawConfigParser.optionxform() is overloaded here s.t. options are case sensitive (they are by default case insensitive). This is necessary due to the kwarg MUX which is so named for consistency with the config registry documentation in the ADS1115 datasheet. For example, A P4vMini pressure_sensor on pin A0 (MUX=0) of the ADC is passed arguments like: analog_sensor = AnalogSensor( pig=self._pig, adc=self._adc, MUX=0, offset_voltage=0.25, output_span = 4.0, conversion_factor=2.54*20 ) Note: ast.literal_eval(opt) interprets integers, 0xFF, (a, b) etc. correctly. It does not interpret strings correctly, nor does it know 'adc' -> self._adc; therefore, these special cases are explicitly handled. Args: config_file (str): Path to the configuration file containing the definitions of specific components on the ventilator machine. (e.g., config_file = "pvp/io/config/devices.ini") """ self._setpoint_in = 0.0 # setpoint for inspiratory side self._setpoint_ex = 0.0 # setpoint for expiratory side self._adc = object self._inlet_valve = object self._control_valve = object self._expiratory_valve = object self._pressure_sensor = object self._aux_pressure_sensor = object self._flow_sensor_in = object self._flow_sensor_ex = object self._pig = PigpioConnection(show_errors=False) self.config = configparser.RawConfigParser() self.config.optionxform = lambda option: option self.config.read(config_file) for section in self.config.sections(): sdict = dict(self.config[section]) class_ = getattr(import_module('.' + sdict['module'], 'pvp.io'), sdict['type']) opts = {key: sdict[key] for key in sdict.keys() - ('module', 'type',)} for key in opts.keys(): if key == 'adc': opts[key] = self._adc else: opts[key] = literal_eval(opts[key]) print(" [ {device_name:^19} ] opts: {device_options}".format( device_name=section, device_options=opts )) # debug setattr(self, '_' + section, class_(pig=self._pig, **opts))
# TODO: Need exception handling whenever inlet valve is opened @property def pressure(self) -> float: """ Returns the pressure from the primary pressure sensor. """ return self._pressure_sensor.get() @property def oxygen(self) -> float: """ Returns the oxygen concentration from the primary oxygen sensor. """ return self._oxygen_sensor.get() @property def aux_pressure(self) -> float: """ Returns the pressure from the auxiliary pressure sensor, if so equipped. If a secondary pressure sensor is not defined, raises a RuntimeWarning. """ if isinstance(self._aux_pressure_sensor, Sensor): return self._aux_pressure_sensor.get() else: raise RuntimeWarning('Secondary pressure sensor not instantiated. Check your "devices.ini" file.') @property def flow_in(self) -> float: """ The measured flow rate inspiratory side.""" return self._flow_sensor_in.get() @property def flow_ex(self) -> float: """ The measured flow rate expiratory side.""" return self._flow_sensor_ex.get() @property def setpoint_in(self) -> float: """ The currently requested flow for the inspiratory proportional control valve as a proportion of maximum.""" return self._setpoint_in @setpoint_in.setter def setpoint_in(self, value: float): """ Sets the openness of the inspiratory valve to the requested value. Args: value: Requested flow, as a proportion of maximum. Must be in [0, 1]. """ if not 0 <= value <= 100: raise ValueError('setpoint must be a number between 0 and 100') if value > 0 and not self._inlet_valve.is_open: self._inlet_valve.open() elif value == 0 and self._inlet_valve.is_open: self._inlet_valve.close() self._control_valve.setpoint = value self._setpoint_in = value @property def setpoint_ex(self) -> float: """ The currently requested flow on the expiratory side as a proportion of the maximum.""" return self._setpoint_ex @setpoint_ex.setter def setpoint_ex(self, value): """ Sets the openness of the expiratory valve to the requested value. Args: value (float): Requested flow, as a proportion of maximum. Must be either 0 or 1 for OnOffValve, and between 0 and 1 for a (proportional) control valve. """ if ( isinstance(self._expiratory_valve, valves.OnOffValve) or isinstance(self._expiratory_valve, valves.SimOnOffValve) ): if value not in (0, 1): raise ValueError('setpoint must be either 0 or 1 for an On/Off expiratory valve') elif value == 1: self._expiratory_valve.open() else: self._expiratory_valve.close() elif ( isinstance(self._expiratory_valve, valves.PWMControlValve) or isinstance(self._expiratory_valve, valves.SimControlValve) ): if not 0 <= value <= 100: raise ValueError('setpoint must be between 0 and 100 for an expiratory control valve') else: self._expiratory_valve.setpoint = value self._setpoint_ex = value