Source code for pvp.coordinator.rpc

import logging
import typing
import pickle
import socket
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCServer

from pvp.controller import control_module
from pvp.common.loggers import init_logger

# Host used both as the bind address default in rpc_server_main() and as the
# target host in get_rpc_client().
default_addr = 'localhost'
# TCP port shared by the RPC server and the client proxy.
default_port = 9533
# Value handed to socket.setdefaulttimeout() below (seconds, per the socket API).
default_timeout = 10
# Import-time side effect: sets the default timeout for every socket object
# created afterwards in this process, not only the RPC ones.
socket.setdefaulttimeout(default_timeout)

# Controller instance exposed over RPC. Stays None until rpc_server_main()
# assigns it via ``global``.
remote_controller = None # type: typing.Union[None, control_module.ControlModuleBase]


# General comment on the "# pragma: no cover" markers below:
# These functions are extensively tested in the UI tests, but are not monitored by Travis CI.

def get_sensors():  # pragma: no cover
    """Return ``remote_controller.get_sensors()`` serialized with pickle.

    Returns:
        bytes: The pickled result of the controller call.
    """
    return pickle.dumps(remote_controller.get_sensors())
def get_alarms():  # pragma: no cover
    """Return ``remote_controller.get_alarms()`` serialized with pickle.

    Returns:
        bytes: The pickled result of the controller call.
    """
    return pickle.dumps(remote_controller.get_alarms())
def set_control(control_setting):  # pragma: no cover
    """Unpickle a control setting and forward it to the controller.

    Args:
        control_setting: Object whose ``.data`` attribute holds the pickled
            payload (presumably an ``xmlrpc.client.Binary`` -- TODO confirm).

    NOTE(review): ``pickle.loads`` is applied to data received over RPC, which
    is only safe as long as every client is trusted.
    """
    remote_controller.set_control(pickle.loads(control_setting.data))
def get_control(control_setting_name):  # pragma: no cover
    """Look up a control setting on the controller and return it pickled.

    Args:
        control_setting_name: Object whose ``.data`` attribute holds the
            pickled lookup key (presumably an ``xmlrpc.client.Binary`` --
            TODO confirm).

    Returns:
        bytes: The pickled result of ``remote_controller.get_control``.

    NOTE(review): ``pickle.loads`` is applied to data received over RPC, which
    is only safe as long as every client is trusted.
    """
    requested = pickle.loads(control_setting_name.data)
    return pickle.dumps(remote_controller.get_control(requested))
def set_breath_detection(breath_detection):  # pragma: no cover
    """Unpickle a breath-detection value and forward it to the controller.

    Args:
        breath_detection: Object whose ``.data`` attribute holds the pickled
            payload (presumably an ``xmlrpc.client.Binary`` -- TODO confirm).

    NOTE(review): ``pickle.loads`` is applied to data received over RPC, which
    is only safe as long as every client is trusted.
    """
    remote_controller.set_breath_detection(pickle.loads(breath_detection.data))
def get_breath_detection():  # pragma: no cover
    """Return ``remote_controller.get_breath_detection()`` serialized with pickle.

    Returns:
        bytes: The pickled result of the controller call.
    """
    return pickle.dumps(remote_controller.get_breath_detection())
def rpc_server_main(sim_mode, serve_event, addr=default_addr, port=default_port):  # pragma: no cover
    """Create the control module and serve it over XML-RPC until stopped.

    Assigns the module-level ``remote_controller``, registers the RPC entry
    points on a ``SimpleXMLRPCServer``, signals readiness through
    ``serve_event`` and then blocks in ``serve_forever()``.

    Args:
        sim_mode: Passed through to ``control_module.get_control_module``.
        serve_event: Object with a ``set()`` method (presumably a
            ``multiprocessing``/``threading`` event -- TODO confirm against the
            caller). It is set once the server is bound and every function is
            registered.
        addr: Address to bind. Only ``default_addr`` is accepted.
        port: Port to bind. Only ``default_port`` is accepted.

    Raises:
        NotImplementedError: If ``addr`` or ``port`` differs from its default.
    """
    global remote_controller

    logger = init_logger(__name__)
    logger.info('controller process init')

    # get_rpc_client() always connects to the default address and port, so a
    # server bound anywhere else would be unreachable through it.
    if addr != default_addr:
        raise NotImplementedError(
            f"only addr={default_addr!r} is supported, got {addr!r}")
    if port != default_port:
        raise NotImplementedError(
            f"only port={default_port!r} is supported, got {port!r}")

    remote_controller = control_module.get_control_module(sim_mode)

    # allow_none is required because some registered callables return None.
    server = SimpleXMLRPCServer((addr, port), allow_none=True, logRequests=False)
    try:
        rpc_functions = (
            ("get_sensors", get_sensors),
            ("set_control", set_control),
            ("get_control", get_control),
            ("start", remote_controller.start),
            ("is_running", remote_controller.is_running),
            ("stop", remote_controller.stop),
            ("get_alarms", get_alarms),
            ("set_breath_detection", set_breath_detection),
            ("get_breath_detection", get_breath_detection),
        )
        for rpc_name, handler in rpc_functions:
            server.register_function(handler, rpc_name)

        # The socket is already listening at this point, so clients may
        # connect as soon as the event is observed.
        serve_event.set()
        server.serve_forever()
    finally:
        # Release the listening socket whenever serving ends, whether by
        # shutdown, an exception, or KeyboardInterrupt.
        server.server_close()
def get_rpc_client():
    """Build an XML-RPC proxy pointed at the default server address and port.

    Returns:
        xmlrpc.client.ServerProxy: Proxy for
        ``http://{default_addr}:{default_port}/``.
    """
    # A per-connection timeout via a custom xmlrpc.client.Transport was
    # sketched here earlier and left disabled; see
    # https://mail.python.org/pipermail/python-bugs-list/2015-January/260126.html
    # Presumably the module-level socket.setdefaulttimeout() call covers it.
    return xmlrpc.client.ServerProxy(f"http://{default_addr}:{default_port}/")