separate abstract and generic sink handler

This commit is contained in:
Wolfgang Hottgenroth 2021-11-09 18:00:36 +01:00
parent 1953fd386c
commit 37af62c014
Signed by: wn
GPG Key ID: 6C1E5E531E0D5D7F
4 changed files with 49 additions and 28 deletions

View File

@ -0,0 +1,32 @@
import threading
from loguru import logger
from DataObject import DataObject
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
class AbstractSinkHandler(threading.Thread):
def __init__(self, config, inQueue, experiment):
super().__init__()
self.config = config
self.inQueue = inQueue
self.experiment = experiment
def run(self):
logger.debug("loop started")
while True:
dataObject = self.inQueue.get()
if (dataObject == POISON_PILL):
logger.debug("swallowed the poison pill")
break
self.sinkAction(dataObject)
def sinkAction(self, dataObject):
raise NotImplementedError()
def stop(self):
self.inQueue.put(POISON_PILL)
logger.debug("kill flag set")

View File

@ -1,30 +1,9 @@
import threading
from loguru import logger from loguru import logger
from DataObject import DataObject from AbstractSinkHandler import AbstractSinkHandler
class GenericSinkHandler(AbstractSinkHandler):
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
class GenericSinkHandler(threading.Thread):
def __init__(self, config, inQueue, experiment): def __init__(self, config, inQueue, experiment):
super().__init__() super().__init__(config, inQueue, experiment)
self.config = config def sinkAction(self, dataObject):
self.inQueue = inQueue logger.info(f"received {dataObject} for experiment {self.experiment}")
self.experiment = experiment
def run(self):
logger.debug("GenericSinkHandler loop started")
while True:
dataObject = self.inQueue.get()
if (dataObject == POISON_PILL):
logger.debug("GenericSinkHandler swallowed the poison pill")
break
logger.info(f"GenericSinkHandler received {dataObject} for experiment {self.experiment}")
def stop(self):
self.inQueue.put(POISON_PILL)
logger.debug("kill flag set")

View File

@ -2,16 +2,23 @@ from queue import Queue
from loguru import logger from loguru import logger
from GenericMqttSubscriber import GenericMqttSubscriber from GenericMqttSubscriber import GenericMqttSubscriber
from GenericSinkHandler import GenericSinkHandler from GenericSinkHandler import GenericSinkHandler
import importlib
class SlaveHandler(object): class SlaveHandler(object):
def __init__(self, config, name, experiment): def __init__(self, config, name, experiment):
self.config = config self.config = config
self.name = name self.name = name
localConfig = self.config[name]
queue = Queue() queue = Queue()
self.mqttHandler = GenericMqttSubscriber(config, self.name, queue) self.mqttHandler = GenericMqttSubscriber(config, self.name, queue)
self.sinkHandler = GenericSinkHandler(config, queue, experiment)
sinkHandlerName = localConfig["sinkHandler"]
sinkHandlerModule = importlib.import_module(sinkHandlerName)
logger.info(f"sink handler {sinkHandlerName} loaded, about to instantiate it")
sinkHandlerClass = getattr(sinkHandlerModule, sinkHandlerName)
self.sinkHandler = sinkHandlerClass(config, queue, experiment)
def start(self): def start(self):
logger.info(f"Starting slave handler {self.name}") logger.info(f"Starting slave handler {self.name}")

View File

@ -11,12 +11,15 @@ slaves=dev05 dev06 rig01
[dev05] [dev05]
topic=rd/set01/rig01/dev05/md topic=rd/set01/rig01/dev05/md
dataObjectName=value dataObjectName=value
sinkHandler=GenericSinkHandler
[dev06] [dev06]
topic=rd/set01/rig01/dev06/md topic=rd/set01/rig01/dev06/md
dataObjectName=value dataObjectName=value
sinkHandler=GenericSinkHandler
[rig01] [rig01]
topic=rd/set01/rig01/log topic=rd/set01/rig01/log
dataObjectName=log dataObjectName=log
sinkHandler=GenericSinkHandler