42 lines
1.5 KiB
Python
42 lines
1.5 KiB
Python
from queue import Queue
|
|
from loguru import logger
|
|
from GenericMqttSubscriber import GenericMqttSubscriber
|
|
import importlib
|
|
|
|
|
|
class SlaveHandler:
    """Pair an MQTT subscriber with a dynamically loaded sink handler.

    Both collaborators are constructed with the same ``queue.Queue``
    instance. Presumably the subscriber puts received messages on it and the
    sink handler takes them off -- confirm against those classes.
    """

    def __init__(self, config, name, experiment):
        """Build the MQTT subscriber and the configured sink handler.

        Args:
            config: Mapping of handler names to their settings.
                ``config[name]["sinkHandler"]`` names both the module to
                import and the class inside that module to instantiate.
            name: Key of this handler's section in ``config``; also passed
                to both collaborators.
            experiment: Passed through unchanged to the sink handler.

        Raises:
            KeyError: ``config`` has no ``name`` section, or that section has
                no ``"sinkHandler"`` entry (for a dict-like ``config``).
            ImportError: The sink handler module cannot be imported.
            AttributeError: The imported module does not define an attribute
                with the same name as the module.
        """
        self.config = config
        self.name = name
        localConfig = self.config[self.name]

        # One queue instance is handed to both the subscriber and the sink.
        queue = Queue()
        self.mqttHandler = GenericMqttSubscriber(config, self.name, queue)

        # The configured value is used twice: as the module name to import
        # and as the name of the class looked up inside that module.
        sinkHandlerName = localConfig["sinkHandler"]
        sinkHandlerModule = importlib.import_module(sinkHandlerName)
        logger.info(f"sink handler {sinkHandlerName} loaded, about to instantiate it")
        sinkHandlerClass = getattr(sinkHandlerModule, sinkHandlerName)
        logger.info("required class loaded")
        self.sinkHandler = sinkHandlerClass(config, self.name, queue, experiment)
        logger.info(f"sink handler {sinkHandlerName} instantiated")

    def start(self):
        """Start the MQTT handler first, then the sink handler."""
        logger.info(f"Starting slave handler {self.name}")
        self.mqttHandler.start()
        logger.debug("mqtt handler running")
        self.sinkHandler.start()
        logger.debug("sink handler running")

    def stop(self):
        """Stop and join the MQTT handler, then stop and join the sink handler.

        The sink handler is stopped and joined even when shutting down the
        MQTT handler raises; in that case the exception still propagates to
        the caller once the sink handler shutdown has been attempted.
        """
        logger.info(f"Stopping slave handler {self.name}")
        try:
            self.mqttHandler.stop()
            logger.info("mqtt handler stopped")
            self.mqttHandler.join()
            logger.info("mqtt handler joined")
        finally:
            # Runs on the failure path too, so the sink handler is still
            # asked to stop when the MQTT shutdown above raised.
            self.sinkHandler.stop()
            logger.info("sink handler stopped")
            self.sinkHandler.join()
            logger.info("sink handler joined")