RigCmdHandler started

This commit is contained in:
Wolfgang Hottgenroth 2021-11-09 16:31:32 +01:00
parent 0de79b82c5
commit 48086191d4
Signed by: wn
GPG Key ID: 6C1E5E531E0D5D7F
6 changed files with 94 additions and 21 deletions

View File

@ -22,8 +22,7 @@ class AbstractMqttHandler(threading.Thread):
self.client = mqtt.Client(userdata=self) self.client = mqtt.Client(userdata=self)
# consider this flag in the localLoop # consider this flag in the localLoop
self.killBill = False self.poisonPill = False
self.killEvent = threading.Event()
def run(self): def run(self):
self.client.on_message = mqttOnMessageCallback self.client.on_message = mqttOnMessageCallback
@ -44,9 +43,10 @@ class AbstractMqttHandler(threading.Thread):
def stop(self): def stop(self):
self.client.loop_stop() self.client.loop_stop()
self.client.disconnect()
logger.debug("about to stop loop") logger.debug("about to stop loop")
self.killBill = True self.poisonPill = True
logger.debug("kill flag set") logger.debug("kill flag set")
def onConnect(self): def onConnect(self):

View File

@ -15,6 +15,7 @@ def sendCmd(client, cmd):
} }
topic = f"{TOPIC_PRE}/rig01/cmd" topic = f"{TOPIC_PRE}/rig01/cmd"
client.publish(topic, json.dumps(msg)) client.publish(topic, json.dumps(msg))
logger.debug(topic)
def sendLog(client): def sendLog(client):
msg = { msg = {
@ -30,30 +31,24 @@ def sendLog(client):
} }
topic = f"{TOPIC_PRE}/rig01/log" topic = f"{TOPIC_PRE}/rig01/log"
client.publish(topic, json.dumps(msg)) client.publish(topic, json.dumps(msg))
logger.debug(topic)
def sendValue(client): def sendValue(client):
msg = { msg = {
"ts": str(datetime.now()), "ts": str(datetime.now()),
"dt": 100, "dt": 100,
"v": sample(range(0xffffffff), 1000) "v": sample(range(0xffffffff), 3000)
} }
topic = f"{TOPIC_PRE}/rig01/dev05/md" topic = f"{TOPIC_PRE}/rig01/dev05/md"
client.publish(topic, json.dumps(msg)) client.publish(topic, json.dumps(msg))
logger.debug(topic)
def perform(client, params): def perform(client, params):
logger.info("Start") logger.info("Start")
sendCmd(client, "start") sendCmd(client, "start")
for i in range(10): for i in range(2):
sendValue(client)
sleep(1.0)
sendValue(client)
sleep(1.0)
sendValue(client)
sleep(1.0)
sendValue(client)
sleep(1.0)
sendValue(client) sendValue(client)
sleep(1.0) sleep(1.0)
sendValue(client) sendValue(client)

View File

@ -17,7 +17,7 @@ class GenericMqttSubscriber(AbstractMqttHandler):
self.queue = queue self.queue = queue
def localLoop(self): def localLoop(self):
while not self.killBill: while not self.poisonPill:
sleep(float(1.0)) sleep(float(1.0))
def onMessage(self, topic, payload): def onMessage(self, topic, payload):

53
src/RigCmdHandler.py Normal file
View File

@ -0,0 +1,53 @@
import threading
from loguru import logger
from time import sleep
from DataObject import DataObject
from enum import Enum
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
class e_State(Enum):
IDLE = 0
RUNNING = 1
class RigCmdHandler(threading.Thread):
def __init__(self, config, inQueue):
super().__init__()
self.config = config
self.inQueue = inQueue
self.state = e_State.IDLE
def run(self):
logger.debug("RigCmdHandler loop started")
while True:
dataObject = self.inQueue.get()
if (dataObject == POISON_PILL):
logger.error("RigCmdHandler swallowed the poison pill")
break
logger.info(f"State is {self.state}")
# Python 3.10 is required!!
match self.state:
case e_State.IDLE:
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'start'):
logger.info("start command received, switch to RUNNING state")
self.state = e_State.RUNNING
elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in IDLE state")
else:
logger.error("illegal message received in IDLE state")
case e_State.RUNNING:
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'stop'):
logger.info("stop command received, switch to IDLE state")
self.state = e_State.IDLE
elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in RUNNING state")
else:
logger.error("illegal message received in RUNNING state")
def stop(self):
self.inQueue.put(POISON_PILL)
logger.debug("kill flag set")

View File

@ -0,0 +1,8 @@
[mqtt]
login=
password=
broker=172.16.2.16
[master]
topic=rd/+/+/cmd
dataObjectName=cmd

View File

@ -3,37 +3,51 @@ import argparse
import configparser import configparser
import threading import threading
from queue import Queue from queue import Queue
import signal
from GenericMqttSubscriber import GenericMqttSubscriber from GenericMqttSubscriber import GenericMqttSubscriber
from RigCmdHandler import RigCmdHandler
deathBell = threading.Event() knell = threading.Event()
def exceptHook(args): def exceptHook(args):
global deathBell global deathBell
logger.error("Exception in thread caught: {}".format(args)) logger.error("Exception in thread caught: {}".format(args))
deathBell.set() knell.set()
logger.error("rang the death bell") logger.error("rang the knell")
def sigintHook(sig, frame):
logger.error("SIGINT received")
knell.set()
logger.error("rang the knell")
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
logger.info("IIoTFeeder2 starting") logger.info("IIoTFeeder2 starting")
parser = argparse.ArgumentParser(description="IIoTFeeder2") parser = argparse.ArgumentParser(description="IIoTFeeder2")
parser.add_argument('--config', '-f', parser.add_argument('--config', '-f',
help='Config file, default is $pwd/config/config.ini', help='Config file, default is $pwd/config/iiotFeederConfig.ini',
required=False, required=False,
default='./config/config.ini') default='./config/iiotFeederConfig.ini')
args = parser.parse_args() args = parser.parse_args()
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(args.config) config.read(args.config)
signal.signal(signal.SIGINT, sigintHook)
logger.debug("SIGINT hook set")
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
masterQueue = Queue() masterQueue = Queue()
masterSubscriber = GenericMqttSubscriber(config, "master", masterQueue) masterSubscriber = GenericMqttSubscriber(config, "master", masterQueue)
masterSubscriber.start() masterSubscriber.start()
logger.debug("MasterSubscriber started") logger.debug("MasterSubscriber started")
rigCmdHandler = RigCmdHandler(config, masterQueue)
rigCmdHandler.start()
logger.debug("RigCmdHandler started")
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
threading.excepthook = exceptHook threading.excepthook = exceptHook
logger.debug("Threading excepthook set") logger.debug("Threading excepthook set")
@ -41,16 +55,19 @@ logger.debug("Threading excepthook set")
logger.debug("IIoTFeeder2 running") logger.debug("IIoTFeeder2 running")
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
while not deathBell.wait(1.0): while not knell.wait(1.0):
logger.debug("master queue size {}".format(masterQueue.qsize())) logger.debug("master queue size {}".format(masterQueue.qsize()))
logger.error("IIoTFeeder2 is dying") logger.error("IIoTFeeder2 is dying")
masterSubscriber.stop() masterSubscriber.stop()
masterSubscriber.join() masterSubscriber.join()
logger.error("masterSubscriber joined") logger.error("masterSubscriber joined")
rigCmdHandler.stop()
rigCmdHandler.join()
logger.error("rigCmdHandler joined")
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
logger.info("IIoTFeeder2 to terminate") logger.info("IIoTFeeder2 to terminate")