slave and sink handler added

This commit is contained in:
Wolfgang Hottgenroth 2021-11-09 17:40:13 +01:00
parent 48086191d4
commit 1953fd386c
Signed by: wn
GPG Key ID: 6C1E5E531E0D5D7F
7 changed files with 136 additions and 12 deletions

View File

@ -37,7 +37,7 @@ def sendValue(client):
msg = {
"ts": str(datetime.now()),
"dt": 100,
"v": sample(range(0xffffffff), 3000)
"v": sample(range(0xffffffff), 3)
}
topic = f"{TOPIC_PRE}/rig01/dev05/md"
client.publish(topic, json.dumps(msg))

View File

@ -1,6 +1,5 @@
from json.decoder import JSONDecodeError
import paho.mqtt.client as mqtt
import threading
from loguru import logger
from time import sleep
import json

30
src/GenericSinkHandler.py Normal file
View File

@ -0,0 +1,30 @@
import threading
from loguru import logger
from DataObject import DataObject
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
class GenericSinkHandler(threading.Thread):
def __init__(self, config, inQueue, experiment):
super().__init__()
self.config = config
self.inQueue = inQueue
self.experiment = experiment
def run(self):
logger.debug("GenericSinkHandler loop started")
while True:
dataObject = self.inQueue.get()
if (dataObject == POISON_PILL):
logger.debug("GenericSinkHandler swallowed the poison pill")
break
logger.info(f"GenericSinkHandler received {dataObject} for experiment {self.experiment}")
def stop(self):
self.inQueue.put(POISON_PILL)
logger.debug("kill flag set")

View File

@ -1,51 +1,98 @@
from math import exp
from queue import Empty
import threading
from loguru import logger
from time import sleep
from DataObject import DataObject
from enum import Enum
from SlaveHandler import SlaveHandler
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
TIMEOUT_PILL = DataObject(name="TimeoutPill", topic="timeout", payload=None)
GRACE_PERIOD = 50
class e_State(Enum):
IDLE = 0
RUNNING = 1
WAITING = 2
class RigCmdHandler(threading.Thread):
def __init__(self, config, inQueue):
super().__init__()
self.config = config
localConfig = self.config["master"]
self.slaveNames = localConfig["slaves"].split(' ')
self.slaves = []
self.inQueue = inQueue
self.state = e_State.IDLE
def run(self):
logger.debug("RigCmdHandler loop started")
state = e_State.IDLE
waitCnt = 0
experiment = None
while True:
dataObject = self.inQueue.get()
try:
dataObject = self.inQueue.get(timeout=0.1)
except Empty:
dataObject = TIMEOUT_PILL
if (dataObject == POISON_PILL):
logger.error("RigCmdHandler swallowed the poison pill")
break
logger.info(f"State is {self.state}")
# logger.debug(f"State is {state}, WaitCnt is {waitCnt}")
# Python 3.10 is required!!
match self.state:
match state:
case e_State.IDLE:
if (dataObject == TIMEOUT_PILL):
continue
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'start'):
logger.info("start command received, switch to RUNNING state")
self.state = e_State.RUNNING
experiment = dataObject.payload["en"]
logger.info(f"start command received, switch to RUNNING state, experiment is {experiment}")
state = e_State.RUNNING
self.experimentStart(experiment)
elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in IDLE state")
else:
logger.error("illegal message received in IDLE state")
case e_State.RUNNING:
if (dataObject == TIMEOUT_PILL):
continue
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'stop'):
logger.info("stop command received, switch to IDLE state")
self.state = e_State.IDLE
logger.info("stop command received, switch to WAITING state")
state = e_State.WAITING
waitCnt = 0
elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in RUNNING state")
else:
logger.error("illegal message received in RUNNING state")
case e_State.WAITING:
if (dataObject == TIMEOUT_PILL):
waitCnt += 1
if (waitCnt >= GRACE_PERIOD):
logger.info("grace period is over, switch to IDLE state")
state = e_State.IDLE
self.experimentStop(experiment)
else:
logger.error("illegal message (not TIMEOUT) received in WAITING state")
def experimentStart(self, experiment):
logger.info(f"experiment started, {experiment}")
for slaveName in self.slaveNames:
slave = SlaveHandler(self.config, slaveName, experiment)
slave.start()
self.slaves.append(slave)
def experimentStop(self, experiment):
logger.info(f"experiment stopped, {experiment}")
for slave in self.slaves:
slave.stop()
self.slaves.clear()
def stop(self):

33
src/SlaveHandler.py Normal file
View File

@ -0,0 +1,33 @@
from queue import Queue
from loguru import logger
from GenericMqttSubscriber import GenericMqttSubscriber
from GenericSinkHandler import GenericSinkHandler
class SlaveHandler(object):
def __init__(self, config, name, experiment):
self.config = config
self.name = name
queue = Queue()
self.mqttHandler = GenericMqttSubscriber(config, self.name, queue)
self.sinkHandler = GenericSinkHandler(config, queue, experiment)
def start(self):
logger.info(f"Starting slave handler {self.name}")
self.mqttHandler.start()
logger.debug("mqtt handler running")
self.sinkHandler.start()
logger.debug("sink handler running")
def stop(self):
logger.info(f"Stopping slave handler {self.name}")
self.mqttHandler.stop()
logger.info("mqtt handler stopped")
self.mqttHandler.join()
logger.info("mqtt handler joined")
self.sinkHandler.stop()
logger.info("sink handler stopped")
self.sinkHandler.join()
logger.info("sink handler joined")

View File

@ -4,5 +4,19 @@ password=
broker=172.16.2.16
[master]
topic=rd/+/+/cmd
topic=rd/set01/rig01/cmd
dataObjectName=cmd
slaves=dev05 dev06 rig01
[dev05]
topic=rd/set01/rig01/dev05/md
dataObjectName=value
[dev06]
topic=rd/set01/rig01/dev06/md
dataObjectName=value
[rig01]
topic=rd/set01/rig01/log
dataObjectName=log

View File

@ -56,7 +56,8 @@ logger.debug("IIoTFeeder2 running")
# ------------------------------------------------------------------------------
while not knell.wait(1.0):
logger.debug("master queue size {}".format(masterQueue.qsize()))
# logger.debug("master queue size {}".format(masterQueue.qsize()))
pass
logger.error("IIoTFeeder2 is dying")