diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c01bd8c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*/__pycache__/ diff --git a/opcua2mqtt/DataObject.py b/opcua2mqtt/DataObject.py new file mode 100644 index 0000000..b7628f2 --- /dev/null +++ b/opcua2mqtt/DataObject.py @@ -0,0 +1,30 @@ +import re +import json + +class InvalidDataObjectException(Exception): + def __init__(self, message): + super().__init__(message) + +class DataObject(object): + invalidChars = re.compile("[#+/\s]") + + def __init__(self, serverName, nameSpaceIndex, variableName, value): + self.serverName = serverName + self.nameSpaceIndex = nameSpaceIndex + self.variableName = variableName + self.value = value + + def isValid(self): + return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int) + + def getTopicPost(self): + return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName + + def getPayload(self): + payload = { + "serverName": self.serverName, + "nameSpaceIndex": self.nameSpaceIndex, + "variableName": self.variableName, + "value": self.value + } + return json.dumps(payload) \ No newline at end of file diff --git a/opcua2mqtt/MqttBase.py b/opcua2mqtt/MqttBase.py new file mode 100644 index 0000000..6033846 --- /dev/null +++ b/opcua2mqtt/MqttBase.py @@ -0,0 +1,75 @@ +import paho.mqtt.client as mqtt +from loguru import logger +import threading +import ssl + + + +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + + +class AbstractMqttPublisher(threading.Thread): + def __init__(self, config): + super().__init__() + + self.config = config["mqtt"] + + self.client = mqtt.Client(userdata=self) + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + def run(self): + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + + if ("login" in self.config) and ("password" in self.config): + self.client.username_pw_set(self.config["login"], self.config["password"]) + + if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config): + self.client.tls_set( + ca_certs=self.config["ca"], + certfile=self.config["cert"], + keyfile=self.config["key"], + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLSv1_2, + ciphers=None # this does not mean "no cipher" but it means "default ciphers" + ) + + self.client.connect(self.config["broker"], int(self.config["port"])) + self.client.loop_start() + logger.info("mqtt loop started") + + self.localLoop() + + def localLoop(self): + raise NotImplementedError() + + def stop(self): + self.client.loop_stop() + logger.info("mqtt loop stopped") + + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + + def onConnect(self): + logger.info("mqtt connected") + + def onDisconnect(self, rc): + logger.warning("mqtt disconnect, rc: {}".format(rc)) + + def onMessage(self, topic, payload): + logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload))) + diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py new file mode 100644 index 0000000..746383e --- /dev/null +++ b/opcua2mqtt/MqttPublish.py @@ -0,0 +1,42 @@ +from threading import Event +from loguru import logger +from MqttBase import AbstractMqttPublisher +from queue import Empty +import json +import datetime + +LOOP_SLICE = 0.1 # seconds + +class MqttPublish(AbstractMqttPublisher): + def __init__(self, config, queue): + super().__init__(config) + self.queue = queue + self.topicPre = self.config["publishTopicPrefix"] + self.statusTopic = self.config["statusTopic"] + self.statusThreshold = self.config["statusThreshold"] + + def localLoop(self): + cnt = 0 + startTime = datetime.datetime.now() + while not self.killBill: + try: + dataObject = self.queue.get(timeout=LOOP_SLICE) + if not dataObject.isValid(): + logger.error("invalid dataObject received: drop it") + else: + topic = self.topicPre + '/' + dataObject.getTopicPost() + payload = dataObject.getPayload() + self.client.publish(topic, payload) + cnt += 1 + logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) + if cnt % 100 == 1: + currentTime = datetime.datetime.now() + uptime = int((currentTime - startTime).total_seconds()) + payload = { "count": cnt, "uptime": uptime } + self.client.publish(self.statusTopic, json.dumps(payload), retain=True) + except Empty: + # just evaluate the killBill at the top of the loop again + pass + except Exception as e: + logger.error(f"Exception {type(e)} received in MQTT local loop: {e}") + diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py new file mode 100644 index 0000000..49dd62f --- /dev/null +++ b/opcua2mqtt/OpcUaRequester.py @@ -0,0 +1,58 @@ +import asyncio +from asyncua import Client, ua +import threading +from loguru import logger +from DataObject import DataObject + + +class OpcUaRequester(threading.Thread): + def __init__(self, config, queue): + super().__init__() + + self.config = config + self.queue = queue + + self.name = self.config['name'] + self.url = self.config['url'] + self.nodes = self.config['nodes'] + self.delay = self.config['delay'] + self.timeout = self.config['timeout'] + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + + async def opcUaRequesterInnerLoop(self): + while not self.killBill: + try: + async with Client(url=self.url, timeout=self.timeout) as client: + for nodeSpec in self.nodes: + try: + logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") + node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") + value = await node.read_value() + displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text + logger.debug(f"Got: {displayName=} = {value=}") + self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) + except ua.UaError as e: + logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") + await asyncio.sleep(self.delay) + except asyncio.exceptions.TimeoutError as e: + logger.error(f"Timeout in inner OPC-UA loop") + except Exception as e: + logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") + + def run(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.opcUaRequesterInnerLoop()) + loop.close() + + def stop(self): + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py new file mode 100644 index 0000000..b8a7449 --- /dev/null +++ b/opcua2mqtt/bridge.py @@ -0,0 +1,80 @@ +from MqttPublish import MqttPublish +from OpcUaRequester import OpcUaRequester +from loguru import logger +import argparse +import json +import threading +import queue +import signal + + +deathBell = threading.Event() + +def exceptHook(args): + global deathBell + logger.error("Exception in thread caught: {}".format(args)) + deathBell.set() + logger.error("rang the death bell") + +def terminateHook(sig, frame): + global deathBell + logger.error("SIGINT received") + deathBell.set() + logger.error("rang the death bell") + + +logger.info("opcua2mqtt bridge starting") + +parser = argparse.ArgumentParser(description="opcua2mqtt") +parser.add_argument('--config', '-f', + help='Config file, default is $pwd/config.json', + required=False, + default='./config.json') +args = parser.parse_args() + +with open(args.config) as f: + config = json.load(f) + + +queue = queue.Queue() + +publishThread = MqttPublish(config, queue) +publishThread.start() +logger.info("MqttPublish started") + +opcuaThreads = [] +for o in config['opcua']: + if o['enabled'] != 'true': + continue + ot = OpcUaRequester(o, queue) + ot.start() + logger.info(f"OpcUaRequester thread for {o['name']} started") + opcuaThreads.append(ot) + +threading.excepthook = exceptHook +logger.info("Threading excepthook set") + +signal.signal(signal.SIGINT, terminateHook) +logger.info("SIGINT handler set") + +logger.info("opcua2mqtt bridge is running") + + +deathBell.wait() +logger.error("opcua2mqtt bridge is dying") + +publishThread.stop() +logger.error("publishThread stopped") + +for ot in opcuaThreads: + ot.stop() + logger.error(f"opcua thread {ot.name} stopped") + +publishThread.join() +logger.error("publishThread joined") + +for ot in opcuaThreads: + ot.join() + logger.error(f"opcua thread {ot.name} joined") + +logger.error("opcua2mqtt bridge is terminated") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json new file mode 100644 index 0000000..23d75d0 --- /dev/null +++ b/opcua2mqtt/config.json @@ -0,0 +1,65 @@ +{ + "mqtt": { + "broker": "172.16.2.16", + "port": 1883, + "publishTopicPrefix": "opcua", + "statusTopic": "opcua/bridge/status", + "statusThreshold": 100 + }, + "opcua": [ + { + "enabled": "false", + "url": "opc.tcp://172.16.3.60:4840", + "name": "apl", + "delay": 1.0, + "timeout": 1.0, + "nodes": [ + { "ns": 0, "n": "i=345", "d": "pv" }, + { "ns": 0, "n": "i=348", "d": "sv" }, + { "ns": 0, "n": "i=351", "d": "tv" }, + { "ns": 0, "n": "i=354", "d": "qv" } + ] + }, + { + "enabled": "true", + "url": "opc.tcp://192.168.254.5:4863", + "name": "sh", + "delay": 1.0, + "timeout": 10.0, + "nodes": [ + { "ns": 1, "n": "s=t|SERVER::A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, + { "ns": 1, "n": "s=t|SERVER::A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, + { "ns": 1, "n": "s=t|SERVER::A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, + { "ns": 1, "n": "s=t|SERVER::A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, + { "ns": 1, "n": "s=t|SERVER::A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, + { "ns": 1, "n": "s=t|SERVER::A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, + { "ns": 1, "n": "s=t|SERVER::A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, + { "ns": 1, "n": "s=t|SERVER::A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } + ] + } + ] +} + +