Wolfgang Hottgenroth 695594fd1c subscriber
2022-03-03 16:54:50 +01:00

101 lines
2.4 KiB
Python

from MqttPublish import MqttPublish
from Statistics import StatisticsCollector
from loguru import logger
import argparse
import json
import threading
import queue
import signal
deathBell = threading.Event()
def exceptHook(args):
global deathBell
logger.error("Exception in thread caught: {}".format(args))
deathBell.set()
logger.error("rang the death bell")
def terminateHook(sig, frame):
global deathBell
logger.error("SIGINT received")
deathBell.set()
logger.error("rang the death bell")
logger.info("opcua2mqtt bridge starting")
parser = argparse.ArgumentParser(description="opcua2mqtt")
parser.add_argument('--config', '-f',
help='Config file, default is $pwd/config.json',
required=False,
default='./config.json')
args = parser.parse_args()
with open(args.config) as f:
config = json.load(f)
match config['opcua']['type']:
case 'requester':
logger.info("Loading OpcUaRequester")
from OpcUaRequester import OpcUaRequester as OpcUa
case 'subscriber':
logger.info("Loading OpcUaSubscriber")
from OpcUaSubscriber import OpcUaSubscriber as OpcUa
case _:
raise Exception("unknown OpcUa type")
queue = queue.Queue()
threads = []
try:
statsThread = StatisticsCollector(config, queue)
statsThread.start()
threads.append(statsThread)
logger.info("StatisticsCollector started")
publishThread = MqttPublish(config, statsThread, queue)
publishThread.start()
threads.append(publishThread)
logger.info("MqttPublish started")
opcuaThreads = []
for o in config['opcua']["servers"]:
if o['enabled'] != 'true':
continue
ot = OpcUa(o, statsThread, queue)
ot.start()
logger.info(f"OpcUaRequester thread for {o['name']} started")
threads.append(ot)
except Exception as e:
logger.error(f"caught exception {type(e)}, {e} during start-up phase")
deathBell.set()
threading.excepthook = exceptHook
logger.info("Threading excepthook set")
signal.signal(signal.SIGINT, terminateHook)
logger.info("SIGINT handler set")
logger.info("opcua2mqtt bridge is running")
deathBell.wait()
logger.error("opcua2mqtt bridge is dying")
for t in threads:
t.stop()
logger.error(f"thread {t.name} stopped")
for t in threads:
t.join()
logger.error(f"thread {t.name} joined")
logger.error("opcua2mqtt bridge is terminated")