Source code for pvp.gui.widgets.plot

import time
from collections import deque
import pdb
import typing

import numpy as np
from PySide2 import QtCore, QtWidgets, QtGui
import PySide2 # import so pyqtgraph recognizes as what we're using
import pyqtgraph as pg
# pg.setConfigOptions(antialias=True)




from pvp.gui import styles
from pvp.gui import mono_font
from pvp.gui import get_gui_instance
from pvp.common import unit_conversion
from pvp.common.message import SensorValues, ControlSetting
from pvp.common.values import ValueName, Value
from pvp.common.loggers import init_logger
from pvp.alarm import AlarmSeverity

PLOT_TIMER = None
"""
A :class:`~PySide2.QtCore.QTimer` that updates :class:`.TimedPlotCurveItem`s
"""

PLOT_FREQ = 5
"""
Update frequency of :class:`.Plot` s in Hz
"""

# TODO: add NamedLine back in order to label limit lines in plots with ValueNames

# class NamedLine(pg.InfiniteLine):
#     value_changed = QtCore.Signal(tuple)
#
#     def __init__(self, name: ValueName, *args, **kwargs):
#         super(NamedLine, self).__init__(*args, **kwargs)
#         self.name = name
#         self.sigPositionChanged.connect(self._value_changed)
#
#         self.setHoverPen(color=styles.SUBWAY_COLORS['lime'],
#                          width=self.pen.width())
#
#     def _value_changed(self):
#         self.value_changed.emit((self.name, self.value()))
#
#     def setSpan(self, mn, mx):
#         if self.span != (mn, mx):
#             self.span = (mn, mx)
#             self._invalidateCache()
#             self.update()


[docs]class Plot(pg.PlotWidget): limits_changed = QtCore.Signal(tuple)
[docs] def __init__(self, name, buffer_size = 4092, plot_duration = 10, abs_range = None, plot_limits: tuple = None, color=None, units='', **kwargs): """ Args: name: buffer_size: plot_duration: abs_range: plot_limits (tuple): tuple of (ValueName)s for which to make pairs of min and max range lines color: units: **kwargs: """ #super(Plot, self).__init__(axisItems={'bottom':TimeAxis(orientation='bottom')}) # construct title html string titlestr = "{title_text} ({units})".format( title_text=name, units =units) super(Plot, self).__init__(background=styles.BOX_BACKGROUND, title=titlestr) self.getPlotItem().titleLabel.item.setHtml( f"<span style='{styles.PLOT_TITLE_STYLE}'>{titlestr}</span>" ) self.getPlotItem().titleLabel.setAttr('justify', 'left') self.name = name # pdb.set_trace() # self.setViewportMargins(0,0,styles.BOX_MARGINS,0) self.timestamps = deque(maxlen=buffer_size) self.history = deque(maxlen=buffer_size) self.cycles = deque(maxlen=buffer_size) # TODO: Make @property to update buffer_size, preserving history self.plot_duration = plot_duration self.units = units self._convert_in = None self._convert_out = None self._start_time = time.time() self._last_time = time.time() self._last_relative_time = 0 self.abs_range = None if abs_range: self.abs_range = abs_range #self.setYRange(self.abs_range[0], self.abs_range[1]) #self.enableAutoRange(y=True) # split plot curve into two so that the endpoint doesn't get connected to the start point self.early_curve = self.plot(width=3) self.late_curve = self.plot(width=3) self._plot_limits = {} if plot_limits: for value in plot_limits: self._plot_limits[value] = ( pg.InfiniteLine(movable=False, angle=0, pos=0, pen=styles.SUBWAY_COLORS['red'], label=f'{value.name}:{{value:0.1f}}', labelOpts={ 'color': styles.TEXT_COLOR, 'fill': styles.SUBWAY_COLORS['red'], 'position': 0.1 }), pg.InfiniteLine(movable=False, angle=0, pos=0, pen=styles.SUBWAY_COLORS['red'], label=f'{value.name}:{{value:0.1f}}', labelOpts={ 'color': styles.TEXT_COLOR, 'fill': styles.SUBWAY_COLORS['red'], 'position': 0.9 }) ) # self.min_safe.sigPositionChanged.connect(self._safe_limits_changed) # self.max_safe.sigPositionChanged.connect(self._safe_limits_changed) self.addItem(self._plot_limits[value][0]) self.addItem(self._plot_limits[value][1]) # vline to indicate current time self.time_marker = pg.InfiniteLine(movable=False, angle=90, pos=0) self.time_marker.setPen(color="#FFFFFF", width=1) self.addItem(self.time_marker) self.setXRange(0, plot_duration) if color: self.early_curve.setPen(color=color, width=3) self.late_curve.setPen(color=color, width=3)
[docs] def set_duration(self, dur): self.plot_duration = int(round(dur)) self.setXRange(0, self.plot_duration)
[docs] def update_value(self, new_value: tuple): """ new_value (tuple): (timestamp from time.time(), breath_cycle, value) """ try: this_time = time.time() #time_diff = this_time-self._last_time # limits = self.getPlotItem().viewRange() current_relative_time = (this_time-self._start_time) % self.plot_duration # self.time_marker.setData([current_relative_time, current_relative_time], # [limits[1][0], limits[1][1]]) self.time_marker.setValue(current_relative_time) self.timestamps.append(new_value[0]) self.cycles.append(new_value[1]) self.history.append(new_value[2]) # filter values based on timestamps ts_array = np.array(self.timestamps) end_ind = len(self.history) start_ind = np.where(ts_array > (this_time - self.plot_duration))[0][0] # subtract start time and take modulus of duration to get wrapped timestamps plot_timestamps = np.mod(ts_array[start_ind:end_ind]-self._start_time, self.plot_duration) plot_values = np.array([self.history[i] for i in range(start_ind, end_ind)]) if self._convert_in: plot_values = self._convert_in(plot_values) # find the point where the time resets try: reset_ind = np.where(np.diff(plot_timestamps)<0)[0][0] # plot early and late self.early_curve.setData(plot_timestamps[0:reset_ind+1],plot_values[0:reset_ind+1] ) self.late_curve.setData(plot_timestamps[reset_ind+1:], plot_values[reset_ind+1:]) except IndexError: self.early_curve.setData(plot_timestamps, plot_values) self.late_curve.clear() except: # FIXME: Log this lol print('error plotting value: {}, timestamp: {}'.format(new_value[1], new_value[0]))
#self._last_time = this_time
[docs] def _safe_limits_changed(self, val): # ignore input val, just emit the current value of the lines self.limits_changed.emit((self.min_safe.value(), self.max_safe.value()))
# @QtCore.Slot(tuple)
[docs] def set_safe_limits(self, limits: ControlSetting): if self._plot_limits is None: return if limits.name in self._plot_limits.keys(): if limits.min_value: if self._convert_out: self._plot_limits[limits.name][0].setPos(self._convert_out(limits.min_value)) else: self._plot_limits[limits.name][0].setPos(limits.min_value) if limits.max_value: if self._convert_out: self._plot_limits[limits.name][1].setPos(self._convert_out(limits.max_value)) else: self._plot_limits[limits.name][1].setPos(limits.max_value)
[docs] def reset_start_time(self): self._start_time = time.time() self._last_time = time.time() self._last_relative_time = 0
[docs] def set_units(self, units): if self.name in ('Pressure',): if units == 'cmH2O': self.decimals = 1 self.units = units self._convert_in = None self._convert_out = None for range_pairs in self._plot_limits.values(): for a_line in range_pairs: a_line.setPos(unit_conversion.hPa_to_cmH2O(a_line.getPos()[1])) elif units == 'hPa': self.decimals = 0 self.units = units self._convert_in = unit_conversion.cmH2O_to_hPa self._convert_out = unit_conversion.hPa_to_cmH2O for range_pairs in self._plot_limits.values(): for a_line in range_pairs: a_line.setPos(unit_conversion.cmH2O_to_hPa(a_line.getPos()[1])) titlestr = "{title_text} ({units})".format( title_text=self.name, units=self.units) self.getPlotItem().titleLabel.item.setHtml( f"<span style='{styles.PLOT_TITLE_STYLE}'>{titlestr}</span>" )
[docs]class Plot_Container(QtWidgets.QGroupBox): def __init__(self, plot_descriptors: typing.Dict[ValueName, Value], *args, **kwargs): super(Plot_Container, self).__init__('Monitored Waveforms', *args, **kwargs) self.plot_descriptors = plot_descriptors self.plots = {} self.visible = list(plot_descriptors.keys()) self.setStyleSheet(styles.PLOT_BOX) self.setContentsMargins(0,0,0,0) self.logger = init_logger(__name__) self.init_ui() self.set_duration(10)
[docs] def init_ui(self): self.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding) self.layout = QtWidgets.QVBoxLayout() # initialize the buttons first!!! self.button_layout = QtWidgets.QHBoxLayout() # plot selection buttons # self.selection_button_group = QtWidgets.QButtonGroup() self.selection_buttons = {} for plot_key, plot_params in self.plot_descriptors.items(): new_button = QtWidgets.QPushButton(plot_params.name) new_button.setCheckable(True) new_button.setChecked(True) new_button.setObjectName(plot_key.name) new_button.setStyleSheet(styles.TOGGLE_BUTTON) new_button.toggled.connect(self.toggle_plot) self.button_layout.addWidget(new_button, 2) self.selection_buttons[plot_key.name] = new_button if plot_key == ValueName.FIO2: new_button.setChecked(False) self.button_layout.addStretch(2) # # select display mode # self.plot_mode_buttons = [ # QtWidgets.QPushButton('Waveform'), # QtWidgets.QPushButton('Cycle') # ] # # self.plot_mode_button_group = QtWidgets.QButtonGroup() # self.plot_mode_button_group.setExclusive(True) # self.plot_mode_layout = QtWidgets.QHBoxLayout() # # for button in self.plot_mode_buttons: # button.setObjectName(button.text()) # button.setCheckable(True) # self.plot_mode_button_group.addButton(button) # button.setStyleSheet(styles.TOGGLE_BUTTON) # self.plot_mode_layout.addWidget(button) # # self.plot_mode_buttons[0].setChecked(True) # self.plot_mode_button_group.buttonClicked.connect(self.set_plot_mode) # # self.button_layout.addLayout(self.plot_mode_layout) # time box and slider self.time_box = QtWidgets.QLineEdit() self.time_box.setValidator(QtGui.QIntValidator()) self.time_box.textEdited.connect(self.set_duration) self.button_layout.addWidget(self.time_box, 1) self.slider = QtWidgets.QSlider() self.slider.setMinimum(5) self.slider.setMaximum(60) self.slider.setOrientation(QtCore.Qt.Orientation.Horizontal) # self.slider.setTickPosition(QtWidgets.QSlider.TickPosition.TicksBelow) self.slider.valueChanged.connect(self.set_duration) self.button_layout.addWidget(self.slider, 2) self.layout.addLayout(self.button_layout) for i, (plot_key, plot_params) in enumerate(self.plot_descriptors.items()): if i % 2 == 0: plot_color = styles.SUBWAY_COLORS['orange'] else: plot_color = styles.SUBWAY_COLORS['ltblue'] self.plots[plot_key.name] = Plot(color=plot_color, **plot_params.to_dict()) self.layout.addWidget(self.plots[plot_key.name], 1) if plot_key == ValueName.FIO2: self.plots[plot_key.name].setVisible(False) self.setLayout(self.layout)
[docs] def update_value(self, vals: SensorValues): for plot_key, plot in self.plots.items(): if hasattr(vals, plot_key): try: plot.update_value((time.time(), getattr(vals, 'breath_count'), getattr(vals, plot_key))) except Exception as e: self.logger.exception(f'Couldnt update plot with {plot_key}, got error {e}')
[docs] def toggle_plot(self, state: bool): sender_name = self.sender().objectName() if sender_name in self.plots.keys(): if state: self.plots[sender_name].setVisible(True) else: self.plots[sender_name].setVisible(False)
[docs] def set_safe_limits(self, control: ControlSetting): for plot in self.plots.values(): plot.set_safe_limits(control)
[docs] def set_duration(self, duration: float): if isinstance(duration, str): duration = float(duration) self.time_box.setText(str(duration)) self.slider.setValue(duration) for plot in self.plots.values(): plot.set_duration(duration)
[docs] def reset_start_time(self): for plot in self.plots.values(): plot.reset_start_time()
[docs] def set_plot_mode(self): raise NotImplementedError('PVP 2!!!')