optional initialization of sink handlers
This commit is contained in:
parent
37af62c014
commit
85f384d52c
@ -7,16 +7,19 @@ POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
|
||||
|
||||
|
||||
class AbstractSinkHandler(threading.Thread):
|
||||
def __init__(self, config, inQueue, experiment):
|
||||
def __init__(self, config, name, inQueue, experiment):
|
||||
super().__init__()
|
||||
|
||||
self.config = config
|
||||
self.name = name
|
||||
self.inQueue = inQueue
|
||||
self.experiment = experiment
|
||||
|
||||
def run(self):
|
||||
logger.debug("loop started")
|
||||
|
||||
self.init()
|
||||
|
||||
while True:
|
||||
dataObject = self.inQueue.get()
|
||||
if (dataObject == POISON_PILL):
|
||||
@ -24,6 +27,9 @@ class AbstractSinkHandler(threading.Thread):
|
||||
break
|
||||
self.sinkAction(dataObject)
|
||||
|
||||
def init(self):
|
||||
logger.debug("no initialization needed")
|
||||
|
||||
def sinkAction(self, dataObject):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
@ -1,9 +1,9 @@
|
||||
from loguru import logger
|
||||
from AbstractSinkHandler import AbstractSinkHandler
|
||||
|
||||
# make sure other derived modules/classes from AbstractSinkHandler/AbstractSinkHandler
|
||||
# use the same name for the module and the class within
|
||||
|
||||
class GenericSinkHandler(AbstractSinkHandler):
|
||||
def __init__(self, config, inQueue, experiment):
|
||||
super().__init__(config, inQueue, experiment)
|
||||
|
||||
def sinkAction(self, dataObject):
|
||||
logger.info(f"received {dataObject} for experiment {self.experiment}")
|
||||
logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}")
|
||||
|
@ -18,7 +18,7 @@ class SlaveHandler(object):
|
||||
sinkHandlerModule = importlib.import_module(sinkHandlerName)
|
||||
logger.info(f"sink handler {sinkHandlerName} loaded, about to instantiate it")
|
||||
sinkHandlerClass = getattr(sinkHandlerModule, sinkHandlerName)
|
||||
self.sinkHandler = sinkHandlerClass(config, queue, experiment)
|
||||
self.sinkHandler = sinkHandlerClass(config, self.name, queue, experiment)
|
||||
|
||||
def start(self):
|
||||
logger.info(f"Starting slave handler {self.name}")
|
||||
|
Loading…
x
Reference in New Issue
Block a user