actual publish something
This commit is contained in:
@ -5,10 +5,11 @@ import RegisterDatapoint
|
|||||||
from pymodbus.client.sync import ModbusSerialClient
|
from pymodbus.client.sync import ModbusSerialClient
|
||||||
|
|
||||||
class CommunicationProcessor(threading.Thread):
|
class CommunicationProcessor(threading.Thread):
|
||||||
def __init__(self, config, queue):
|
def __init__(self, config, queue, pubQueue):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.config = config
|
self.config = config
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
self.pubQueue = pubQueue
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
|
|
||||||
def __getSerial(self):
|
def __getSerial(self):
|
||||||
@ -26,7 +27,7 @@ class CommunicationProcessor(threading.Thread):
|
|||||||
try:
|
try:
|
||||||
print("Dequeued: {0!s}".format(r))
|
print("Dequeued: {0!s}".format(r))
|
||||||
r.enqueued = False
|
r.enqueued = False
|
||||||
r.process(client)
|
r.process(client, self.pubQueue)
|
||||||
except RegisterDatapoint.DatapointException as e:
|
except RegisterDatapoint.DatapointException as e:
|
||||||
print("ERROR when processing '{0}': {1!s}".format(r.label, e))
|
print("ERROR when processing '{0}': {1!s}".format(r.label, e))
|
||||||
if client.socket is None:
|
if client.socket is None:
|
||||||
|
@ -4,6 +4,11 @@ from NotificationForwarder import AbstractNotificationReceiver
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class PublishItem(object):
|
||||||
|
def __init__(self, topic, payload):
|
||||||
|
self.topic = topic
|
||||||
|
self.payload = payload
|
||||||
|
|
||||||
def mqttOnConnectCallback(client, userdata, flags, rc):
|
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||||
userdata.onConnect()
|
userdata.onConnect()
|
||||||
|
|
||||||
@ -14,11 +19,12 @@ def mqttOnDisconnectCallback(client, userdata, rc):
|
|||||||
userdata.onDisconnect(rc)
|
userdata.onDisconnect(rc)
|
||||||
|
|
||||||
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||||
def __init__(self, config, registers, queue):
|
def __init__(self, config, registers, queue, pubQueue):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.config = config
|
self.config = config
|
||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
self.pubQueue = pubQueue
|
||||||
self.client = mqtt.Client(userdata=self)
|
self.client = mqtt.Client(userdata=self)
|
||||||
self.subscriptions = []
|
self.subscriptions = []
|
||||||
self.topicRegisterMap ={}
|
self.topicRegisterMap ={}
|
||||||
@ -56,7 +62,15 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
|||||||
if self.config.mqttLogin and self.config.mqttPassword:
|
if self.config.mqttLogin and self.config.mqttPassword:
|
||||||
self.client.username_pw_set(self.config.mqttLogin, self.config.mqttPassword)
|
self.client.username_pw_set(self.config.mqttLogin, self.config.mqttPassword)
|
||||||
self.client.connect(self.config.mqttBrokerHost, self.config.mqttBrokerPort)
|
self.client.connect(self.config.mqttBrokerHost, self.config.mqttBrokerPort)
|
||||||
self.client.loop_forever()
|
self.client.loop_start()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
pubItem = self.pubQueue.get()
|
||||||
|
if isinstance(pubItem, PublishItem):
|
||||||
|
self.client.publish(pubItem.topic, pubItem.payload)
|
||||||
|
else:
|
||||||
|
print("Invalid object in publish queue")
|
||||||
|
|
||||||
|
|
||||||
def onConnect(self):
|
def onConnect(self):
|
||||||
# print("MqttProcessor.onConnect")
|
# print("MqttProcessor.onConnect")
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import datetime
|
import datetime
|
||||||
from pymodbus.pdu import ExceptionResponse
|
from pymodbus.pdu import ExceptionResponse
|
||||||
from pymodbus.exceptions import ModbusIOException
|
from pymodbus.exceptions import ModbusIOException
|
||||||
|
from src import MqttProcessor
|
||||||
|
|
||||||
|
|
||||||
class DatapointException(Exception): pass
|
class DatapointException(Exception): pass
|
||||||
@ -39,7 +40,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "[{0!s}, Read: {1}, Write: {2}, Feedback: {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic)
|
return "[{0!s}, Read: {1}, Write: {2}, Feedback: {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic)
|
||||||
|
|
||||||
def process(self, client):
|
def process(self, client, pubQueue):
|
||||||
successFull = True
|
successFull = True
|
||||||
giveUp = False
|
giveUp = False
|
||||||
if self.writeRequestValue:
|
if self.writeRequestValue:
|
||||||
@ -62,7 +63,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
|||||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||||
raise DatapointException(result)
|
raise DatapointException(result)
|
||||||
print("{0}: {1!s}".format(self.label, result.registers))
|
print("{0}: {1!s}".format(self.label, result.registers))
|
||||||
|
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers)))
|
||||||
if successFull:
|
if successFull:
|
||||||
self.lastContact = datetime.datetime.now()
|
self.lastContact = datetime.datetime.now()
|
||||||
# publish value
|
# publish value
|
||||||
@ -89,18 +90,19 @@ class InputRegisterDatapoint(AbstractModbusDatapoint):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "[{0!s}, {1}".format(super().__str__(), self.publishTopic)
|
return "[{0!s}, {1}".format(super().__str__(), self.publishTopic)
|
||||||
|
|
||||||
def process(self, client):
|
def process(self, client, pubQueue):
|
||||||
successFull = True
|
successFull = True
|
||||||
giveUp = False
|
giveUp = False
|
||||||
# perform read operation
|
# perform read operation
|
||||||
|
print("Input register, perform read operation")
|
||||||
result = client.read_input_registers(address=self.address,
|
result = client.read_input_registers(address=self.address,
|
||||||
count=self.count,
|
count=self.count,
|
||||||
unit=self.unit)
|
unit=self.unit)
|
||||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||||
raise DatapointException(result)
|
raise DatapointException(result)
|
||||||
print("{0}: {1!s}".format(self.label, result.registers))
|
print("{0}: {1!s}".format(self.label, result.registers))
|
||||||
|
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers)))
|
||||||
|
|
||||||
print("Input register, perform read operation")
|
|
||||||
if successFull:
|
if successFull:
|
||||||
self.lastContact = datetime.datetime.now()
|
self.lastContact = datetime.datetime.now()
|
||||||
# publish value
|
# publish value
|
||||||
|
@ -2,6 +2,7 @@ import CmdServer
|
|||||||
import MqttProcessor
|
import MqttProcessor
|
||||||
import CommunicationProcessor
|
import CommunicationProcessor
|
||||||
import MyPriorityQueue
|
import MyPriorityQueue
|
||||||
|
import queue
|
||||||
import NotificationForwarder
|
import NotificationForwarder
|
||||||
import Config
|
import Config
|
||||||
import ScanRateConsideringQueueFeeder
|
import ScanRateConsideringQueueFeeder
|
||||||
@ -13,6 +14,7 @@ import pickle
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
queue = MyPriorityQueue.MyPriorityQueue()
|
queue = MyPriorityQueue.MyPriorityQueue()
|
||||||
|
pubQueue = queue.queue()
|
||||||
nf = NotificationForwarder.NotificationForwarder()
|
nf = NotificationForwarder.NotificationForwarder()
|
||||||
config = Config.Config()
|
config = Config.Config()
|
||||||
|
|
||||||
@ -21,10 +23,10 @@ if __name__ == "__main__":
|
|||||||
datapoints = pickle.load(f)
|
datapoints = pickle.load(f)
|
||||||
RegisterDatapoint.checkRegisterList(datapoints)
|
RegisterDatapoint.checkRegisterList(datapoints)
|
||||||
|
|
||||||
cp = CommunicationProcessor.CommunicationProcessor(config, queue)
|
cp = CommunicationProcessor.CommunicationProcessor(config, queue, pubQueue)
|
||||||
cp.start()
|
cp.start()
|
||||||
|
|
||||||
mp = MqttProcessor.MqttProcessor(config, datapoints, queue)
|
mp = MqttProcessor.MqttProcessor(config, datapoints, queue, pubQueue)
|
||||||
nf.register(mp)
|
nf.register(mp)
|
||||||
mp.start()
|
mp.start()
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user