From 51bc396c3913366a1a54fdcd03bcd6fd0fc8ef20 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 20:25:32 +0100 Subject: [PATCH] done so far --- opcua2mqtt/OpcUaRequester.py | 22 ++++++++++++++-------- opcua2mqtt/bridge.py | 27 +++++++++++++++++---------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index 703d50b..665aa2d 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -6,27 +6,33 @@ from DataObject import DataObject class OpcUaRequester(threading.Thread): - def __init__(self, config, name, queue): + def __init__(self, config, queue): super().__init__() - self.config = config["opcua"][name] + self.config = config self.queue = queue + self.name = self.config['name'] + self.url = self.config['url'] + self.nodes = self.config['nodes'] + self.delay = self.config['delay'] + # consider this flag in the localLoop self.killBill = False self.killEvent = threading.Event() async def opcUaRequesterInnerLoop(self): - while True: - async with Client(url=URL, timeout=10.0) as client: - for nodeSpec in NODE_SPECS: + while not self.killBill: + async with Client(url=self.url, timeout=10.0) as client: + for nodeSpec in self.nodes: + logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") value = await node.read_value() displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text - print(f"{displayName=} = {value=}") - self.queue.put(DataObject(SERVER, nodeSpec['ns'], displayName, value)) - await asyncio.sleep(DELAY) + logger.debug(f"Got: {displayName=} = {value=}") + self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) + await asyncio.sleep(self.delay) def run(self): loop = asyncio.new_event_loop() diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index 87854fc..6370900 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -2,7 +2,7 @@ from MqttPublish import MqttPublish from OpcUaRequester import OpcUaRequester from loguru import logger import argparse -import configparser +import json import threading import queue @@ -24,8 +24,8 @@ parser.add_argument('--config', '-f', default='./config.json') args = parser.parse_args() -config = configparser.ConfigParser() -config.read(args.config) +with open(args.config) as f: + config = json.load(f) queue = queue.Queue() @@ -34,9 +34,12 @@ publishThread = MqttPublish(config, queue) publishThread.start() logger.info("MqttPublish started") -opcuaThread = OpcUaRequester(config, queue) -opcuaThread.start() -logger.info("OpcUaRequest started") +opcuaThreads = [] +for o in config['opcua']: + ot = OpcUaRequester(o, queue) + ot.start() + logger.info(f"OpcUaRequester thread for {o['name']} started") + opcuaThreads.append(ot) threading.excepthook = exceptHook logger.info("Threading excepthook set") @@ -49,12 +52,16 @@ logger.error("opcua2mqtt bridge is dying") publishThread.stop() logger.error("publishThread stopped") -opcuaThread.stop() -logger.error("opcuaThread stopped") + +for ot in opcuaThreads: + ot.stop() + logger.error(f"opcua thread {ot.name} stopped") publishThread.join() logger.error("publishThread joined") -opcuaThread.join() -logger.error("opcuaThread joined") + +for ot in opcuaThreads: + ot.join() + logger.error(f"opcua thread {ot.name} joined") logger.error("opcua2mqtt bridge is terminated")