101 lines
2.4 KiB
Python
101 lines
2.4 KiB
Python
from MqttPublish import MqttPublish
|
|
from Statistics import StatisticsCollector
|
|
|
|
from loguru import logger
|
|
import argparse
|
|
import json
|
|
import threading
|
|
import queue
|
|
import signal
|
|
|
|
|
|
deathBell = threading.Event()
|
|
|
|
|
|
def exceptHook(args):
|
|
global deathBell
|
|
logger.error("Exception in thread caught: {}".format(args))
|
|
deathBell.set()
|
|
logger.error("rang the death bell")
|
|
|
|
|
|
def terminateHook(sig, frame):
|
|
global deathBell
|
|
logger.error("SIGINT received")
|
|
deathBell.set()
|
|
logger.error("rang the death bell")
|
|
|
|
|
|
logger.info("opcua2mqtt bridge starting")
|
|
|
|
parser = argparse.ArgumentParser(description="opcua2mqtt")
|
|
parser.add_argument('--config', '-f',
|
|
help='Config file, default is $pwd/config.json',
|
|
required=False,
|
|
default='./config.json')
|
|
args = parser.parse_args()
|
|
|
|
with open(args.config) as f:
|
|
config = json.load(f)
|
|
|
|
|
|
match config['opcua']['type']:
|
|
case 'requester':
|
|
logger.info("Loading OpcUaRequester")
|
|
from OpcUaRequester import OpcUaRequester as OpcUa
|
|
case 'subscriber':
|
|
logger.info("Loading OpcUaSubscriber")
|
|
from OpcUaSubscriber import OpcUaSubscriber as OpcUa
|
|
case _:
|
|
raise Exception("unknown OpcUa type")
|
|
|
|
|
|
queue = queue.Queue()
|
|
|
|
threads = []
|
|
try:
|
|
statsThread = StatisticsCollector(config, queue)
|
|
statsThread.start()
|
|
threads.append(statsThread)
|
|
logger.info("StatisticsCollector started")
|
|
|
|
publishThread = MqttPublish(config, statsThread, queue)
|
|
publishThread.start()
|
|
threads.append(publishThread)
|
|
logger.info("MqttPublish started")
|
|
|
|
|
|
opcuaThreads = []
|
|
for o in config['opcua']["servers"]:
|
|
if o['enabled'] != 'true':
|
|
continue
|
|
ot = OpcUa(o, statsThread, queue)
|
|
ot.start()
|
|
logger.info(f"OpcUaRequester thread for {o['name']} started")
|
|
threads.append(ot)
|
|
except Exception as e:
|
|
logger.error(f"caught exception {type(e)}, {e} during start-up phase")
|
|
deathBell.set()
|
|
|
|
threading.excepthook = exceptHook
|
|
logger.info("Threading excepthook set")
|
|
|
|
signal.signal(signal.SIGINT, terminateHook)
|
|
logger.info("SIGINT handler set")
|
|
|
|
logger.info("opcua2mqtt bridge is running")
|
|
|
|
|
|
deathBell.wait()
|
|
logger.error("opcua2mqtt bridge is dying")
|
|
|
|
for t in threads:
|
|
t.stop()
|
|
logger.error(f"thread {t.name} stopped")
|
|
|
|
for t in threads:
|
|
t.join()
|
|
logger.error(f"thread {t.name} joined")
|
|
|
|
logger.error("opcua2mqtt bridge is terminated")
|