4 Commits

Author SHA1 Message Date
ae1828a06e fix use of modbus module
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:23:07 +01:00
51dec2b281 fix ci script
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 15:16:47 +01:00
4ea85f3c7e change config to env, add ci-script
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-01-11 14:52:36 +01:00
5cabef91ea termination mechanism integrated 2021-08-25 17:36:30 +02:00
9 changed files with 242 additions and 90 deletions

30
.woodpecker.yml Normal file
View File

@ -0,0 +1,30 @@
steps:
build:
image: plugins/kaniko
settings:
repo: gitea.hottis.de/wn/digitaltwin1
registry:
from_secret: container_registry
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
username:
from_secret: container_registry_username
password:
from_secret: container_registry_password
dockerfile: Dockerfile
when:
- event: [push, tag]
deploy:
image: portainer/kubectl-shell:latest
secrets:
- source: kube_config
target: KUBE_CONFIG_CONTENT
commands:
- export IMAGE_TAG=$CI_COMMIT_TAG
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
- export KUBECONFIG=/tmp/kubeconfig
- cat ./deployment/install-yml.tmpl | sed -e 's,%IMAGETAG%,'$IMAGE_TAG','g | kubectl apply -f -
when:
- event: tag

View File

@ -0,0 +1,44 @@
apiVersion: v1
kind: Namespace
metadata:
name: homea
---
apiVersion: v1
kind: ConfigMap
metadata:
name: digitaltwin1
namespace: homea
data:
MQTT__BROKER: "emqx01-anonymous-cluster-internal.broker.svc.cluster.local"
MQTT__DIGITALOUTPUTTOPICPREFIX: "dt1/coil"
MQTT__DIGITALINPUTTOPICPREFIX: "dt1/di"
MQTT__ANALOGINPUTEVENTTOPICPREFIX: "dt1/ai/event"
MQTT__ANALOGINPUTPERIODICTOPICPREFIX: "dt1/ai/periodic"
MQTT__ANALOGPUBLISHPERIOD: "10.0"
MQTT__DISABLEANALOGINPUTEVENTPUBLISHING: "true"
MODBUS__CLIENT: "172.16.2.157"
MODBUS__SCANRATE: "0.25"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: digitaltwin1
namespace: homea
labels:
app: digitaltwin1
spec:
replicas: 1
selector:
matchLabels:
app: digitaltwin1
template:
metadata:
labels:
app: digitaltwin1
spec:
containers:
- name: digitaltwin1
image: gitea.hottis.de/wn/digitaltwin1:%IMAGETAG%
envFrom:
- configMapRef:
name: digitaltwin1

View File

@ -22,6 +22,10 @@ class AbstractMqttPublisher(threading.Thread):
self.client = mqtt.Client(userdata=self)
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
def run(self):
self.client.on_message = mqttOnMessageCallback
self.client.on_connect = mqttOnConnectCallback
@ -39,6 +43,18 @@ class AbstractMqttPublisher(threading.Thread):
def localLoop(self):
raise NotImplementedError()
def stop(self):
self.client.loop_stop()
logger.info("mqtt loop stopped")
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
with self.processImage:
self.processImage.notify()
logger.info("kill events triggered")
def onConnect(self):
logger.info("mqtt connected")

View File

@ -1,83 +1,90 @@
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.client import ModbusTcpClient as ModbusClient
from pymodbus.exceptions import ModbusIOException
from time import sleep
import threading
from loguru import logger
def modbusStart(config, processImage):
modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage])
modbusThread.start()
class ModbusHandler(threading.Thread):
def __init__(self, config, processImage):
super().__init__()
self.config = config["modbus"]
self.processImage = processImage
self.killBill = False
def stop(self):
self.killBill = True
def run(self):
modbusClient = self.config["client"]
modbusRefreshPeriod = float(self.config["scanrate"])
client = ModbusClient(modbusClient)
def modbusHandler(config, processImage):
modbusClient = config["modbus"]["client"]
modbusRefreshPeriod = float(config["modbus"]["scanrate"])
try:
client.connect()
res = client.read_holding_registers(0x1010, 4)
if isinstance(res, ModbusIOException):
raise Exception(self.processImage)
client = ModbusClient(modbusClient)
if len(res.registers) != 4:
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
try:
client.connect()
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
res = client.read_holding_registers(0x1010, 4)
if isinstance(res, ModbusIOException):
raise Exception(processImage)
self.processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
if len(res.registers) != 4:
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
reg = client.read_coils(0, digitalOutputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
with self.processImage:
self.processImage.setCoils(reg.bits)
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
while not self.killBill:
try:
if not client.is_socket_open():
client.connect()
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
reg = client.read_coils(0, digitalOutputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
with processImage:
processImage.setCoils(reg.bits)
while True:
try:
if not client.is_socket_open():
client.connect()
reg = client.read_input_registers(0, analogInputBits // 8)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
analogInputs = reg.registers
reg = client.read_discrete_inputs(0, digitalInputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
discreteInputs = reg.bits
coils = None
with processImage:
processImage.setAnalogsInputs(analogInputs)
processImage.setDiscreteInputs(discreteInputs)
if processImage.hasPendingInputChanges():
processImage.notify()
if processImage.hasPendingOutputChanges():
coils = processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
reg = client.read_input_registers(0, analogInputBits // 8)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
analogInputs = reg.registers
except Exception as e:
logger.error("Exception in inner modbus handler loop: {}".format(e))
client.close()
finally:
#sleep(modbusRefreshPeriod)
if (processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
processImage.coilEvent.clear()
reg = client.read_discrete_inputs(0, digitalInputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
discreteInputs = reg.bits
coils = None
with self.processImage:
self.processImage.setAnalogsInputs(analogInputs)
self.processImage.setDiscreteInputs(discreteInputs)
if self.processImage.hasPendingInputChanges():
self.processImage.notify()
if self.processImage.hasPendingOutputChanges():
coils = self.processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
except Exception as e:
logger.error("Exception in inner modbus handler loop: {}".format(e))
client.close()
finally:
#sleep(modbusRefreshPeriod)
if (self.processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
self.processImage.coilEvent.clear()
except Exception as e:
logger.error("Exception in modbus handler: {}".format(e))
finally:
client.close()
except Exception as e:
logger.error("Exception in modbus handler: {}".format(e))
finally:
client.close()

View File

@ -14,7 +14,7 @@ class MqttCoilSubscriber(AbstractMqttPublisher):
super().__init__(config, processImage)
def localLoop(self):
while True:
while not self.killBill:
sleep(float(self.config["analogInputPublishPeriod"]))
def onMessage(self, topic, payload):

View File

@ -15,9 +15,11 @@ class MqttEventPublisher(AbstractMqttPublisher):
self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ]
def localLoop(self):
while True:
while not self.killBill:
with self.processImage:
self.processImage.wait()
if self.killBill:
continue
discreteInputChangeset = self.processImage.getChangedDiscreteInputs()
if not self.disableAnalogInputEventPublishing:

View File

@ -1,8 +1,6 @@
import paho.mqtt.client as mqtt
import threading
from threading import Event
from loguru import logger
from AbstractMqttHandler import AbstractMqttPublisher
from time import sleep
def mqttPeriodPublisherStart(config, processImage):
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
@ -14,7 +12,7 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
super().__init__(config, processImage)
def localLoop(self):
while True:
while not self.killBill:
with self.processImage:
if not self.processImage.isInitialized():
continue
@ -28,5 +26,5 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])),
str(analogInputItem[1]))
sleep(float(self.config["analogInputPublishPeriod"]))
self.killEvent.wait(timeout=float(self.config["analogInputPublishPeriod"]))

30
src/config.py Normal file
View File

@ -0,0 +1,30 @@
import os
from loguru import logger
class Config:
OPTIONS = {
'mqtt': [ 'broker',
'digitalInputTopicPrefix',
'digitalInputTopicPrefix',
'analogInputEventTopicPrefix',
'analogInputPeriodicTopicPrefix',
'analogInputPublishPeriod',
'disableAnalogInputEventPublishing' ],
'modbus': [ 'client', 'scanrate' ]
}
def __init__(self):
self.values = {}
for section, keys in Config.OPTIONS.items():
self.values[section] = {}
for key in keys:
varname = f"{section}__{key}".upper()
try:
self.values[section][key] = os.environ[varname]
logger.info(f"Config: {section} {key} -> {self.values[section][key]}")
except KeyError:
pass
def __getitem__(self, section):
return self.values[section]

View File

@ -1,43 +1,68 @@
from loguru import logger
import argparse
import configparser
import threading
from ProcessImage import ProcessImage
from ModbusHandler import modbusStart
from MqttEventPublisher import mqttEventPublisherStart
from MqttPeriodPublisher import mqttPeriodPublisherStart
from MqttCoilSubscriber import mqttCoilSubscriberStart
from ModbusHandler import ModbusHandler
from MqttEventPublisher import MqttEventPublisher, mqttEventPublisherStart
from MqttPeriodPublisher import MqttPeriodPublisher
from MqttCoilSubscriber import MqttCoilSubscriber
from config import Config
deathBell = threading.Event()
def exceptHook(args):
global deathBell
logger.error("Exception in thread caught: {}".format(args))
deathBell.set()
logger.error("rang the death bell")
logger.info("DigitalTwin1 starting")
parser = argparse.ArgumentParser(description="DigitalTwin1")
parser.add_argument('--config', '-f',
help='Config file, default is $pwd/config/config.ini',
required=False,
default='./config/config.ini')
args = parser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
config = Config()
processImage = ProcessImage()
modbusStart(config, processImage)
modbusHandlerThread = ModbusHandler(config, processImage)
modbusHandlerThread.start()
logger.info("Modbus handler running")
mqttEventPublisherStart(config, processImage)
mqttEventPublisherThread = MqttEventPublisher(config, processImage)
mqttEventPublisherThread.start()
logger.info("MQTT event publisher running")
mqttPeriodPublisherStart(config, processImage)
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
mqttPeriodPublisherThread.start()
logger.info("MQTT period publisher running")
mqttCoilSubscriberStart(config, processImage)
mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage)
mqttCoilSubscriberThread.start()
logger.info("MQTT coil subscriber running")
threading.excepthook = exceptHook
logger.info("Threading excepthook set")
logger.info("DigitalTwin1 running")
deathBell.wait()
logger.error("DigitalTwin1 is dying")
modbusHandlerThread.stop()
mqttEventPublisherThread.stop()
mqttPeriodPublisherThread.stop()
mqttCoilSubscriberThread.stop()
modbusHandlerThread.join()
logger.error("modbusHandlerThread joined")
mqttEventPublisherThread.join()
logger.error("mqttEventPublisherThread joined")
mqttPeriodPublisherThread.join()
logger.error("mqttPeriodPublisherThread joined")
mqttCoilSubscriberThread.join()
logger.error("mqttCoilSubscriberThread joined")
logger.error("DigitalTwin1 to terminate")