Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
4ea85f3c7e
|
|||
5cabef91ea
|
29
.woodpecker.yml
Normal file
29
.woodpecker.yml
Normal file
@ -0,0 +1,29 @@
|
||||
steps:
|
||||
build:
|
||||
image: plugins/kaniko
|
||||
settings:
|
||||
repo: gitea.hottis.de/wn/pv-controller
|
||||
registry:
|
||||
from_secret: container_registry
|
||||
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
|
||||
username:
|
||||
from_secret: container_registry_username
|
||||
password:
|
||||
from_secret: container_registry_password
|
||||
dockerfile: Dockerfile
|
||||
when:
|
||||
- event: [push, tag]
|
||||
|
||||
deploy:
|
||||
image: portainer/kubectl-shell:latest
|
||||
secrets:
|
||||
- source: kube_config
|
||||
target: KUBE_CONFIG_CONTENT
|
||||
commands:
|
||||
- export IMAGE_TAG=$CI_COMMIT_TAG
|
||||
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
|
||||
- export KUBECONFIG=/tmp/kubeconfig
|
||||
- cat ./deployment/install-yml.tmpl | sed -e 's,%IMAGETAG%,'$IMAGE_TAG','g | kubectl apply -f -
|
||||
when:
|
||||
- event: tag
|
||||
|
44
deployment/install-yml.tmpl
Normal file
44
deployment/install-yml.tmpl
Normal file
@ -0,0 +1,44 @@
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: homea
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: digitaltwin1
|
||||
namespace: homea
|
||||
data:
|
||||
MQTT__BROKER: "emqx01-anonymous-cluster-internal.broker.svc.cluster.local"
|
||||
MQTT__DIGITALOUTPUTTOPICPREFIX: "dt1/coil"
|
||||
MQTT__DIGITALINPUTTOPICPREFIX: "dt1/di"
|
||||
MQTT__ANALOGINPUTEVENTTOPICPREFIX: "dt1/ai/event"
|
||||
MQTT__ANALOGINPUTPERIODICTOPICPREFIX: "dt1/ai/periodic"
|
||||
MQTT__ANALOGPUBLISHPERIOD: "10.0"
|
||||
MQTT__DISABLEANALOGINPUTEVENTPUBLISHING: "true"
|
||||
MODBUS__CLIENT: "172.16.2.157"
|
||||
MODBUS__SCANRATE: "0.25"
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: digitaltwin1
|
||||
namespace: homea
|
||||
labels:
|
||||
app: digitaltwin1
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: digitaltwin1
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: digitaltwin1
|
||||
spec:
|
||||
containers:
|
||||
- name: digitaltwin1
|
||||
image: gitea.hottis.de/wn/digitaltwin1:%IMAGETAG%
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: digitaltwin1
|
@ -22,6 +22,10 @@ class AbstractMqttPublisher(threading.Thread):
|
||||
|
||||
self.client = mqtt.Client(userdata=self)
|
||||
|
||||
# consider this flag in the localLoop
|
||||
self.killBill = False
|
||||
self.killEvent = threading.Event()
|
||||
|
||||
def run(self):
|
||||
self.client.on_message = mqttOnMessageCallback
|
||||
self.client.on_connect = mqttOnConnectCallback
|
||||
@ -39,6 +43,18 @@ class AbstractMqttPublisher(threading.Thread):
|
||||
def localLoop(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def stop(self):
|
||||
self.client.loop_stop()
|
||||
logger.info("mqtt loop stopped")
|
||||
|
||||
self.killBill = True
|
||||
logger.info("kill flag set")
|
||||
|
||||
self.killEvent.set()
|
||||
with self.processImage:
|
||||
self.processImage.notify()
|
||||
logger.info("kill events triggered")
|
||||
|
||||
def onConnect(self):
|
||||
logger.info("mqtt connected")
|
||||
|
||||
|
@ -5,79 +5,86 @@ import threading
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def modbusStart(config, processImage):
|
||||
modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage])
|
||||
modbusThread.start()
|
||||
|
||||
class ModbusHandler(threading.Thread):
|
||||
def __init__(self, config, processImage):
|
||||
super().__init__()
|
||||
|
||||
self.config = config["modbus"]
|
||||
self.processImage = processImage
|
||||
self.killBill = False
|
||||
|
||||
def stop(self):
|
||||
self.killBill = True
|
||||
|
||||
def run(self):
|
||||
modbusClient = self.config["client"]
|
||||
modbusRefreshPeriod = float(self.config["scanrate"])
|
||||
|
||||
client = ModbusClient(modbusClient)
|
||||
|
||||
|
||||
def modbusHandler(config, processImage):
|
||||
modbusClient = config["modbus"]["client"]
|
||||
modbusRefreshPeriod = float(config["modbus"]["scanrate"])
|
||||
try:
|
||||
client.connect()
|
||||
|
||||
res = client.read_holding_registers(0x1010, 4)
|
||||
if isinstance(res, ModbusIOException):
|
||||
raise Exception(self.processImage)
|
||||
|
||||
client = ModbusClient(modbusClient)
|
||||
if len(res.registers) != 4:
|
||||
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
|
||||
|
||||
try:
|
||||
client.connect()
|
||||
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
|
||||
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
|
||||
|
||||
res = client.read_holding_registers(0x1010, 4)
|
||||
if isinstance(res, ModbusIOException):
|
||||
raise Exception(processImage)
|
||||
self.processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
|
||||
|
||||
if len(res.registers) != 4:
|
||||
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
|
||||
reg = client.read_coils(0, digitalOutputBits)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
with self.processImage:
|
||||
self.processImage.setCoils(reg.bits)
|
||||
|
||||
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
|
||||
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
|
||||
while not self.killBill:
|
||||
try:
|
||||
if not client.is_socket_open():
|
||||
client.connect()
|
||||
|
||||
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
|
||||
|
||||
reg = client.read_coils(0, digitalOutputBits)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
with processImage:
|
||||
processImage.setCoils(reg.bits)
|
||||
|
||||
while True:
|
||||
try:
|
||||
if not client.is_socket_open():
|
||||
client.connect()
|
||||
|
||||
reg = client.read_input_registers(0, analogInputBits // 8)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
analogInputs = reg.registers
|
||||
|
||||
reg = client.read_discrete_inputs(0, digitalInputBits)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
discreteInputs = reg.bits
|
||||
|
||||
coils = None
|
||||
with processImage:
|
||||
processImage.setAnalogsInputs(analogInputs)
|
||||
processImage.setDiscreteInputs(discreteInputs)
|
||||
if processImage.hasPendingInputChanges():
|
||||
processImage.notify()
|
||||
if processImage.hasPendingOutputChanges():
|
||||
coils = processImage.getCoils()
|
||||
|
||||
if coils:
|
||||
logger.debug("write coils to device: {}".format(coils))
|
||||
reg = client.write_coils(0, coils)
|
||||
reg = client.read_input_registers(0, analogInputBits // 8)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
analogInputs = reg.registers
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Exception in inner modbus handler loop: {}".format(e))
|
||||
client.close()
|
||||
finally:
|
||||
#sleep(modbusRefreshPeriod)
|
||||
if (processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
|
||||
processImage.coilEvent.clear()
|
||||
reg = client.read_discrete_inputs(0, digitalInputBits)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
discreteInputs = reg.bits
|
||||
|
||||
coils = None
|
||||
with self.processImage:
|
||||
self.processImage.setAnalogsInputs(analogInputs)
|
||||
self.processImage.setDiscreteInputs(discreteInputs)
|
||||
if self.processImage.hasPendingInputChanges():
|
||||
self.processImage.notify()
|
||||
if self.processImage.hasPendingOutputChanges():
|
||||
coils = self.processImage.getCoils()
|
||||
|
||||
if coils:
|
||||
logger.debug("write coils to device: {}".format(coils))
|
||||
reg = client.write_coils(0, coils)
|
||||
if isinstance(reg, ModbusIOException):
|
||||
raise Exception(reg)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Exception in inner modbus handler loop: {}".format(e))
|
||||
client.close()
|
||||
finally:
|
||||
#sleep(modbusRefreshPeriod)
|
||||
if (self.processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
|
||||
self.processImage.coilEvent.clear()
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Exception in modbus handler: {}".format(e))
|
||||
finally:
|
||||
client.close()
|
||||
except Exception as e:
|
||||
logger.error("Exception in modbus handler: {}".format(e))
|
||||
finally:
|
||||
client.close()
|
||||
|
@ -14,7 +14,7 @@ class MqttCoilSubscriber(AbstractMqttPublisher):
|
||||
super().__init__(config, processImage)
|
||||
|
||||
def localLoop(self):
|
||||
while True:
|
||||
while not self.killBill:
|
||||
sleep(float(self.config["analogInputPublishPeriod"]))
|
||||
|
||||
def onMessage(self, topic, payload):
|
||||
|
@ -15,9 +15,11 @@ class MqttEventPublisher(AbstractMqttPublisher):
|
||||
self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ]
|
||||
|
||||
def localLoop(self):
|
||||
while True:
|
||||
while not self.killBill:
|
||||
with self.processImage:
|
||||
self.processImage.wait()
|
||||
if self.killBill:
|
||||
continue
|
||||
|
||||
discreteInputChangeset = self.processImage.getChangedDiscreteInputs()
|
||||
if not self.disableAnalogInputEventPublishing:
|
||||
|
@ -1,8 +1,6 @@
|
||||
import paho.mqtt.client as mqtt
|
||||
import threading
|
||||
from threading import Event
|
||||
from loguru import logger
|
||||
from AbstractMqttHandler import AbstractMqttPublisher
|
||||
from time import sleep
|
||||
|
||||
def mqttPeriodPublisherStart(config, processImage):
|
||||
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
|
||||
@ -14,7 +12,7 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
|
||||
super().__init__(config, processImage)
|
||||
|
||||
def localLoop(self):
|
||||
while True:
|
||||
while not self.killBill:
|
||||
with self.processImage:
|
||||
if not self.processImage.isInitialized():
|
||||
continue
|
||||
@ -28,5 +26,5 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
|
||||
self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])),
|
||||
str(analogInputItem[1]))
|
||||
|
||||
sleep(float(self.config["analogInputPublishPeriod"]))
|
||||
self.killEvent.wait(timeout=float(self.config["analogInputPublishPeriod"]))
|
||||
|
||||
|
30
src/config.py
Normal file
30
src/config.py
Normal file
@ -0,0 +1,30 @@
|
||||
import os
|
||||
from loguru import logger
|
||||
|
||||
class Config:
|
||||
OPTIONS = {
|
||||
'mqtt': [ 'broker',
|
||||
'digitalInputTopicPrefix',
|
||||
'digitalInputTopicPrefix',
|
||||
'analogInputEventTopicPrefix',
|
||||
'analogInputPeriodicTopicPrefix',
|
||||
'analogInputPublishPeriod',
|
||||
'disableAnalogInputEventPublishing' ],
|
||||
'modbus': [ 'client', 'scanrate' ]
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.values = {}
|
||||
for section, keys in Config.OPTIONS.items():
|
||||
self.values[section] = {}
|
||||
for key in keys:
|
||||
varname = f"{section}__{key}".upper()
|
||||
try:
|
||||
self.values[section][key] = os.environ[varname]
|
||||
logger.info(f"Config: {section} {key} -> {self.values[section][key]}")
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def __getitem__(self, section):
|
||||
return self.values[section]
|
||||
|
@ -1,43 +1,68 @@
|
||||
from loguru import logger
|
||||
import argparse
|
||||
import configparser
|
||||
import threading
|
||||
|
||||
from ProcessImage import ProcessImage
|
||||
from ModbusHandler import modbusStart
|
||||
from MqttEventPublisher import mqttEventPublisherStart
|
||||
from MqttPeriodPublisher import mqttPeriodPublisherStart
|
||||
from MqttCoilSubscriber import mqttCoilSubscriberStart
|
||||
from ModbusHandler import ModbusHandler
|
||||
from MqttEventPublisher import MqttEventPublisher, mqttEventPublisherStart
|
||||
from MqttPeriodPublisher import MqttPeriodPublisher
|
||||
from MqttCoilSubscriber import MqttCoilSubscriber
|
||||
from config import Config
|
||||
|
||||
deathBell = threading.Event()
|
||||
|
||||
def exceptHook(args):
|
||||
global deathBell
|
||||
logger.error("Exception in thread caught: {}".format(args))
|
||||
deathBell.set()
|
||||
logger.error("rang the death bell")
|
||||
|
||||
|
||||
logger.info("DigitalTwin1 starting")
|
||||
|
||||
parser = argparse.ArgumentParser(description="DigitalTwin1")
|
||||
parser.add_argument('--config', '-f',
|
||||
help='Config file, default is $pwd/config/config.ini',
|
||||
required=False,
|
||||
default='./config/config.ini')
|
||||
args = parser.parse_args()
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
||||
|
||||
|
||||
|
||||
config = Config()
|
||||
|
||||
processImage = ProcessImage()
|
||||
|
||||
modbusStart(config, processImage)
|
||||
modbusHandlerThread = ModbusHandler(config, processImage)
|
||||
modbusHandlerThread.start()
|
||||
logger.info("Modbus handler running")
|
||||
|
||||
mqttEventPublisherStart(config, processImage)
|
||||
mqttEventPublisherThread = MqttEventPublisher(config, processImage)
|
||||
mqttEventPublisherThread.start()
|
||||
logger.info("MQTT event publisher running")
|
||||
|
||||
mqttPeriodPublisherStart(config, processImage)
|
||||
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
|
||||
mqttPeriodPublisherThread.start()
|
||||
logger.info("MQTT period publisher running")
|
||||
|
||||
mqttCoilSubscriberStart(config, processImage)
|
||||
mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage)
|
||||
mqttCoilSubscriberThread.start()
|
||||
logger.info("MQTT coil subscriber running")
|
||||
|
||||
threading.excepthook = exceptHook
|
||||
logger.info("Threading excepthook set")
|
||||
|
||||
logger.info("DigitalTwin1 running")
|
||||
|
||||
|
||||
deathBell.wait()
|
||||
logger.error("DigitalTwin1 is dying")
|
||||
|
||||
modbusHandlerThread.stop()
|
||||
mqttEventPublisherThread.stop()
|
||||
mqttPeriodPublisherThread.stop()
|
||||
mqttCoilSubscriberThread.stop()
|
||||
|
||||
modbusHandlerThread.join()
|
||||
logger.error("modbusHandlerThread joined")
|
||||
mqttEventPublisherThread.join()
|
||||
logger.error("mqttEventPublisherThread joined")
|
||||
mqttPeriodPublisherThread.join()
|
||||
logger.error("mqttPeriodPublisherThread joined")
|
||||
mqttCoilSubscriberThread.join()
|
||||
logger.error("mqttCoilSubscriberThread joined")
|
||||
|
||||
logger.error("DigitalTwin1 to terminate")
|
||||
|
||||
|
Reference in New Issue
Block a user