From 41c31249cff9d62ac305ea62cf19d1217d13a3f3 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 19:46:15 +0100 Subject: [PATCH 01/15] changes --- .gitignore | 1 + opcua2mqtt/DataObject.py | 24 ++++++++++++ opcua2mqtt/MqttBase.py | 75 ++++++++++++++++++++++++++++++++++++ opcua2mqtt/MqttPublish.py | 30 +++++++++++++++ opcua2mqtt/OpcUaRequester.py | 43 +++++++++++++++++++++ opcua2mqtt/bridge.py | 60 +++++++++++++++++++++++++++++ opcua2mqtt/config.json | 22 +++++++++++ 7 files changed, 255 insertions(+) create mode 100644 .gitignore create mode 100644 opcua2mqtt/DataObject.py create mode 100644 opcua2mqtt/MqttBase.py create mode 100644 opcua2mqtt/MqttPublish.py create mode 100644 opcua2mqtt/OpcUaRequester.py create mode 100644 opcua2mqtt/bridge.py create mode 100644 opcua2mqtt/config.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c01bd8c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*/__pycache__/ diff --git a/opcua2mqtt/DataObject.py b/opcua2mqtt/DataObject.py new file mode 100644 index 0000000..0653afd --- /dev/null +++ b/opcua2mqtt/DataObject.py @@ -0,0 +1,24 @@ +import re + + +class InvalidDataObjectException(Exception): + def __init__(self, message): + super().__init__(message) + +class DataObject(object): + invalidChars = re.compile("[#+/\s]") + + def __init__(self, serverName, nameSpaceIndex, variableName, value): + self.serverName = serverName + self.nameSpaceIndex = nameSpaceIndex + self.variableName = variableName + self.value = value + + def isValid(self): + return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int) + + def getTopicPost(self): + return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName + + def getPayload(self): + return self.value \ No newline at end of file diff --git a/opcua2mqtt/MqttBase.py b/opcua2mqtt/MqttBase.py new file mode 100644 index 0000000..6033846 --- /dev/null +++ b/opcua2mqtt/MqttBase.py @@ -0,0 +1,75 @@ +import paho.mqtt.client as mqtt +from loguru import logger +import threading +import ssl + + + +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + + +class AbstractMqttPublisher(threading.Thread): + def __init__(self, config): + super().__init__() + + self.config = config["mqtt"] + + self.client = mqtt.Client(userdata=self) + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + def run(self): + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + + if ("login" in self.config) and ("password" in self.config): + self.client.username_pw_set(self.config["login"], self.config["password"]) + + if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config): + self.client.tls_set( + ca_certs=self.config["ca"], + certfile=self.config["cert"], + keyfile=self.config["key"], + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLSv1_2, + ciphers=None # this does not mean "no cipher" but it means "default ciphers" + ) + + self.client.connect(self.config["broker"], int(self.config["port"])) + self.client.loop_start() + logger.info("mqtt loop started") + + self.localLoop() + + def localLoop(self): + raise NotImplementedError() + + def stop(self): + self.client.loop_stop() + logger.info("mqtt loop stopped") + + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + + def onConnect(self): + logger.info("mqtt connected") + + def onDisconnect(self, rc): + logger.warning("mqtt disconnect, rc: {}".format(rc)) + + def onMessage(self, topic, payload): + logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload))) + diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py new file mode 100644 index 0000000..cbb2f33 --- /dev/null +++ b/opcua2mqtt/MqttPublish.py @@ -0,0 +1,30 @@ +from threading import Event +from loguru import logger +from MqttBase import AbstractMqttPublisher +from queue import Empty + + +LOOP_SLICE = 0.1 # seconds + +class MqttPublish(AbstractMqttPublisher): + def __init__(self, config, queue): + super().__init__(config) + self.queue = queue + self.topicPre = self.config["publishTopicPrefix"] + + def localLoop(self): + while not self.killBill: + try: + dataObject = self.queue.get(timeout=LOOP_SLICE) + if not dataObject.isValid(): + logger.error("invalid dataObject received: drop it") + else: + topic = self.topicPre + '/' + dataObject.getTopicPost() + payload = dataObject.getPayload() + self.client.publish(topic, payload) + logger.info("mqtt message sent: {} -> {}".format(topic, payload)) + except Empty: + if self.killBill: + logger.error("killbill received") + break + diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py new file mode 100644 index 0000000..703d50b --- /dev/null +++ b/opcua2mqtt/OpcUaRequester.py @@ -0,0 +1,43 @@ +import asyncio +from asyncua import Client +import threading +from loguru import logger +from DataObject import DataObject + + +class OpcUaRequester(threading.Thread): + def __init__(self, config, name, queue): + super().__init__() + + self.config = config["opcua"][name] + self.queue = queue + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + + async def opcUaRequesterInnerLoop(self): + while True: + async with Client(url=URL, timeout=10.0) as client: + for nodeSpec in NODE_SPECS: + node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") + value = await node.read_value() + displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text + print(f"{displayName=} = {value=}") + self.queue.put(DataObject(SERVER, nodeSpec['ns'], displayName, value)) + await asyncio.sleep(DELAY) + + def run(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.opcUaRequesterInnerLoop()) + loop.close() + + def stop(self): + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py new file mode 100644 index 0000000..87854fc --- /dev/null +++ b/opcua2mqtt/bridge.py @@ -0,0 +1,60 @@ +from MqttPublish import MqttPublish +from OpcUaRequester import OpcUaRequester +from loguru import logger +import argparse +import configparser +import threading +import queue + +deathBell = threading.Event() + +def exceptHook(args): + global deathBell + logger.error("Exception in thread caught: {}".format(args)) + deathBell.set() + logger.error("rang the death bell") + + +logger.info("opcua2mqtt bridge starting") + +parser = argparse.ArgumentParser(description="example1") +parser.add_argument('--config', '-f', + help='Config file, default is $pwd/config.json', + required=False, + default='./config.json') +args = parser.parse_args() + +config = configparser.ConfigParser() +config.read(args.config) + + +queue = queue.Queue() + +publishThread = MqttPublish(config, queue) +publishThread.start() +logger.info("MqttPublish started") + +opcuaThread = OpcUaRequester(config, queue) +opcuaThread.start() +logger.info("OpcUaRequest started") + +threading.excepthook = exceptHook +logger.info("Threading excepthook set") + +logger.info("opcua2mqtt bridge is running") + + +deathBell.wait() +logger.error("opcua2mqtt bridge is dying") + +publishThread.stop() +logger.error("publishThread stopped") +opcuaThread.stop() +logger.error("opcuaThread stopped") + +publishThread.join() +logger.error("publishThread joined") +opcuaThread.join() +logger.error("opcuaThread joined") + +logger.error("opcua2mqtt bridge is terminated") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json new file mode 100644 index 0000000..967d4dd --- /dev/null +++ b/opcua2mqtt/config.json @@ -0,0 +1,22 @@ +{ + "mqtt": { + "broker": "172.16.2.16", + "port": 1883, + "publishTopicPrefix": "opcua" + }, + "opcua": [ + { + "nodes": [ + { "ns": 0, "n": "i=345", "d": "pv" }, + { "ns": 0, "n": "i=348", "d": "sv" }, + { "ns": 0, "n": "i=351", "d": "tv" }, + { "ns": 0, "n": "i=354", "d": "qv" } + ], + "url": "opc.tcp://172.16.3.60:4840", + "name": "apl", + "delay": 1.0 + } + ] +} + + From 51bc396c3913366a1a54fdcd03bcd6fd0fc8ef20 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 20:25:32 +0100 Subject: [PATCH 02/15] done so far --- opcua2mqtt/OpcUaRequester.py | 22 ++++++++++++++-------- opcua2mqtt/bridge.py | 27 +++++++++++++++++---------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index 703d50b..665aa2d 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -6,27 +6,33 @@ from DataObject import DataObject class OpcUaRequester(threading.Thread): - def __init__(self, config, name, queue): + def __init__(self, config, queue): super().__init__() - self.config = config["opcua"][name] + self.config = config self.queue = queue + self.name = self.config['name'] + self.url = self.config['url'] + self.nodes = self.config['nodes'] + self.delay = self.config['delay'] + # consider this flag in the localLoop self.killBill = False self.killEvent = threading.Event() async def opcUaRequesterInnerLoop(self): - while True: - async with Client(url=URL, timeout=10.0) as client: - for nodeSpec in NODE_SPECS: + while not self.killBill: + async with Client(url=self.url, timeout=10.0) as client: + for nodeSpec in self.nodes: + logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") value = await node.read_value() displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text - print(f"{displayName=} = {value=}") - self.queue.put(DataObject(SERVER, nodeSpec['ns'], displayName, value)) - await asyncio.sleep(DELAY) + logger.debug(f"Got: {displayName=} = {value=}") + self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) + await asyncio.sleep(self.delay) def run(self): loop = asyncio.new_event_loop() diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index 87854fc..6370900 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -2,7 +2,7 @@ from MqttPublish import MqttPublish from OpcUaRequester import OpcUaRequester from loguru import logger import argparse -import configparser +import json import threading import queue @@ -24,8 +24,8 @@ parser.add_argument('--config', '-f', default='./config.json') args = parser.parse_args() -config = configparser.ConfigParser() -config.read(args.config) +with open(args.config) as f: + config = json.load(f) queue = queue.Queue() @@ -34,9 +34,12 @@ publishThread = MqttPublish(config, queue) publishThread.start() logger.info("MqttPublish started") -opcuaThread = OpcUaRequester(config, queue) -opcuaThread.start() -logger.info("OpcUaRequest started") +opcuaThreads = [] +for o in config['opcua']: + ot = OpcUaRequester(o, queue) + ot.start() + logger.info(f"OpcUaRequester thread for {o['name']} started") + opcuaThreads.append(ot) threading.excepthook = exceptHook logger.info("Threading excepthook set") @@ -49,12 +52,16 @@ logger.error("opcua2mqtt bridge is dying") publishThread.stop() logger.error("publishThread stopped") -opcuaThread.stop() -logger.error("opcuaThread stopped") + +for ot in opcuaThreads: + ot.stop() + logger.error(f"opcua thread {ot.name} stopped") publishThread.join() logger.error("publishThread joined") -opcuaThread.join() -logger.error("opcuaThread joined") + +for ot in opcuaThreads: + ot.join() + logger.error(f"opcua thread {ot.name} joined") logger.error("opcua2mqtt bridge is terminated") From 8e0a525bebebed812eb0e227ebf3909cb780d337 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 22:13:19 +0100 Subject: [PATCH 03/15] debug --- opcua2mqtt/MqttPublish.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index cbb2f33..da4b46d 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -22,7 +22,7 @@ class MqttPublish(AbstractMqttPublisher): topic = self.topicPre + '/' + dataObject.getTopicPost() payload = dataObject.getPayload() self.client.publish(topic, payload) - logger.info("mqtt message sent: {} -> {}".format(topic, payload)) + logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) except Empty: if self.killBill: logger.error("killbill received") From 8a32e05673f423074d66ef967b32e97beb330dca Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 22:28:36 +0100 Subject: [PATCH 04/15] sigint handler added --- opcua2mqtt/bridge.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index 6370900..a941130 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -5,6 +5,8 @@ import argparse import json import threading import queue +import signal + deathBell = threading.Event() @@ -14,6 +16,12 @@ def exceptHook(args): deathBell.set() logger.error("rang the death bell") +def terminateHook(sig, frame): + global deathBell + logger.error("SIGINT received") + deathBell.set() + logger.error("rang the death bell") + logger.info("opcua2mqtt bridge starting") @@ -44,6 +52,9 @@ for o in config['opcua']: threading.excepthook = exceptHook logger.info("Threading excepthook set") +signal.signal(signal.SIGINT, terminateHook) +logger.info("SIGINT handler set") + logger.info("opcua2mqtt bridge is running") From 91039460fd4b8334457e257a5edce8b1fc25db2d Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 22:30:48 +0100 Subject: [PATCH 05/15] enabled flag in opcua configuration --- opcua2mqtt/bridge.py | 2 ++ opcua2mqtt/config.json | 1 + 2 files changed, 3 insertions(+) diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index a941130..d6ee76f 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -44,6 +44,8 @@ logger.info("MqttPublish started") opcuaThreads = [] for o in config['opcua']: + if o['enabled'] != 'true': + continue ot = OpcUaRequester(o, queue) ot.start() logger.info(f"OpcUaRequester thread for {o['name']} started") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 967d4dd..c1168d0 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -6,6 +6,7 @@ }, "opcua": [ { + "enabled": "true", "nodes": [ { "ns": 0, "n": "i=345", "d": "pv" }, { "ns": 0, "n": "i=348", "d": "sv" }, From 34be24869a687cee3405a4f87b82a456f48ba5a2 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 22:45:13 +0100 Subject: [PATCH 06/15] add sh config --- opcua2mqtt/config.json | 46 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index c1168d0..5b62da6 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -7,15 +7,53 @@ "opcua": [ { "enabled": "true", + "url": "opc.tcp://172.16.3.60:4840", + "name": "apl", + "delay": 1.0, "nodes": [ { "ns": 0, "n": "i=345", "d": "pv" }, { "ns": 0, "n": "i=348", "d": "sv" }, { "ns": 0, "n": "i=351", "d": "tv" }, { "ns": 0, "n": "i=354", "d": "qv" } - ], - "url": "opc.tcp://172.16.3.60:4840", - "name": "apl", - "delay": 1.0 + ] + }, + { + "enabled": "false", + "url": "opc.tcp://192.168.254.5:4863", + "name": "sh", + "delay": 1.0, + "nodes": [ + { "ns": 1, "n": "s=A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, + { "ns": 1, "n": "s=A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, + { "ns": 1, "n": "s=A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, + { "ns": 1, "n": "s=A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, + { "ns": 1, "n": "s=A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, + { "ns": 1, "n": "s=A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, + { "ns": 1, "n": "s=A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, + { "ns": 1, "n": "s=A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, + { "ns": 1, "n": "s=A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, + { "ns": 1, "n": "s=A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, + { "ns": 1, "n": "s=A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, + { "ns": 1, "n": "s=A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, + { "ns": 1, "n": "s=A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, + { "ns": 1, "n": "s=A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, + { "ns": 1, "n": "s=A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, + { "ns": 1, "n": "s=A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, + { "ns": 1, "n": "s=A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, + { "ns": 1, "n": "s=A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, + { "ns": 1, "n": "s=A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, + { "ns": 1, "n": "s=A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, + { "ns": 1, "n": "s=A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, + { "ns": 1, "n": "s=A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, + { "ns": 1, "n": "s=A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, + { "ns": 1, "n": "s=A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, + { "ns": 1, "n": "s=A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, + { "ns": 1, "n": "s=A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, + { "ns": 1, "n": "s=A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, + { "ns": 1, "n": "s=A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, + { "ns": 1, "n": "s=A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, + { "ns": 1, "n": "s=A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } + ] } ] } From 0e5a22a481b517084f4648c374b1e4edbf9c48cc Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 22:56:20 +0100 Subject: [PATCH 07/15] improve error handling --- opcua2mqtt/OpcUaRequester.py | 26 ++++++++++++++++---------- opcua2mqtt/config.json | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index 665aa2d..27ec40e 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -1,5 +1,5 @@ import asyncio -from asyncua import Client +from asyncua import Client, ua import threading from loguru import logger from DataObject import DataObject @@ -24,15 +24,21 @@ class OpcUaRequester(threading.Thread): async def opcUaRequesterInnerLoop(self): while not self.killBill: - async with Client(url=self.url, timeout=10.0) as client: - for nodeSpec in self.nodes: - logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") - node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") - value = await node.read_value() - displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text - logger.debug(f"Got: {displayName=} = {value=}") - self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) - await asyncio.sleep(self.delay) + try: + async with Client(url=self.url, timeout=10.0) as client: + for nodeSpec in self.nodes: + try: + logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") + node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") + value = await node.read_value() + displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text + logger.debug(f"Got: {displayName=} = {value=}") + self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) + except ua.UaError as e: + logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") + await asyncio.sleep(self.delay) + except TimeoutError as e: + logger.error(f"Timeout in inner OPC-UA loop") def run(self): loop = asyncio.new_event_loop() diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 5b62da6..46eb227 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -14,7 +14,7 @@ { "ns": 0, "n": "i=345", "d": "pv" }, { "ns": 0, "n": "i=348", "d": "sv" }, { "ns": 0, "n": "i=351", "d": "tv" }, - { "ns": 0, "n": "i=354", "d": "qv" } + { "ns": 0, "n": "i=35400", "d": "qv" } ] }, { From 04b48f75a302775774e355f72ce4b4ee0dac654a Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:03:11 +0100 Subject: [PATCH 08/15] publish json data --- opcua2mqtt/DataObject.py | 10 ++++++++-- opcua2mqtt/OpcUaRequester.py | 3 ++- opcua2mqtt/config.json | 4 +++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/opcua2mqtt/DataObject.py b/opcua2mqtt/DataObject.py index 0653afd..b7628f2 100644 --- a/opcua2mqtt/DataObject.py +++ b/opcua2mqtt/DataObject.py @@ -1,5 +1,5 @@ import re - +import json class InvalidDataObjectException(Exception): def __init__(self, message): @@ -21,4 +21,10 @@ class DataObject(object): return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName def getPayload(self): - return self.value \ No newline at end of file + payload = { + "serverName": self.serverName, + "nameSpaceIndex": self.nameSpaceIndex, + "variableName": self.variableName, + "value": self.value + } + return json.dumps(payload) \ No newline at end of file diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index 27ec40e..d333652 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -16,6 +16,7 @@ class OpcUaRequester(threading.Thread): self.url = self.config['url'] self.nodes = self.config['nodes'] self.delay = self.config['delay'] + self.timeout = self.config['timeout'] # consider this flag in the localLoop self.killBill = False @@ -25,7 +26,7 @@ class OpcUaRequester(threading.Thread): async def opcUaRequesterInnerLoop(self): while not self.killBill: try: - async with Client(url=self.url, timeout=10.0) as client: + async with Client(url=self.url, timeout=self.timeout) as client: for nodeSpec in self.nodes: try: logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 46eb227..8b93b71 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -10,11 +10,12 @@ "url": "opc.tcp://172.16.3.60:4840", "name": "apl", "delay": 1.0, + "timeout": 1.0, "nodes": [ { "ns": 0, "n": "i=345", "d": "pv" }, { "ns": 0, "n": "i=348", "d": "sv" }, { "ns": 0, "n": "i=351", "d": "tv" }, - { "ns": 0, "n": "i=35400", "d": "qv" } + { "ns": 0, "n": "i=354", "d": "qv" } ] }, { @@ -22,6 +23,7 @@ "url": "opc.tcp://192.168.254.5:4863", "name": "sh", "delay": 1.0, + "timeout": 10.0, "nodes": [ { "ns": 1, "n": "s=A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, { "ns": 1, "n": "s=A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, From afaccc96d6ed75b73ddde1d20fdf8025671a92a3 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:06:43 +0100 Subject: [PATCH 09/15] more error handling --- opcua2mqtt/MqttPublish.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index da4b46d..3df8211 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -27,4 +27,6 @@ class MqttPublish(AbstractMqttPublisher): if self.killBill: logger.error("killbill received") break + except Exception as e: + logger.error(f"Exception {type(e)} received in MQTT local loop: {e}") From a403df5fadb312d27a5fb1b84e5d29e58d955897 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:08:53 +0100 Subject: [PATCH 10/15] help message fix --- opcua2mqtt/bridge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index d6ee76f..b8a7449 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -25,7 +25,7 @@ def terminateHook(sig, frame): logger.info("opcua2mqtt bridge starting") -parser = argparse.ArgumentParser(description="example1") +parser = argparse.ArgumentParser(description="opcua2mqtt") parser.add_argument('--config', '-f', help='Config file, default is $pwd/config.json', required=False, From bc28cc60246a8359a9a2f810b3ea247848afea5c Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:19:49 +0100 Subject: [PATCH 11/15] add status message on mqtt --- opcua2mqtt/MqttPublish.py | 8 ++++++++ opcua2mqtt/config.json | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index 3df8211..a77b07d 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -2,6 +2,7 @@ from threading import Event from loguru import logger from MqttBase import AbstractMqttPublisher from queue import Empty +import json LOOP_SLICE = 0.1 # seconds @@ -11,8 +12,11 @@ class MqttPublish(AbstractMqttPublisher): super().__init__(config) self.queue = queue self.topicPre = self.config["publishTopicPrefix"] + self.statusTopic = self.config["statusTopic"] + self.statusThreshold = self.config["statusThreshold"] def localLoop(self): + cnt = 0 while not self.killBill: try: dataObject = self.queue.get(timeout=LOOP_SLICE) @@ -22,7 +26,11 @@ class MqttPublish(AbstractMqttPublisher): topic = self.topicPre + '/' + dataObject.getTopicPost() payload = dataObject.getPayload() self.client.publish(topic, payload) + cnt += 1 logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) + if cnt % 100 == 0: + payload = { "count": cnt } + self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: if self.killBill: logger.error("killbill received") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 8b93b71..a5e0f07 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -2,7 +2,9 @@ "mqtt": { "broker": "172.16.2.16", "port": 1883, - "publishTopicPrefix": "opcua" + "publishTopicPrefix": "opcua", + "statusTopic": "opcua/bridge/status", + "statusThreshold": 100 }, "opcua": [ { From 4ec1d1441c9910a77ccc82fa41c261933c5a71d0 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:20:55 +0100 Subject: [PATCH 12/15] add status message on mqtt --- opcua2mqtt/MqttPublish.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index a77b07d..c03d976 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -28,7 +28,7 @@ class MqttPublish(AbstractMqttPublisher): self.client.publish(topic, payload) cnt += 1 logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) - if cnt % 100 == 0: + if cnt % 100 == 1: payload = { "count": cnt } self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: From 008713167de17caf9c44b0c9ddaadc86e862b6d4 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 23:29:57 +0100 Subject: [PATCH 13/15] uptime in status message --- opcua2mqtt/MqttPublish.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index c03d976..3714901 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -3,7 +3,7 @@ from loguru import logger from MqttBase import AbstractMqttPublisher from queue import Empty import json - +import datetime LOOP_SLICE = 0.1 # seconds @@ -17,6 +17,7 @@ class MqttPublish(AbstractMqttPublisher): def localLoop(self): cnt = 0 + startTime = datetime.datetime.now() while not self.killBill: try: dataObject = self.queue.get(timeout=LOOP_SLICE) @@ -29,7 +30,9 @@ class MqttPublish(AbstractMqttPublisher): cnt += 1 logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) if cnt % 100 == 1: - payload = { "count": cnt } + currentTime = datetime.datetime.now() + uptime = int((currentTime - startTime).total_seconds()) + payload = { "count": cnt, "uptime": uptime } self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: if self.killBill: From 1a321b471251717604ef352112d221e74e6a89d7 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 5 Feb 2022 11:08:49 +0100 Subject: [PATCH 14/15] error handling improved --- opcua2mqtt/MqttPublish.py | 5 ++--- opcua2mqtt/OpcUaRequester.py | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index 3714901..746383e 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -35,9 +35,8 @@ class MqttPublish(AbstractMqttPublisher): payload = { "count": cnt, "uptime": uptime } self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: - if self.killBill: - logger.error("killbill received") - break + # just evaluate the killBill at the top of the loop again + pass except Exception as e: logger.error(f"Exception {type(e)} received in MQTT local loop: {e}") diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index d333652..49dd62f 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -38,8 +38,10 @@ class OpcUaRequester(threading.Thread): except ua.UaError as e: logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") await asyncio.sleep(self.delay) - except TimeoutError as e: + except asyncio.exceptions.TimeoutError as e: logger.error(f"Timeout in inner OPC-UA loop") + except Exception as e: + logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") def run(self): loop = asyncio.new_event_loop() From 0e333c62e3e681ea3d50a1303aa9f9fb989e3187 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 7 Feb 2022 15:46:31 +0100 Subject: [PATCH 15/15] fixed config for sh --- opcua2mqtt/config.json | 64 +++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index a5e0f07..23d75d0 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -8,7 +8,7 @@ }, "opcua": [ { - "enabled": "true", + "enabled": "false", "url": "opc.tcp://172.16.3.60:4840", "name": "apl", "delay": 1.0, @@ -21,42 +21,42 @@ ] }, { - "enabled": "false", + "enabled": "true", "url": "opc.tcp://192.168.254.5:4863", "name": "sh", "delay": 1.0, "timeout": 10.0, "nodes": [ - { "ns": 1, "n": "s=A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, - { "ns": 1, "n": "s=A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, - { "ns": 1, "n": "s=A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, - { "ns": 1, "n": "s=A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, - { "ns": 1, "n": "s=A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, - { "ns": 1, "n": "s=A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, - { "ns": 1, "n": "s=A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, - { "ns": 1, "n": "s=A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, - { "ns": 1, "n": "s=A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, - { "ns": 1, "n": "s=A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, - { "ns": 1, "n": "s=A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, - { "ns": 1, "n": "s=A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, - { "ns": 1, "n": "s=A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, - { "ns": 1, "n": "s=A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, - { "ns": 1, "n": "s=A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, - { "ns": 1, "n": "s=A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, - { "ns": 1, "n": "s=A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, - { "ns": 1, "n": "s=A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, - { "ns": 1, "n": "s=A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, - { "ns": 1, "n": "s=A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, - { "ns": 1, "n": "s=A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, - { "ns": 1, "n": "s=A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, - { "ns": 1, "n": "s=A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, - { "ns": 1, "n": "s=A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, - { "ns": 1, "n": "s=A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, - { "ns": 1, "n": "s=A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, - { "ns": 1, "n": "s=A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, - { "ns": 1, "n": "s=A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, - { "ns": 1, "n": "s=A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, - { "ns": 1, "n": "s=A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } + { "ns": 1, "n": "s=t|SERVER::A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, + { "ns": 1, "n": "s=t|SERVER::A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, + { "ns": 1, "n": "s=t|SERVER::A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, + { "ns": 1, "n": "s=t|SERVER::A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, + { "ns": 1, "n": "s=t|SERVER::A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, + { "ns": 1, "n": "s=t|SERVER::A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, + { "ns": 1, "n": "s=t|SERVER::A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, + { "ns": 1, "n": "s=t|SERVER::A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } ] } ]