pins and heartbeat
This commit is contained in:
parent
35c282b6ea
commit
cab0aec533
@ -3,7 +3,8 @@ import datetime
|
|||||||
# import RS485Ext
|
# import RS485Ext
|
||||||
import RegisterDatapoint
|
import RegisterDatapoint
|
||||||
from pymodbus.client.sync import ModbusSerialClient
|
from pymodbus.client.sync import ModbusSerialClient
|
||||||
import wiringpi
|
# import wiringpi
|
||||||
|
import Pins
|
||||||
import MyRS485
|
import MyRS485
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
@ -17,8 +18,8 @@ class CommunicationProcessor(threading.Thread):
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.pubQueue = pubQueue
|
self.pubQueue = pubQueue
|
||||||
wiringpi.wiringPiSetup()
|
# wiringpi.wiringPiSetup()
|
||||||
wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT)
|
# wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT)
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
if self.config.modbusDebug:
|
if self.config.modbusDebug:
|
||||||
logging.getLogger('pymodbus').setLevel(logging.DEBUG)
|
logging.getLogger('pymodbus').setLevel(logging.DEBUG)
|
||||||
@ -41,12 +42,14 @@ class CommunicationProcessor(threading.Thread):
|
|||||||
while True:
|
while True:
|
||||||
r = self.queue.get()
|
r = self.queue.get()
|
||||||
try:
|
try:
|
||||||
wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW)
|
# wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW)
|
||||||
|
Pins.pinsWrite('ERROR', False)
|
||||||
self.logger.debug("Dequeued: {0!s}".format(r))
|
self.logger.debug("Dequeued: {0!s}".format(r))
|
||||||
r.enqueued = False
|
r.enqueued = False
|
||||||
r.process(client, self.pubQueue)
|
r.process(client, self.pubQueue)
|
||||||
except RegisterDatapoint.DatapointException as e:
|
except RegisterDatapoint.DatapointException as e:
|
||||||
wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH)
|
# wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH)
|
||||||
|
Pins.pinsWrite('ERROR', True)
|
||||||
self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e))
|
self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e))
|
||||||
if client.socket is None:
|
if client.socket is None:
|
||||||
self.logger.error("renew socket")
|
self.logger.error("renew socket")
|
||||||
|
@ -13,3 +13,5 @@ class Config(object):
|
|||||||
self.serialPort = '/dev/ttyAMA0'
|
self.serialPort = '/dev/ttyAMA0'
|
||||||
self.serialBaudRate = 9600
|
self.serialBaudRate = 9600
|
||||||
self.interCommDelay = 0.025
|
self.interCommDelay = 0.025
|
||||||
|
self.heartbeatTopic = 'Iot/Heartbeat/Modbus2'
|
||||||
|
self.heartbeatPeriod = 10.0
|
20
src/Heartbeat.py
Normal file
20
src/Heartbeat.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
import threading
|
||||||
|
import MqttProcessor
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
class Heartbeat(threading.Thread):
|
||||||
|
def __init__(self, config, pubQueue):
|
||||||
|
super().__init__()
|
||||||
|
self.config = config
|
||||||
|
self.pubQueue = pubQueue
|
||||||
|
self.daemon = True
|
||||||
|
self.logger = logging.getLogger('Heartbeat')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
cnt = 0
|
||||||
|
while True:
|
||||||
|
cnt += 1
|
||||||
|
pubItem = MqttProcessor.PublishItem(self.config.heartbeatTopic, str(cnt))
|
||||||
|
self.pubQueue.put(pubItem)
|
||||||
|
time.sleep(self.config.heartbeatPeriod)
|
@ -2,7 +2,7 @@ import threading
|
|||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
from NotificationForwarder import AbstractNotificationReceiver
|
from NotificationForwarder import AbstractNotificationReceiver
|
||||||
import logging
|
import logging
|
||||||
|
import Pins
|
||||||
|
|
||||||
class PublishItem(object):
|
class PublishItem(object):
|
||||||
def __init__(self, topic, payload):
|
def __init__(self, topic, payload):
|
||||||
@ -68,6 +68,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
|||||||
pubItem = self.pubQueue.get()
|
pubItem = self.pubQueue.get()
|
||||||
if isinstance(pubItem, PublishItem):
|
if isinstance(pubItem, PublishItem):
|
||||||
self.client.publish(pubItem.topic, pubItem.payload)
|
self.client.publish(pubItem.topic, pubItem.payload)
|
||||||
|
Pins.pinsWrite('MSG', False)
|
||||||
else:
|
else:
|
||||||
self.logger.error("Invalid object in publish queue")
|
self.logger.error("Invalid object in publish queue")
|
||||||
|
|
||||||
@ -80,6 +81,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
|||||||
self.logger.error("Disconnected from MQTT broker: {0}".format(rc))
|
self.logger.error("Disconnected from MQTT broker: {0}".format(rc))
|
||||||
|
|
||||||
def onMessage(self, topic, payload):
|
def onMessage(self, topic, payload):
|
||||||
|
Pins.pinsWrite('MSG', True)
|
||||||
# print("MqttProcessor.onMessage")
|
# print("MqttProcessor.onMessage")
|
||||||
r = self.topicRegisterMap[topic]
|
r = self.topicRegisterMap[topic]
|
||||||
self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
|
self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import serial
|
import serial
|
||||||
import wiringpi
|
# import wiringpi
|
||||||
|
import Pins
|
||||||
import array
|
import array
|
||||||
import fcntl
|
import fcntl
|
||||||
import termios
|
import termios
|
||||||
@ -9,16 +10,18 @@ DE_PIN = 0
|
|||||||
class MyRS485(serial.Serial):
|
class MyRS485(serial.Serial):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
wiringpi.wiringPiSetup()
|
# wiringpi.wiringPiSetup()
|
||||||
wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT)
|
# wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT)
|
||||||
self.buf = array.array('h', [0])
|
self.buf = array.array('h', [0])
|
||||||
|
|
||||||
def write(self, b):
|
def write(self, b):
|
||||||
wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH)
|
# wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH)
|
||||||
|
Pins.pinsWrite('DE', True)
|
||||||
super().write(b)
|
super().write(b)
|
||||||
while True:
|
while True:
|
||||||
fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1)
|
fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1)
|
||||||
if self.buf[0] & termios.TIOCSER_TEMT:
|
if self.buf[0] & termios.TIOCSER_TEMT:
|
||||||
break
|
break
|
||||||
wiringpi.digitalWrite(DE_PIN, wiringpi.LOW)
|
# wiringpi.digitalWrite(DE_PIN, wiringpi.LOW)
|
||||||
|
Pins.pinsWrite('DE', False
|
||||||
|
|
||||||
|
24
src/Pins.py
Normal file
24
src/Pins.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import wiringpi
|
||||||
|
|
||||||
|
|
||||||
|
PINS = {
|
||||||
|
'DE': 0,
|
||||||
|
'ERROR': 29,
|
||||||
|
'MSG': 28
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def pinsInit():
|
||||||
|
wiringpi.wiringPiSetup()
|
||||||
|
for pin in PINS.values():
|
||||||
|
wiringpi.pinMode(pin, wiringpi.OUTPUT)
|
||||||
|
|
||||||
|
|
||||||
|
def pinsWrite(pinName, v):
|
||||||
|
if v:
|
||||||
|
pinState = wiringpi.HIGH
|
||||||
|
else:
|
||||||
|
pinState = wiringpi.LOW
|
||||||
|
wiringpi.digitalWrite(PINS[pinName], pinState)
|
||||||
|
|
@ -10,8 +10,8 @@ import datetime
|
|||||||
import RegisterDatapoint
|
import RegisterDatapoint
|
||||||
import pickle
|
import pickle
|
||||||
import logging
|
import logging
|
||||||
|
import Pins
|
||||||
|
import Heartbeat
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@ -29,6 +29,8 @@ if __name__ == "__main__":
|
|||||||
logger.addHandler(fh)
|
logger.addHandler(fh)
|
||||||
logger.addHandler(ch)
|
logger.addHandler(ch)
|
||||||
|
|
||||||
|
Pins.pinsInit()
|
||||||
|
|
||||||
queue = MyPriorityQueue.MyPriorityQueue()
|
queue = MyPriorityQueue.MyPriorityQueue()
|
||||||
pubQueue = Queue()
|
pubQueue = Queue()
|
||||||
nf = NotificationForwarder.NotificationForwarder()
|
nf = NotificationForwarder.NotificationForwarder()
|
||||||
@ -55,3 +57,8 @@ if __name__ == "__main__":
|
|||||||
cs = CmdServer.CmdServer(config, nf, datapoints)
|
cs = CmdServer.CmdServer(config, nf, datapoints)
|
||||||
cs.start()
|
cs.start()
|
||||||
logger.debug('CmdServer started')
|
logger.debug('CmdServer started')
|
||||||
|
|
||||||
|
hb = Heartbeat.HeartBeat(config, pubQueue)
|
||||||
|
hb.start()
|
||||||
|
logger.debug('Heartbeat started')
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user