13 Commits

Author SHA1 Message Date
93bbccf5c3 fix typo in configuration
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:32:32 +01:00
012bb46b2a fix configuration
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:27:28 +01:00
ae1828a06e fix use of modbus module
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:23:07 +01:00
51dec2b281 fix ci script
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:16:47 +01:00
4ea85f3c7e change config to env, add ci-script
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 14:52:36 +01:00
5cabef91ea termination mechanism integrated 2021-08-25 17:36:30 +02:00
78f001fd70 add option to disable analog event publishing 2021-08-25 11:58:35 +02:00
fc78dc7f4d documentation 2021-08-24 13:55:51 +02:00
1c8a3a2093 retain event messages 2021-08-24 13:33:52 +02:00
92aeb8efdf start command fixed 2021-08-24 10:12:31 +02:00
8120df60ac another type mismatch in config 2021-08-24 10:08:32 +02:00
3d97536b9b type mismatch in config 2021-08-24 10:06:13 +02:00
220d1e0ef9 forgotten module 2021-08-24 10:02:11 +02:00
12 changed files with 317 additions and 103 deletions

30
.woodpecker.yml Normal file
View File

@ -0,0 +1,30 @@
steps:
build:
image: plugins/kaniko
settings:
repo: gitea.hottis.de/wn/digitaltwin1
registry:
from_secret: container_registry
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
username:
from_secret: container_registry_username
password:
from_secret: container_registry_password
dockerfile: Dockerfile
when:
- event: [push, tag]
deploy:
image: portainer/kubectl-shell:latest
secrets:
- source: kube_config
target: KUBE_CONFIG_CONTENT
commands:
- export IMAGE_TAG=$CI_COMMIT_TAG
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
- export KUBECONFIG=/tmp/kubeconfig
- cat ./deployment/install-yml.tmpl | sed -e 's,%IMAGETAG%,'$IMAGE_TAG','g | kubectl apply -f -
when:
- event: tag

View File

@ -11,7 +11,8 @@ ARG CONF_DIR="${APP_DIR}/config"
RUN \ RUN \
apt update && \ apt update && \
pip3 install loguru && \ pip3 install loguru && \
pip3 install pymodbus pip3 install pymodbus && \
pip3 install paho-mqtt
RUN \ RUN \
mkdir -p ${APP_DIR} && \ mkdir -p ${APP_DIR} && \
@ -26,5 +27,5 @@ WORKDIR ${APP_DIR}
VOLUME ${CONF_DIR} VOLUME ${CONF_DIR}
CMD "python digitaltwin1.py -f ./config/config.ini" CMD [ "/usr/local/bin/python", "digitaltwin1.py", "-f", "./config/config.ini" ]

View File

@ -9,3 +9,4 @@ digitalInputTopicPrefix = dt1/di
analogInputEventTopicPrefix = dt1/ai/event analogInputEventTopicPrefix = dt1/ai/event
analogInputPeriodicTopicPrefix = dt1/ai/periodic analogInputPeriodicTopicPrefix = dt1/ai/periodic
analogInputPublishPeriod = 10.0 analogInputPublishPeriod = 10.0
disableAnalogInputEventPublishing = true

View File

@ -0,0 +1,44 @@
apiVersion: v1
kind: Namespace
metadata:
name: homea
---
apiVersion: v1
kind: ConfigMap
metadata:
name: digitaltwin1
namespace: homea
data:
MQTT__BROKER: "emqx01-anonymous-cluster-internal.broker.svc.cluster.local"
MQTT__DIGITALOUTPUTTOPICPREFIX: "dt1/coil"
MQTT__DIGITALINPUTTOPICPREFIX: "dt1/di"
MQTT__ANALOGINPUTEVENTTOPICPREFIX: "dt1/ai/event"
MQTT__ANALOGINPUTPERIODICTOPICPREFIX: "dt1/ai/periodic"
MQTT__ANALOGINPUTPUBLISHPERIOD: "60.0"
MQTT__DISABLEANALOGINPUTEVENTPUBLISHING: "true"
MODBUS__CLIENT: "172.16.2.31"
MODBUS__SCANRATE: "0.25"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: digitaltwin1
namespace: homea
labels:
app: digitaltwin1
spec:
replicas: 1
selector:
matchLabels:
app: digitaltwin1
template:
metadata:
labels:
app: digitaltwin1
spec:
containers:
- name: digitaltwin1
image: gitea.hottis.de/wn/digitaltwin1:%IMAGETAG%
envFrom:
- configMapRef:
name: digitaltwin1

55
readme.md Normal file
View File

@ -0,0 +1,55 @@
# DigitalTwin1
## Digital twin for Beckhoff BK-9000 Modbus-TCP coupler (and compatible Wago device)
### Overview
This tool connects to a Modbus-TCP coupler, reads it process image and publishes
* the new state of a discrete input register (digital input) after a change
* the new value of an input register (analog input) after a change
* periodically publishes the current values of all input registers (analog inputs)
using MQTT messages. The topic is built from a configured prefix and the index of the particular registers.
Furthermore it subscribes to MQTT messages related to the coils for the setup and anytime a message is received, it set the particular coils accordingly.
### Configuration
Configuration is done using a configuration file
[modbus]
client = 172.16.2.157
scanrate = 0.25
[mqtt]
broker = 172.16.2.16
digitalOutputTopicPrefix = dt1/coil
digitalInputTopicPrefix = dt1/di
analogInputEventTopicPrefix = dt1/ai/event
analogInputPeriodicTopicPrefix = dt1/ai/periodic
analogInputPublishPeriod = 10.0
disableAnalogInputEventPublishing = true
### Operation details
While the input and discrete input registers are scanned with the configured scanrate, an action on a coil according to a received message is issued immediately. At the same time the all input registers are scanned.
Using a configuration option it is possible to disable the publishing of analog change event - if only periodic information (like for a thermometer or so) is required.
The MQTT messages related to change events of input or discrete input registers are marked as //retained//.
### Usage
The tool is simply started using
python digitaltwin1.py
On default it searches the configuration file at ``$PWD/config``. This can be changed using the commandline argument ``-f``.
The Python modules ``pymodbus``, ``loguru`` and ``paho-mqtt`` are required.
A docker image with the required dependencies, the tool and a prepared default configuration in a volume is provided at ``registry.hottis.de/wolutator/digitaltwin1``.

View File

@ -22,6 +22,10 @@ class AbstractMqttPublisher(threading.Thread):
self.client = mqtt.Client(userdata=self) self.client = mqtt.Client(userdata=self)
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
def run(self): def run(self):
self.client.on_message = mqttOnMessageCallback self.client.on_message = mqttOnMessageCallback
self.client.on_connect = mqttOnConnectCallback self.client.on_connect = mqttOnConnectCallback
@ -39,6 +43,18 @@ class AbstractMqttPublisher(threading.Thread):
def localLoop(self): def localLoop(self):
raise NotImplementedError() raise NotImplementedError()
def stop(self):
self.client.loop_stop()
logger.info("mqtt loop stopped")
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
with self.processImage:
self.processImage.notify()
logger.info("kill events triggered")
def onConnect(self): def onConnect(self):
logger.info("mqtt connected") logger.info("mqtt connected")

View File

@ -1,83 +1,90 @@
from pymodbus.client.sync import ModbusTcpClient as ModbusClient from pymodbus.client import ModbusTcpClient as ModbusClient
from pymodbus.exceptions import ModbusIOException from pymodbus.exceptions import ModbusIOException
from time import sleep from time import sleep
import threading import threading
from loguru import logger from loguru import logger
def modbusStart(config, processImage):
modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage]) class ModbusHandler(threading.Thread):
modbusThread.start() def __init__(self, config, processImage):
super().__init__()
self.config = config["modbus"]
self.processImage = processImage
self.killBill = False
def stop(self):
self.killBill = True
def run(self):
modbusClient = self.config["client"]
modbusRefreshPeriod = float(self.config["scanrate"])
client = ModbusClient(modbusClient)
def modbusHandler(config, processImage): try:
modbusClient = config["modbus"]["client"] client.connect()
modbusRefreshPeriod = config["modbus"]["scanrate"]
res = client.read_holding_registers(0x1010, 4)
if isinstance(res, ModbusIOException):
raise Exception(self.processImage)
client = ModbusClient(modbusClient) if len(res.registers) != 4:
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
try: (analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
client.connect() logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
res = client.read_holding_registers(0x1010, 4) self.processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
if isinstance(res, ModbusIOException):
raise Exception(processImage)
if len(res.registers) != 4: reg = client.read_coils(0, digitalOutputBits)
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers))) if isinstance(reg, ModbusIOException):
raise Exception(reg)
with self.processImage:
self.processImage.setCoils(reg.bits)
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers while not self.killBill:
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") try:
if not client.is_socket_open():
client.connect()
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits) reg = client.read_input_registers(0, analogInputBits // 8)
reg = client.read_coils(0, digitalOutputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
with processImage:
processImage.setCoils(reg.bits)
while True:
try:
if not client.is_socket_open():
client.connect()
reg = client.read_input_registers(0, analogInputBits // 8)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
analogInputs = reg.registers
reg = client.read_discrete_inputs(0, digitalInputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
discreteInputs = reg.bits
coils = None
with processImage:
processImage.setAnalogsInputs(analogInputs)
processImage.setDiscreteInputs(discreteInputs)
if processImage.hasPendingInputChanges():
processImage.notify()
if processImage.hasPendingOutputChanges():
coils = processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
if isinstance(reg, ModbusIOException): if isinstance(reg, ModbusIOException):
raise Exception(reg) raise Exception(reg)
analogInputs = reg.registers
except Exception as e: reg = client.read_discrete_inputs(0, digitalInputBits)
logger.error("Exception in inner modbus handler loop: {}".format(e)) if isinstance(reg, ModbusIOException):
client.close() raise Exception(reg)
finally: discreteInputs = reg.bits
#sleep(modbusRefreshPeriod)
if (processImage.coilEvent.wait(timeout=modbusRefreshPeriod)): coils = None
processImage.coilEvent.clear() with self.processImage:
self.processImage.setAnalogsInputs(analogInputs)
self.processImage.setDiscreteInputs(discreteInputs)
if self.processImage.hasPendingInputChanges():
self.processImage.notify()
if self.processImage.hasPendingOutputChanges():
coils = self.processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
except Exception as e:
logger.error("Exception in inner modbus handler loop: {}".format(e))
client.close()
finally:
#sleep(modbusRefreshPeriod)
if (self.processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
self.processImage.coilEvent.clear()
except Exception as e: except Exception as e:
logger.error("Exception in modbus handler: {}".format(e)) logger.error("Exception in modbus handler: {}".format(e))
finally: finally:
client.close() client.close()

View File

@ -14,8 +14,8 @@ class MqttCoilSubscriber(AbstractMqttPublisher):
super().__init__(config, processImage) super().__init__(config, processImage)
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
sleep(self.config["analogInputPublishPeriod"]) sleep(float(self.config["analogInputPublishPeriod"]))
def onMessage(self, topic, payload): def onMessage(self, topic, payload):
logger.warning("mqtt message received: {} -> {}".format(topic, str(payload))) logger.warning("mqtt message received: {} -> {}".format(topic, str(payload)))

View File

@ -12,14 +12,18 @@ def mqttEventPublisherStart(config, processImage):
class MqttEventPublisher(AbstractMqttPublisher): class MqttEventPublisher(AbstractMqttPublisher):
def __init__(self, config, processImage): def __init__(self, config, processImage):
super().__init__(config, processImage) super().__init__(config, processImage)
self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ]
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
with self.processImage: with self.processImage:
self.processImage.wait() self.processImage.wait()
if self.killBill:
continue
discreteInputChangeset = self.processImage.getChangedDiscreteInputs() discreteInputChangeset = self.processImage.getChangedDiscreteInputs()
analogInputChangeset = self.processImage.getChangedAnalogsInputs() if not self.disableAnalogInputEventPublishing:
analogInputChangeset = self.processImage.getChangedAnalogsInputs()
for discreteInputChangeItem in discreteInputChangeset: for discreteInputChangeItem in discreteInputChangeset:
logger.debug("Discrete input {} changed from {} to {}" logger.debug("Discrete input {} changed from {} to {}"
@ -27,15 +31,18 @@ class MqttEventPublisher(AbstractMqttPublisher):
discreteInputChangeItem[1][1], discreteInputChangeItem[1][1],
discreteInputChangeItem[1][0])) discreteInputChangeItem[1][0]))
self.client.publish("{}/{}".format(self.config["digitalInputTopicPrefix"], str(discreteInputChangeItem[0])), self.client.publish("{}/{}".format(self.config["digitalInputTopicPrefix"], str(discreteInputChangeItem[0])),
str(discreteInputChangeItem[1][0])) str(discreteInputChangeItem[1][0]),
retain=True)
for analogInputChangeItem in analogInputChangeset: if not self.disableAnalogInputEventPublishing:
logger.debug("Analog input {} changed from {} to {}" for analogInputChangeItem in analogInputChangeset:
.format(analogInputChangeItem[0], logger.debug("Analog input {} changed from {} to {}"
analogInputChangeItem[1][1], .format(analogInputChangeItem[0],
analogInputChangeItem[1][0])) analogInputChangeItem[1][1],
analogInputChangeItem[1][0]))
self.client.publish("{}/{}".format(self.config["analogInputEventTopicPrefix"], str(analogInputChangeItem[0])),
str(analogInputChangeItem[1][0])) self.client.publish("{}/{}".format(self.config["analogInputEventTopicPrefix"], str(analogInputChangeItem[0])),
str(analogInputChangeItem[1][0]),
retain=True)

View File

@ -1,8 +1,6 @@
import paho.mqtt.client as mqtt from threading import Event
import threading
from loguru import logger from loguru import logger
from AbstractMqttHandler import AbstractMqttPublisher from AbstractMqttHandler import AbstractMqttPublisher
from time import sleep
def mqttPeriodPublisherStart(config, processImage): def mqttPeriodPublisherStart(config, processImage):
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage) mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
@ -14,7 +12,7 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
super().__init__(config, processImage) super().__init__(config, processImage)
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
with self.processImage: with self.processImage:
if not self.processImage.isInitialized(): if not self.processImage.isInitialized():
continue continue
@ -28,5 +26,5 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])), self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])),
str(analogInputItem[1])) str(analogInputItem[1]))
sleep(self.config["analogInputPublishPeriod"]) self.killEvent.wait(timeout=float(self.config["analogInputPublishPeriod"]))

30
src/config.py Normal file
View File

@ -0,0 +1,30 @@
import os
from loguru import logger
class Config:
OPTIONS = {
'mqtt': [ 'broker',
'digitalOutputTopicPrefix',
'digitalInputTopicPrefix',
'analogInputEventTopicPrefix',
'analogInputPeriodicTopicPrefix',
'analogInputPublishPeriod',
'disableAnalogInputEventPublishing' ],
'modbus': [ 'client', 'scanrate' ]
}
def __init__(self):
self.values = {}
for section, keys in Config.OPTIONS.items():
self.values[section] = {}
for key in keys:
varname = f"{section}__{key}".upper()
try:
self.values[section][key] = os.environ[varname]
logger.info(f"Config: {section} {key} -> {self.values[section][key]}")
except KeyError:
pass
def __getitem__(self, section):
return self.values[section]

View File

@ -1,43 +1,68 @@
from loguru import logger from loguru import logger
import argparse import argparse
import configparser import configparser
import threading
from ProcessImage import ProcessImage from ProcessImage import ProcessImage
from ModbusHandler import modbusStart from ModbusHandler import ModbusHandler
from MqttEventPublisher import mqttEventPublisherStart from MqttEventPublisher import MqttEventPublisher, mqttEventPublisherStart
from MqttPeriodPublisher import mqttPeriodPublisherStart from MqttPeriodPublisher import MqttPeriodPublisher
from MqttCoilSubscriber import mqttCoilSubscriberStart from MqttCoilSubscriber import MqttCoilSubscriber
from config import Config
deathBell = threading.Event()
def exceptHook(args):
global deathBell
logger.error("Exception in thread caught: {}".format(args))
deathBell.set()
logger.error("rang the death bell")
logger.info("DigitalTwin1 starting") logger.info("DigitalTwin1 starting")
parser = argparse.ArgumentParser(description="DigitalTwin1") config = Config()
parser.add_argument('--config', '-f',
help='Config file, default is $pwd/config/config.ini',
required=False,
default='./config/config.ini')
args = parser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
processImage = ProcessImage() processImage = ProcessImage()
modbusStart(config, processImage) modbusHandlerThread = ModbusHandler(config, processImage)
modbusHandlerThread.start()
logger.info("Modbus handler running") logger.info("Modbus handler running")
mqttEventPublisherStart(config, processImage) mqttEventPublisherThread = MqttEventPublisher(config, processImage)
mqttEventPublisherThread.start()
logger.info("MQTT event publisher running") logger.info("MQTT event publisher running")
mqttPeriodPublisherStart(config, processImage) mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
mqttPeriodPublisherThread.start()
logger.info("MQTT period publisher running") logger.info("MQTT period publisher running")
mqttCoilSubscriberStart(config, processImage) mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage)
mqttCoilSubscriberThread.start()
logger.info("MQTT coil subscriber running") logger.info("MQTT coil subscriber running")
threading.excepthook = exceptHook
logger.info("Threading excepthook set")
logger.info("DigitalTwin1 running") logger.info("DigitalTwin1 running")
deathBell.wait()
logger.error("DigitalTwin1 is dying")
modbusHandlerThread.stop()
mqttEventPublisherThread.stop()
mqttPeriodPublisherThread.stop()
mqttCoilSubscriberThread.stop()
modbusHandlerThread.join()
logger.error("modbusHandlerThread joined")
mqttEventPublisherThread.join()
logger.error("mqttEventPublisherThread joined")
mqttPeriodPublisherThread.join()
logger.error("mqttPeriodPublisherThread joined")
mqttCoilSubscriberThread.join()
logger.error("mqttCoilSubscriberThread joined")
logger.error("DigitalTwin1 to terminate")