input working
This commit is contained in:
parent
a37f4945d8
commit
17e5b6b012
50
src/AbstractMqttHandler.py
Normal file
50
src/AbstractMqttHandler.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import threading
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
|
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||||
|
userdata.onConnect()
|
||||||
|
|
||||||
|
def mqttOnMessageCallback(client, userdata, message):
|
||||||
|
userdata.onMessage(message.topic, message.payload)
|
||||||
|
|
||||||
|
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||||
|
userdata.onDisconnect(rc)
|
||||||
|
|
||||||
|
|
||||||
|
class AbstractMqttPublisher(threading.Thread):
|
||||||
|
def __init__(self, config, processImage):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self.config = config["mqtt"]
|
||||||
|
self.processImage = processImage
|
||||||
|
|
||||||
|
self.client = mqtt.Client(userdata=self)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.client.on_message = mqttOnMessageCallback
|
||||||
|
self.client.on_connect = mqttOnConnectCallback
|
||||||
|
self.client.on_disconnect = mqttOnDisconnectCallback
|
||||||
|
|
||||||
|
if ("login" in self.config) and ("password" in self.config):
|
||||||
|
self.client.username_pw_set(self.config["login"], self.config["password"])
|
||||||
|
|
||||||
|
self.client.connect(self.config["broker"])
|
||||||
|
self.client.loop_start()
|
||||||
|
logger.info("mqtt loop started")
|
||||||
|
|
||||||
|
self.localLoop()
|
||||||
|
|
||||||
|
def localLoop(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def onConnect(self):
|
||||||
|
logger.info("mqtt connected")
|
||||||
|
|
||||||
|
def onDisconnect(self, rc):
|
||||||
|
logger.warning("mqtt disconnect, rc: {}".format(rc))
|
||||||
|
|
||||||
|
def onMessage(self, topic, payload):
|
||||||
|
logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload)))
|
||||||
|
|
@ -2,15 +2,20 @@ from pymodbus.client.sync import ModbusTcpClient as ModbusClient
|
|||||||
from pymodbus.exceptions import ModbusIOException
|
from pymodbus.exceptions import ModbusIOException
|
||||||
from time import sleep
|
from time import sleep
|
||||||
import threading
|
import threading
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
MODBUS_CLIENT = '172.16.2.157'
|
def modbusStart(config, processImage):
|
||||||
MODBUS_REFRESH_PERIOD = 0.25
|
modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage])
|
||||||
|
modbusThread.start()
|
||||||
|
|
||||||
|
|
||||||
|
def modbusHandler(config, processImage):
|
||||||
|
modbusClient = config["modbus"]["client"]
|
||||||
|
modbusRefreshPeriod = config["modbus"]["scanrate"]
|
||||||
|
|
||||||
def modbusHandler(processImage):
|
|
||||||
client = ModbusClient(MODBUS_CLIENT)
|
client = ModbusClient(modbusClient)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
client.connect()
|
client.connect()
|
||||||
@ -23,7 +28,7 @@ def modbusHandler(processImage):
|
|||||||
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
|
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
|
||||||
|
|
||||||
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
|
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
|
||||||
print(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
|
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
|
||||||
|
|
||||||
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
|
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
|
||||||
|
|
||||||
@ -63,12 +68,12 @@ def modbusHandler(processImage):
|
|||||||
raise Exception(reg)
|
raise Exception(reg)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Exception in inner modbus handler loop: {}".format(e))
|
logger.error("Exception in inner modbus handler loop: {}".format(e))
|
||||||
client.close()
|
client.close()
|
||||||
finally:
|
finally:
|
||||||
sleep(MODBUS_REFRESH_PERIOD)
|
sleep(modbusRefreshPeriod)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Exception in modbus handler: {}".format(e))
|
logger.error("Exception in modbus handler: {}".format(e))
|
||||||
finally:
|
finally:
|
||||||
client.close()
|
client.close()
|
||||||
|
41
src/MqttEventPublisher.py
Normal file
41
src/MqttEventPublisher.py
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import threading
|
||||||
|
from loguru import logger
|
||||||
|
from AbstractMqttHandler import AbstractMqttPublisher
|
||||||
|
|
||||||
|
|
||||||
|
def mqttEventPublisherStart(config, processImage):
|
||||||
|
mqttEventPublisherThread = MqttEventPublisher(config, processImage)
|
||||||
|
mqttEventPublisherThread.start()
|
||||||
|
|
||||||
|
|
||||||
|
class MqttEventPublisher(AbstractMqttPublisher):
|
||||||
|
def __init__(self, config, processImage):
|
||||||
|
super().__init__(config, processImage)
|
||||||
|
|
||||||
|
def localLoop(self):
|
||||||
|
while True:
|
||||||
|
with self.processImage:
|
||||||
|
self.processImage.wait()
|
||||||
|
|
||||||
|
discreteInputChangeset = self.processImage.getChangedDiscreteInputs()
|
||||||
|
analogInputChangeset = self.processImage.getChangedAnalogsInputs()
|
||||||
|
|
||||||
|
for discreteInputChangeItem in discreteInputChangeset:
|
||||||
|
logger.debug("Discrete input {} changed from {} to {}"
|
||||||
|
.format(discreteInputChangeItem[0],
|
||||||
|
discreteInputChangeItem[1][1],
|
||||||
|
discreteInputChangeItem[1][0]))
|
||||||
|
self.client.publish("{}/{}".format(self.config["digitalInputTopicPrefix"], str(discreteInputChangeItem[0])),
|
||||||
|
str(discreteInputChangeItem[1][0]))
|
||||||
|
|
||||||
|
for analogInputChangeItem in analogInputChangeset:
|
||||||
|
logger.debug("Analog input {} changed from {} to {}"
|
||||||
|
.format(analogInputChangeItem[0],
|
||||||
|
analogInputChangeItem[1][1],
|
||||||
|
analogInputChangeItem[1][0]))
|
||||||
|
|
||||||
|
self.client.publish("{}/{}".format(self.config["analogInputEventTopicPrefix"], str(analogInputChangeItem[0])),
|
||||||
|
str(analogInputChangeItem[1][0]))
|
||||||
|
|
||||||
|
|
32
src/MqttPeriodPublisher.py
Normal file
32
src/MqttPeriodPublisher.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import threading
|
||||||
|
from loguru import logger
|
||||||
|
from AbstractMqttHandler import AbstractMqttPublisher
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
def mqttPeriodPublisherStart(config, processImage):
|
||||||
|
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
|
||||||
|
mqttPeriodPublisherThread.start()
|
||||||
|
|
||||||
|
|
||||||
|
class MqttPeriodPublisher(AbstractMqttPublisher):
|
||||||
|
def __init__(self, config, processImage):
|
||||||
|
super().__init__(config, processImage)
|
||||||
|
|
||||||
|
def localLoop(self):
|
||||||
|
while True:
|
||||||
|
with self.processImage:
|
||||||
|
if not self.processImage.isInitialized():
|
||||||
|
continue
|
||||||
|
analogInputs = self.processImage.getAnalogsInputs()
|
||||||
|
|
||||||
|
for analogInputItem in enumerate(analogInputs):
|
||||||
|
logger.debug("Analog input {} is {}"
|
||||||
|
.format(analogInputItem[0],
|
||||||
|
analogInputItem[1]))
|
||||||
|
|
||||||
|
self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])),
|
||||||
|
str(analogInputItem[1]))
|
||||||
|
|
||||||
|
sleep(self.config["analogInputPublishPeriod"])
|
||||||
|
|
@ -47,7 +47,6 @@ class ProcessImage(Condition):
|
|||||||
def getDiscreteInputs(self):
|
def getDiscreteInputs(self):
|
||||||
if not self.initialized:
|
if not self.initialized:
|
||||||
raise NotInitializedException
|
raise NotInitializedException
|
||||||
self.shadowDiscreteInputs = self.discreteInputs
|
|
||||||
return self.discreteInputs
|
return self.discreteInputs
|
||||||
|
|
||||||
def setCoils(self, coils):
|
def setCoils(self, coils):
|
||||||
@ -65,7 +64,6 @@ class ProcessImage(Condition):
|
|||||||
def getCoils(self):
|
def getCoils(self):
|
||||||
if not self.initialized:
|
if not self.initialized:
|
||||||
raise NotInitializedException
|
raise NotInitializedException
|
||||||
self.shadowCoils = self.coils
|
|
||||||
return self.coils
|
return self.coils
|
||||||
|
|
||||||
def setAnalogsInputs(self, analogInputs):
|
def setAnalogsInputs(self, analogInputs):
|
||||||
@ -83,7 +81,6 @@ class ProcessImage(Condition):
|
|||||||
def getAnalogsInputs(self):
|
def getAnalogsInputs(self):
|
||||||
if not self.initialized:
|
if not self.initialized:
|
||||||
raise NotInitializedException
|
raise NotInitializedException
|
||||||
self.shadowAnalogInputs = self.analogInputs
|
|
||||||
return self.analogInputs
|
return self.analogInputs
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,14 +1,39 @@
|
|||||||
from ProcessImage import ProcessImage
|
from ProcessImage import ProcessImage
|
||||||
from ModbusHandler import modbusHandler
|
from ModbusHandler import modbusStart
|
||||||
from DummyPublisher import dummyPublisher as publisher
|
from MqttEventPublisher import mqttEventPublisherStart
|
||||||
|
from MqttPeriodPublisher import mqttPeriodPublisherStart
|
||||||
import threading
|
import threading
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
|
config = {
|
||||||
|
"modbus": {
|
||||||
|
"client": "172.16.2.157",
|
||||||
|
"scanrate": 0.25
|
||||||
|
},
|
||||||
|
"mqtt": {
|
||||||
|
"broker": "172.16.2.16",
|
||||||
|
"digitalOutputTopicPrefix": "dt1/coil",
|
||||||
|
"digitalInputTopicPrefix": "dt1/di",
|
||||||
|
"analogInputEventTopicPrefix": "dt1/ai/event",
|
||||||
|
"analogInputPeriodicTopicPrefix": "dt1/ai/periodic",
|
||||||
|
"analogInputPublishPeriod": 10.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
logger.info("DigitalTwin1 starting")
|
||||||
|
|
||||||
processImage = ProcessImage()
|
processImage = ProcessImage()
|
||||||
|
|
||||||
modbusThread = threading.Thread(target=modbusHandler, args=[processImage])
|
modbusStart(config, processImage)
|
||||||
publisherThread = threading.Thread(target=publisher, args=[processImage])
|
logger.info("Modbus handler running")
|
||||||
|
|
||||||
modbusThread.start()
|
mqttEventPublisherStart(config, processImage)
|
||||||
publisherThread.start()
|
logger.info("MQTT event publisher running")
|
||||||
|
|
||||||
|
mqttPeriodPublisherStart(config, processImage)
|
||||||
|
logger.info("MQTT period publisher running")
|
||||||
|
|
||||||
|
logger.info("DigitalTwin1 running")
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user