From cab0aec533b3bbd1b62181ba5852f530989ef432 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sun, 28 Jul 2019 00:40:09 +0200 Subject: [PATCH] pins and heartbeat --- src/CommunicationProcessor.py | 13 ++++++++----- src/Config.py | 2 ++ src/Heartbeat.py | 20 ++++++++++++++++++++ src/MqttProcessor.py | 4 +++- src/MyRS485.py | 13 ++++++++----- src/Pins.py | 24 ++++++++++++++++++++++++ src/master.py | 11 +++++++++-- 7 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 src/Heartbeat.py create mode 100644 src/Pins.py diff --git a/src/CommunicationProcessor.py b/src/CommunicationProcessor.py index 8c10295..0d4e086 100644 --- a/src/CommunicationProcessor.py +++ b/src/CommunicationProcessor.py @@ -3,7 +3,8 @@ import datetime # import RS485Ext import RegisterDatapoint from pymodbus.client.sync import ModbusSerialClient -import wiringpi +# import wiringpi +import Pins import MyRS485 import time import logging @@ -17,8 +18,8 @@ class CommunicationProcessor(threading.Thread): self.config = config self.queue = queue self.pubQueue = pubQueue - wiringpi.wiringPiSetup() - wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT) + # wiringpi.wiringPiSetup() + # wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT) self.daemon = True if self.config.modbusDebug: logging.getLogger('pymodbus').setLevel(logging.DEBUG) @@ -41,12 +42,14 @@ class CommunicationProcessor(threading.Thread): while True: r = self.queue.get() try: - wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW) + # wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW) + Pins.pinsWrite('ERROR', False) self.logger.debug("Dequeued: {0!s}".format(r)) r.enqueued = False r.process(client, self.pubQueue) except RegisterDatapoint.DatapointException as e: - wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH) + # wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH) + Pins.pinsWrite('ERROR', True) self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e)) if client.socket is None: self.logger.error("renew socket") diff --git a/src/Config.py b/src/Config.py index 767974d..98ddd44 100644 --- a/src/Config.py +++ b/src/Config.py @@ -13,3 +13,5 @@ class Config(object): self.serialPort = '/dev/ttyAMA0' self.serialBaudRate = 9600 self.interCommDelay = 0.025 + self.heartbeatTopic = 'Iot/Heartbeat/Modbus2' + self.heartbeatPeriod = 10.0 \ No newline at end of file diff --git a/src/Heartbeat.py b/src/Heartbeat.py new file mode 100644 index 0000000..02e0c2d --- /dev/null +++ b/src/Heartbeat.py @@ -0,0 +1,20 @@ +import threading +import MqttProcessor +import logging +import time + +class Heartbeat(threading.Thread): + def __init__(self, config, pubQueue): + super().__init__() + self.config = config + self.pubQueue = pubQueue + self.daemon = True + self.logger = logging.getLogger('Heartbeat') + + def run(self): + cnt = 0 + while True: + cnt += 1 + pubItem = MqttProcessor.PublishItem(self.config.heartbeatTopic, str(cnt)) + self.pubQueue.put(pubItem) + time.sleep(self.config.heartbeatPeriod) diff --git a/src/MqttProcessor.py b/src/MqttProcessor.py index 7e9bbaf..0ab2cab 100644 --- a/src/MqttProcessor.py +++ b/src/MqttProcessor.py @@ -2,7 +2,7 @@ import threading import paho.mqtt.client as mqtt from NotificationForwarder import AbstractNotificationReceiver import logging - +import Pins class PublishItem(object): def __init__(self, topic, payload): @@ -68,6 +68,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): pubItem = self.pubQueue.get() if isinstance(pubItem, PublishItem): self.client.publish(pubItem.topic, pubItem.payload) + Pins.pinsWrite('MSG', False) else: self.logger.error("Invalid object in publish queue") @@ -80,6 +81,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): self.logger.error("Disconnected from MQTT broker: {0}".format(rc)) def onMessage(self, topic, payload): + Pins.pinsWrite('MSG', True) # print("MqttProcessor.onMessage") r = self.topicRegisterMap[topic] self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r)) diff --git a/src/MyRS485.py b/src/MyRS485.py index 96e8d81..e117e58 100644 --- a/src/MyRS485.py +++ b/src/MyRS485.py @@ -1,5 +1,6 @@ import serial -import wiringpi +# import wiringpi +import Pins import array import fcntl import termios @@ -9,16 +10,18 @@ DE_PIN = 0 class MyRS485(serial.Serial): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - wiringpi.wiringPiSetup() - wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT) + # wiringpi.wiringPiSetup() + # wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT) self.buf = array.array('h', [0]) def write(self, b): - wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH) + # wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH) + Pins.pinsWrite('DE', True) super().write(b) while True: fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1) if self.buf[0] & termios.TIOCSER_TEMT: break - wiringpi.digitalWrite(DE_PIN, wiringpi.LOW) + # wiringpi.digitalWrite(DE_PIN, wiringpi.LOW) + Pins.pinsWrite('DE', False diff --git a/src/Pins.py b/src/Pins.py new file mode 100644 index 0000000..e639603 --- /dev/null +++ b/src/Pins.py @@ -0,0 +1,24 @@ +import wiringpi + + +PINS = { + 'DE': 0, + 'ERROR': 29, + 'MSG': 28 +} + + + +def pinsInit(): + wiringpi.wiringPiSetup() + for pin in PINS.values(): + wiringpi.pinMode(pin, wiringpi.OUTPUT) + + +def pinsWrite(pinName, v): + if v: + pinState = wiringpi.HIGH + else: + pinState = wiringpi.LOW + wiringpi.digitalWrite(PINS[pinName], pinState) + diff --git a/src/master.py b/src/master.py index 4e55f50..9e5c8b6 100644 --- a/src/master.py +++ b/src/master.py @@ -10,8 +10,8 @@ import datetime import RegisterDatapoint import pickle import logging - - +import Pins +import Heartbeat if __name__ == "__main__": @@ -29,6 +29,8 @@ if __name__ == "__main__": logger.addHandler(fh) logger.addHandler(ch) + Pins.pinsInit() + queue = MyPriorityQueue.MyPriorityQueue() pubQueue = Queue() nf = NotificationForwarder.NotificationForwarder() @@ -55,3 +57,8 @@ if __name__ == "__main__": cs = CmdServer.CmdServer(config, nf, datapoints) cs.start() logger.debug('CmdServer started') + + hb = Heartbeat.HeartBeat(config, pubQueue) + hb.start() + logger.debug('Heartbeat started') +