65 Commits

Author SHA1 Message Date
bcc6229bb6 decrease log level from DEBUG to INFO 2020-05-06 21:27:11 +02:00
5caf3f2ca0 drop read topic from coil command 2019-10-27 13:50:06 +01:00
4e422223bd code style 2019-09-14 22:10:09 +02:00
de7a3fb0da merged 2019-09-14 22:06:44 +02:00
9fcd39a749 drop publishTopic from coil 2019-09-14 22:02:09 +02:00
612c4ab2ac more ci 2019-09-10 16:58:59 +02:00
e78df8be40 more ci 2019-09-10 16:55:12 +02:00
489eb0dda8 more ci 2019-09-10 16:48:55 +02:00
78bf04f191 update ci script 2019-09-10 16:40:59 +02:00
2930d39345 fix style issue 2019-09-10 16:35:51 +02:00
0f007b058d fix style issues 2019-09-10 16:34:45 +02:00
94e60ee172 fix style issues 2019-09-10 16:33:18 +02:00
77d01ca675 fix style issues 2019-09-10 16:18:35 +02:00
c1bf7fd13a some fixes 2019-09-10 16:03:54 +02:00
61de3f3a5b fix in ci script 2019-09-10 16:01:12 +02:00
90de6537de add ci script 2019-09-10 15:59:13 +02:00
832402fea6 service file 2019-09-10 15:54:09 +02:00
880794f966 converter for twos-complement fixed decimal numbers 2019-09-10 15:31:36 +02:00
64c26103df additional datapoints 2019-09-10 15:21:11 +02:00
f8f7d4b57e adjust registers 2019-08-06 21:38:55 +01:00
1d078ba03d adjust registers 2019-08-03 11:15:44 +01:00
020c2ea536 registers adjusted to real world 2019-08-02 19:08:04 +01:00
b6077b0ec8 a couple of fixes and daemon=True disabled 2019-07-27 23:56:19 +01:00
cab0aec533 pins and heartbeat 2019-07-28 00:40:09 +02:00
35c282b6ea changed register set 2019-07-27 23:18:20 +01:00
4b860ee43a fix reset in in cmd 2019-07-19 13:22:20 +02:00
d3eae2cd8d code beautifying 2019-07-18 16:46:33 +02:00
768ded37a6 better handle unknown converter 2019-07-18 16:09:15 +02:00
2cadccf59c fix typo 2019-07-18 15:05:04 +01:00
c779f07cda improve help 2019-07-18 16:00:27 +02:00
84f4ecf5a1 fix in handling exceptions in cmds 2019-07-18 14:02:50 +01:00
d8ce7c8bc2 Merge branch 'master' of gitlab.com:wolutator/modbusmaster 2019-07-18 13:57:20 +01:00
080b22646d read/write counter 2019-07-18 13:56:55 +01:00
2a704738b2 catch exception in cmds 2019-07-18 14:55:27 +02:00
5c8b2599a6 drop obsolete work 2019-07-18 14:46:15 +02:00
31cf5ceb5a readCount and writeCount 2019-07-17 17:22:14 +02:00
a0efe1129b registers changed 2019-07-17 16:15:21 +01:00
7a8a3c661d cmd server adjusted for new register types 2019-07-17 17:12:22 +02:00
122fce519c coil stuff 2019-07-17 16:01:28 +01:00
4200aaf304 coil 2019-07-17 16:47:14 +02:00
0f037b02ea handling of bit reads 2019-07-17 16:26:51 +02:00
ac47ff0ebe fix in converter and json handling 2019-07-17 15:06:28 +01:00
264d1cab14 enable pub and sub at the same holding register 2019-07-17 15:36:45 +02:00
ab91feafd0 fix uint32 converter 2019-07-17 14:17:51 +01:00
2460f570d8 another converter 2019-07-17 14:49:01 +02:00
f6d4218e57 fix in bitwise output 2019-07-17 11:22:50 +01:00
bf3475a796 improve bit output 2019-07-17 12:18:48 +02:00
0bae0f4bb2 fixes in converting code 2019-07-17 10:59:47 +01:00
b9e0fefe17 introduce converter stuff 2019-07-17 11:51:15 +02:00
34ca87f734 write stuff 2019-07-16 17:11:53 +01:00
de01ec20e2 start with writing 2019-07-16 17:18:40 +02:00
6821364273 code beautifying 2019-07-16 14:27:12 +02:00
e297149772 change configuration 2019-07-16 13:21:26 +01:00
6243cad505 remove pickle imports 2019-07-16 14:19:13 +02:00
8fa69d1610 drop pickle file 2019-07-16 14:17:41 +02:00
2fc6fe0830 fix in CmdServer and change file name in config to .json 2019-07-16 13:16:35 +01:00
d97b7469fe jsonifying register file done so far 2019-07-16 14:11:04 +02:00
2665fdd5e3 jsonify test complete, approach seems to work 2019-07-16 12:56:38 +02:00
3e1b9acb86 jsonify test 2019-07-15 21:36:23 +02:00
21555736f6 fixes related to not set subscribeTopic and logger in objects 2019-07-15 15:57:39 +01:00
ab31aea3e0 do not make logger a member of register classes 2019-07-15 15:40:46 +01:00
90cdde8bfe more logging 2019-07-15 15:35:31 +01:00
b7193d8d58 more logging 2019-07-15 15:33:07 +01:00
8c87460632 changes in registers file 2019-07-15 15:28:16 +01:00
454632ec36 debug 2019-07-15 15:11:38 +01:00
28 changed files with 774 additions and 479 deletions

29
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,29 @@
stages:
- check
- deploy
build:
stage: check
image: registry.gitlab.com/wolutator/base-build-env:latest
tags:
- hottis
- linux
- docker
script:
- for I in src/*.py; do python -m py_compile $I; done
- for I in src/*.py; do python -m pycodestyle --max-line-length=120 $I; done
deploy:
stage: deploy
tags:
- hottis
- linux
- rpi
- modbus
only:
- deploy
script:
- sudo service modbusMaster stop
- cp src/*.py /opt/services/modbusMaster
- sudo service modbusMaster start

2
ENV
View File

@ -1,2 +0,0 @@
export LD_LIBRARY_PATH=/home/pi/modbusmaster/pyserialext/
export PYTHONPATH=/home/pi/modbusmaster/pyserialext/

View File

@ -1,14 +0,0 @@
CFLAGS=
LIBS=-lwiringPi
writec.so: writec.o
$(LD) -shared $(LIBS) -o $@ $^
writec.o: writec.c
.c.o:
$(CC) $(CFLAGS) -c $<
.PHONY: clean
clean:
-rm -f *.so *.o

View File

@ -1,16 +0,0 @@
import serial.rs485
import serial.serialutil
import ctypes
class RS485Ext(serial.rs485.RS485):
def __init__(self, *args, **kwargs):
super(RS485Ext, self).__init__(*args, **kwargs)
self.writec = ctypes.cdll.LoadLibrary('writec.so')
r = self.writec.init()
def write(self, b):
d = serial.serialutil.to_bytes(b)
r = self.writec.writec(self.fileno(), d, len(d))
return r

View File

@ -1,26 +0,0 @@
#include <unistd.h>
#include <sys/ioctl.h>
#include <stdint.h>
#include <wiringPi.h>
const uint8_t DE_PIN = 0;
int init() {
wiringPiSetup();
pinMode(DE_PIN, OUTPUT);
digitalWrite(DE_PIN, LOW);
}
ssize_t writec(int fd, char *buf, size_t count) {
digitalWrite(DE_PIN, HIGH);
ssize_t r = write(fd, buf, count);
uint8_t lsr;
do {
int r = ioctl(fd, TIOCSERGETLSR, &lsr);
} while (!(lsr & TIOCSER_TEMT));
digitalWrite(DE_PIN, LOW);
return r;
}

Submodule rpirtscts deleted from 612b065e38

View File

@ -1,212 +0,0 @@
-- Configuration and Provisioning Schema
DROP TABLE tReadDatapoint;
CREATE TABLE tReadDatapoint (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
unit INTEGER NOT NULL,
address INTEGER NOT NULL,
count INTEGER NOT NULL,
converter VARCHAR(10) NOT NULL,
label VARCHAR(128) NOT NULL,
scanRate TIME(3) DEFAULT '00:00:01.000',
topic VARCHAR(256) NOT NULL,
lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000',
lastError VARCHAR(512),
lastValue VARCHAR(512),
backoff TIME(3) DEFAULT '00:00:00.000',
available BOOLEAN DEFAULT TRUE,
retries INTEGER NOT NULL DEFAULT 0,
giveUpCount INTEGER NOT NULL DEFAULT 0,
active BOOLEAN NOT NULL DEFAULT TRUE,
CONSTRAINT uniqueReadDatapoint UNIQUE (unit, address, count, label)
);
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(4, 0x2000, 2, 'F', '(ERR) Unavailable device', 'IoT/ModbusMaster1/UnavailableDevice', '00:00:01.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(1, 0x2000, 4, 'F', '(ERR) Wrong register size', 'IoT/ModbusMaster1/WrongRegisterSize', '00:05:00.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(1, 0x2000, 2, 'F', 'Voltage', 'IoT/ModbusMaster1/Voltage', '00:05:00.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(1, 0x2020, 2, 'F', 'Frequency', 'IoT/ModbusMaster1/Frequency', '00:05:00.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(1, 0x2060, 2, 'F', 'Current', 'IoT/ModbusMaster1/Current', '00:05:00.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(3, 0x0004, 2, 'RF', 'Resistance Channel 1', 'IoT/ModbusMaster1/Channel1/Resistance', '00:00:01.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(3, 0x000C, 2, 'RF', 'Temperature Channel 1', 'IoT/ModbusMaster1/Channel1/Temperature', '00:00:01.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(3, 0x0014, 2, 'RF', 'Resistance Channel 2', 'IoT/ModbusMaster1/Channel2/Resistance', '00:00:01.000');
INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate)
VALUES(3, 0x001C, 2, 'RF', 'Temperature Channel 2', 'IoT/ModbusMaster1/Channel2/Temperature', '00:00:01.000');
DROP TABLE tWriteDatapoint;
CREATE TABLE tWriteDatapoint (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
unit INTEGER NOT NULL,
address INTEGER NOT NULL,
count INTEGER NOT NULL,
converter VARCHAR(10) NOT NULL,
label VARCHAR(128) NOT NULL,
topic VARCHAR(256) NOT NULL,
lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000',
lastError VARCHAR(512),
value VARCHAR(512),
retries INTEGER NOT NULL DEFAULT 0,
active BOOLEAN NOT NULL DEFAULT TRUE,
CONSTRAINT uniqueWriteDatapoint UNIQUE (unit, address, count, label)
);
INSERT INTO tWriteDatapoint (unit, address, count, converter, label, topic, active)
VALUES(5, 0x0000, 1, 'B', 'Relay 1', 'IoT/ModbusMaster1/Relay1', FALSE);
CREATE OR REPLACE VIEW vReadDatapointsToBeHandled AS
SELECT id, unit, address, count, converter
FROM tReadDatapoint
WHERE available AND
active AND
ADDTIME(lastContact, ADDTIME(scanRate, backoff)) < NOW(3)
ORDER BY scanRate;
CREATE OR REPLACE VIEW vWriteDatapintsToBeHandled AS
SELECT id, unit, address, count, converter, value
FROM tWriteDatapoint
WHERE active;
DROP TABLE tReadNotification;
CREATE TABLE tReadNotification (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
readDatapointId INTEGER NOT NULL REFERENCES tReadDatapoint(id),
notificationType VARCHAR(1),
CONSTRAINT checkNotificationType CHECK (notificationtype IN ('V', 'F', 'R')) -- value, failure, return
);
DROP TABLE tWrittenNotification;
CREATE TABLE tWrittenNotification (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
writeDatapointId INTEGER NOT NULL REFERENCES tWriteDatapoint(id),
notificationType VARCHAR(1),
CONSTRAINT checkNotificationType CHECK (notificationtype IN ('S', 'F')) -- success, failure
);
DELIMITER $$
CREATE OR REPLACE PROCEDURE prWriteFeedback (IN p_id INTEGER, IN p_lastError VARCHAR(512))
MODIFIES SQL DATA
BEGIN
DECLARE v_retries INTEGER;
DECLARE v_active BOOLEAN;
IF p_lastError = '' OR p_lastError IS NULL THEN
UPDATE tWriteDatapoint
SET lastError = NULL,
lastContact = NOW(3),
retries = 0,
active = FALSE
WHERE id = p_id;
INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES (p_id, 'S');
ELSE
SELECT retries
INTO v_retries
FROM tWriteDatapoint
WHERE id = p_id;
SET v_retries := v_retries + 1;
IF v_retries >= 5 THEN
SET v_retries := 0;
SET v_active := FALSE;
ELSE
SET v_active := TRUE;
END IF;
UPDATE tWriteDatapoint
SET lastError = p_lastError,
retries = v_retries,
active = v_active
WHERE id = p_id;
IF NOT v_active THEN
INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES(p_id, 'F');
END IF;
END IF;
END; $$
DELIMITER ;
DELIMITER $$
CREATE OR REPLACE PROCEDURE prReadFeedback (IN p_id INTEGER, IN p_lastValue VARCHAR(512), IN p_lastError VARCHAR(512))
MODIFIES SQL DATA
BEGIN
DECLARE v_retries INTEGER;
DECLARE v_backoff TIME(3);
DECLARE v_scanRate TIME(3);
DECLARE v_giveUpCount INTEGER;
DECLARE v_available BOOLEAN;
IF p_lastError = '' OR p_lastError IS NULL THEN
UPDATE tReadDatapoint
SET lastError = NULL,
lastContact = NOW(3),
lastValue = p_lastValue,
retries = 0,
backoff = '00:00:00.000',
giveUpCount = 0
WHERE id = p_id;
INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'V');
ELSE
SELECT retries, backoff, scanRate, giveUpCount
INTO v_retries, v_backoff, v_scanRate, v_giveUpCount
FROM tReadDatapoint
WHERE id = p_id;
SET v_retries := v_retries + 1;
IF v_retries >= 5 THEN
IF v_backoff = '00:00:00.000' THEN
SET v_backoff = v_scanRate;
ELSE
SET v_backoff = ADDTIME(v_backoff, v_backoff);
END IF;
SET v_retries := 0;
SET v_giveUpCount := v_giveUpCount + 1;
SET v_available := TRUE;
END IF;
IF v_giveUpCount = 10 THEN
SET v_available := FALSE;
SET v_giveUpCount := 0;
SET v_backoff := '00:00:00.000';
END IF;
UPDATE tReadDatapoint
SET lastError = p_lastError,
retries = v_retries,
backoff = v_backoff,
giveUpCount = v_giveUpCount,
available = v_available
WHERE id = p_id;
IF NOT v_available THEN
INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'F');
END IF;
END IF;
END; $$
DELIMITER ;

105
snippets/test8.py Normal file
View File

@ -0,0 +1,105 @@
import unittest
import json
class A(object):
def __init__(self, arg1=None, arg2=None):
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = self.arg1 + self.arg2
def __str__(self):
return "A: {0!s} {1!s} {2!s}".format(self.arg1, self.arg2, self.arg3)
def jsonify(self):
return {'type':self.__class__.__name__, 'args': {'arg1':self.arg1, 'arg2':self.arg2}}
class MyEncoder(json.JSONEncoder):
def default(self, o):
try:
return o.jsonify()
except TypeError or AttributeError:
return super().default(o)
def objectFactory(j):
klass = eval(j['type'])
o = klass(**j['args'])
return o
def MyDecoder(j):
if type(j) == dict and 'type' in j:
return objectFactory(j)
else:
return j
class Tests(unittest.TestCase):
def test_a1(self):
a1 = A(1, 2)
self.assertEqual(a1.arg1, 1)
self.assertEqual(a1.arg2, 2)
self.assertEqual(a1.arg3, 3)
def test_a2(self):
a2 = A(**{'arg1':2, 'arg2':4})
self.assertEqual(a2.arg1, 2)
self.assertEqual(a2.arg2, 4)
self.assertEqual(a2.arg3, 6)
def test_a3(self):
j = '{ "type": "A", "args": { "arg1": 3, "arg2": 5 } }'
jj = json.loads(j)
klass = eval(jj['type'])
self.assertEqual(A, klass)
a3 = klass(**jj['args'])
self.assertEqual(a3.arg1, 3)
self.assertEqual(a3.arg2, 5)
self.assertEqual(a3.arg3, 8)
def test_a4(self):
j = '{ "type": "A", "args": { "arg1": 3, "arg2": 5 } }'
jj = json.loads(j)
klass = eval(jj['type'])
self.assertEqual(A, klass)
a3 = klass(**jj['args'])
self.assertEqual(a3.arg1, 3)
self.assertEqual(a3.arg2, 5)
self.assertEqual(a3.arg3, 8)
jjjj = json.dumps(a3, cls=MyEncoder)
jjj = json.loads(jjjj)
klass = eval(jjj['type'])
self.assertEqual(A, klass)
a3 = klass(**jjj['args'])
self.assertEqual(a3.arg1, 3)
self.assertEqual(a3.arg2, 5)
self.assertEqual(a3.arg3, 8)
def test_a5(self):
jList = []
jList.append(objectFactory({'type':'A', 'args': {'arg1':1, 'arg2':2}}))
jList.append(objectFactory({'type':'A', 'args': {'arg1':2, 'arg2':3}}))
jList.append(objectFactory({'type':'A', 'args': {'arg1':3, 'arg2':4}}))
js = json.dumps(jList, cls=MyEncoder, sort_keys=True, indent=4)
print(js)
jResultList = json.loads(js, object_hook=MyDecoder)
self.assertEqual(jResultList[0].arg1, 1)
self.assertEqual(jResultList[0].arg2, 2)
self.assertEqual(jResultList[0].arg3, 3)
self.assertEqual(jResultList[1].arg1, 2)
self.assertEqual(jResultList[1].arg2, 3)
self.assertEqual(jResultList[1].arg3, 5)
self.assertEqual(jResultList[2].arg1, 3)
self.assertEqual(jResultList[2].arg2, 4)
self.assertEqual(jResultList[2].arg3, 7)
if __name__ == '__main__':
unittest.main()

14
snippets/test9.py Normal file
View File

@ -0,0 +1,14 @@
class A(object):
def __init__(self):
self.a = 1
def x(self):
return self.a
class B(A):
def __init__(self):
self.a = 2
def x(self):
return self.a
def y(self):
return super().x()

View File

@ -4,11 +4,14 @@ import cmd
import re import re
import io import io
import datetime import datetime
import pickle
import sys
import RegisterDatapoint import RegisterDatapoint
import logging
import Converters
class CmdInterpreterException(ValueError):
pass
class CmdInterpreterException(ValueError): pass
def parseIntArbitraryBase(s): def parseIntArbitraryBase(s):
i = 0 i = 0
@ -20,6 +23,7 @@ def parseIntArbitraryBase(s):
i = int(s, 10) i = int(s, 10)
return i return i
class CmdInterpreter(cmd.Cmd): class CmdInterpreter(cmd.Cmd):
def __init__(self, infile, outfile, config, notifier, registers): def __init__(self, infile, outfile, config, notifier, registers):
super().__init__(stdin=infile, stdout=outfile) super().__init__(stdin=infile, stdout=outfile)
@ -29,7 +33,16 @@ class CmdInterpreter(cmd.Cmd):
self.registers = registers self.registers = registers
self.prompt = "test8> " self.prompt = "test8> "
self.intro = "test8 admin interface" self.intro = "test8 admin interface"
self.splitterRe = re.compile('\s+') self.splitterRe = re.compile(r'\s+')
self.logger = logging.getLogger('CmdInterpreter')
def onecmd(self, line):
try:
return super().onecmd(line)
except Exception as e:
msg = 'Caught exception in cmd "{0}": {1!s}'.format(line, e)
self.__println(msg)
self.logger.error(msg)
def __print(self, text): def __print(self, text):
self.stdout.write(text) self.stdout.write(text)
@ -38,20 +51,14 @@ class CmdInterpreter(cmd.Cmd):
self.stdout.write(text) self.stdout.write(text)
self.stdout.write("\n\r") self.stdout.write("\n\r")
def do_notify(self, arg): def __listConverterNames(self):
self.notifier.notify() return [name for name in Converters.Converters]
def help_notify(self):
self.__println("Notifies threads using the list of datapoints about changes in this list.")
self.__println("Call after modifications on the list.")
def do_quit(self, arg):
self.__println("Bye!")
return True
def do_add_hr(self, arg): def do_add_hr(self, arg):
try: try:
(label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic) = self.splitterRe.split(arg) (label, unit, address, count,
scanrate, readTopic, writeTopic,
feedbackTopic, converter) = self.splitterRe.split(arg)
self.__println("Label: {0}".format(label)) self.__println("Label: {0}".format(label))
self.__println("Unit: {0}".format(unit)) self.__println("Unit: {0}".format(unit))
self.__println("Address: {0}".format(address)) self.__println("Address: {0}".format(address))
@ -60,6 +67,7 @@ class CmdInterpreter(cmd.Cmd):
self.__println("ReadTopic: {0}".format(readTopic)) self.__println("ReadTopic: {0}".format(readTopic))
self.__println("WriteTopic: {0}".format(writeTopic)) self.__println("WriteTopic: {0}".format(writeTopic))
self.__println("FeedbackTopic: {0}".format(feedbackTopic)) self.__println("FeedbackTopic: {0}".format(feedbackTopic))
self.__println("Converter: {0}".format(converter))
if readTopic == 'None': if readTopic == 'None':
readTopic = None readTopic = None
@ -67,33 +75,30 @@ class CmdInterpreter(cmd.Cmd):
writeTopic = None writeTopic = None
if feedbackTopic == 'None': if feedbackTopic == 'None':
feedbackTopic = None feedbackTopic = None
if converter == 'None':
converter = None
unit = parseIntArbitraryBase(unit) unit = parseIntArbitraryBase(unit)
address = parseIntArbitraryBase(address) address = parseIntArbitraryBase(address)
count = parseIntArbitraryBase(count) count = parseIntArbitraryBase(count)
scanrate = float(scanrate) scanrate = float(scanrate)
if scanrate == 0: r = RegisterDatapoint.HoldingRegisterDatapoint(label=label,
if readTopic: unit=unit,
raise CmdInterpreterException('readTopic must not be set when scanRate is zero') address=address,
if not writeTopic: count=count,
raise CmdInterpreterException('writeTopic must be set when scanRate is zero') scanRate=datetime.timedelta(seconds=scanrate),
if not feedbackTopic: publishTopic=readTopic,
raise CmdInterpreterException('feedbackTopic must be set when scanRate is zero') subscribe=writeTopic,
else: feedbackTopic=feedbackTopic,
if not readTopic: converter=converter)
raise CmdInterpreterException('readTopic must be set when scanRate is zero')
if writeTopic:
raise CmdInterpreterException('writeTopic must not be set when scanRate is zero')
if feedbackTopic:
raise CmdInterpreterException('feedbackTopic must not be set when scanRate is zero')
r = RegisterDatapoint.HoldingRegisterDatapoint(label, unit, address, count, datetime.timedelta(seconds=scanrate), readTopic, writeTopic, feedbackTopic)
self.registers.append(r) self.registers.append(r)
except ValueError as e: except ValueError as e:
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
def help_add_hr(self): def help_add_hr(self):
# HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), # HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None),
self.__println("Usage: add <Label> <Unit> <Address> <Count> <ScanRate>") self.__println("Usage: add_hr <Label> <Unit> <Address> <Count> <ScanRate>")
self.__println(" <ReadTopic> <WriteTopic> <FeedbackTopic>") self.__println(" <ReadTopic> <WriteTopic> <FeedbackTopic>")
self.__println(" <Converter>")
self.__println("Adds a holding register") self.__println("Adds a holding register")
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!") self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
self.__println("---------------------------------------------------------------------") self.__println("---------------------------------------------------------------------")
@ -107,15 +112,53 @@ class CmdInterpreter(cmd.Cmd):
self.__println("<WriteTopic> Topic to be subscribe to receive data to be") self.__println("<WriteTopic> Topic to be subscribe to receive data to be")
self.__println(" written") self.__println(" written")
self.__println("<FeedbackTopic> Topic to publish feedback after a write process,") self.__println("<FeedbackTopic> Topic to publish feedback after a write process,")
self.__println("") self.__println("<Converter> Converter for data, one of {0}"
self.__println("For read items the <ScanRate> must be non-zero, a <ReadTopic> must be set and") .format(', '.join(self.__listConverterNames())))
self.__println("<WriteTopic> and <FeedbackTopic> must be <None>.")
self.__println("For write items the <ScanRate> must be zero, <ReadTopic> must be <None> and ") def do_add_coil(self, arg):
self.__println("<WriteTopic> and <FeedbackTopic> must be set.") try:
(label, unit, address, scanrate, writeTopic, feedbackTopic) = self.splitterRe.split(arg)
self.__println("Label: {0}".format(label))
self.__println("Unit: {0}".format(unit))
self.__println("Address: {0}".format(address))
self.__println("WriteTopic: {0}".format(writeTopic))
self.__println("FeedbackTopic: {0}".format(feedbackTopic))
if writeTopic == 'None':
writeTopic = None
if feedbackTopic == 'None':
feedbackTopic = None
unit = parseIntArbitraryBase(unit)
address = parseIntArbitraryBase(address)
scanrate = 0.0
r = RegisterDatapoint.CoilDatapoint(label=label,
unit=unit,
address=address,
scanRate=datetime.timedelta(seconds=scanrate),
subscribeTopic=writeTopic,
feedbackTopic=feedbackTopic)
self.registers.append(r)
except ValueError as e:
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
def help_add_coil(self):
self.__println("Usage: add_coil <Label> <Unit> <Address> <ScanRate>")
self.__println(" <WriteTopic> <FeedbackTopic>")
self.__println("Adds a coil")
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
self.__println("---------------------------------------------------------------------")
self.__println("<Label> Descriptive label")
self.__println("<Unit> Modbus address of the device")
self.__println("<Address> Register address within the device")
self.__println("<ScanRate> Scanrate in seconds (float), for write datapoints")
self.__println(" set to zero (0)")
self.__println("<WriteTopic> Topic to be subscribe to receive data to be")
self.__println(" written")
self.__println("<FeedbackTopic> Topic to publish feedback after a write process,")
def do_add_ir(self, arg): def do_add_ir(self, arg):
try: try:
(label, unit, address, count, scanrate, updateOnly, readTopic) = self.splitterRe.split(arg) (label, unit, address, count, scanrate, updateOnly, readTopic, converter) = self.splitterRe.split(arg)
self.__println("Label: {0}".format(label)) self.__println("Label: {0}".format(label))
self.__println("Unit: {0}".format(unit)) self.__println("Unit: {0}".format(unit))
self.__println("Address: {0}".format(address)) self.__println("Address: {0}".format(address))
@ -123,9 +166,12 @@ class CmdInterpreter(cmd.Cmd):
self.__println("ScanRate: {0}".format(scanrate)) self.__println("ScanRate: {0}".format(scanrate))
self.__println("UpdateOnly: {0}".format(updateOnly)) self.__println("UpdateOnly: {0}".format(updateOnly))
self.__println("ReadTopic: {0}".format(readTopic)) self.__println("ReadTopic: {0}".format(readTopic))
self.__println("Converter: {0}".format(converter))
if readTopic == 'None': if readTopic == 'None':
readTopic = None readTopic = None
if converter == 'None':
converter = None
if updateOnly in ['true', 'True', 'yes', 'Yes']: if updateOnly in ['true', 'True', 'yes', 'Yes']:
updateOnly = True updateOnly = True
elif updateOnly in ['false', 'False', 'no', 'No']: elif updateOnly in ['false', 'False', 'no', 'No']:
@ -136,16 +182,20 @@ class CmdInterpreter(cmd.Cmd):
address = parseIntArbitraryBase(address) address = parseIntArbitraryBase(address)
count = parseIntArbitraryBase(count) count = parseIntArbitraryBase(count)
scanrate = float(scanrate) scanrate = float(scanrate)
if scanrate == 0.0: r = RegisterDatapoint.InputRegisterDatapoint(label=label,
raise CmdInterpreterException('scanRate must not be zero') unit=unit,
r = RegisterDatapoint.InputRegisterDatapoint(label, unit, address, count, datetime.timedelta(seconds=scanrate), updateOnly, readTopic) address=address,
count=count, scanRate=datetime.timedelta(seconds=scanrate),
updateOnly=updateOnly,
publishTopic=readTopic,
converter=converter)
self.registers.append(r) self.registers.append(r)
except ValueError as e: except ValueError as e:
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
def help_add_ir(self): def help_add_ir(self):
self.__println("Usage: add <Label> <Unit> <Address> <Count> <ScanRate>") self.__println("Usage: add_ir <Label> <Unit> <Address> <Count> <ScanRate>")
self.__println(" <UpdateOnly> <ReadTopic>") self.__println(" <UpdateOnly> <ReadTopic> <Converter>")
self.__println("Adds an input register") self.__println("Adds an input register")
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!") self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
self.__println("---------------------------------------------------------------------") self.__println("---------------------------------------------------------------------")
@ -156,10 +206,12 @@ class CmdInterpreter(cmd.Cmd):
self.__println("<ScanRate> Scanrate in seconds (float)") self.__println("<ScanRate> Scanrate in seconds (float)")
self.__println("<UpdateOnly> Publish only when value has changed") self.__println("<UpdateOnly> Publish only when value has changed")
self.__println("<ReadTopic> Topic to publish read data") self.__println("<ReadTopic> Topic to publish read data")
self.__println("<Converter> Converter for data, one of {0}"
.format(', '.join(self.__listConverterNames())))
def do_add_di(self, arg): def do_add_di(self, arg):
try: try:
(label, unit, address, count, scanrate, updateOnly, readTopic) = self.splitterRe.split(arg) (label, unit, address, count, scanrate, updateOnly, readTopic, bitCount) = self.splitterRe.split(arg)
self.__println("Label: {0}".format(label)) self.__println("Label: {0}".format(label))
self.__println("Unit: {0}".format(unit)) self.__println("Unit: {0}".format(unit))
self.__println("Address: {0}".format(address)) self.__println("Address: {0}".format(address))
@ -167,6 +219,7 @@ class CmdInterpreter(cmd.Cmd):
self.__println("ScanRate: {0}".format(scanrate)) self.__println("ScanRate: {0}".format(scanrate))
self.__println("UpdateOnly: {0}".format(updateOnly)) self.__println("UpdateOnly: {0}".format(updateOnly))
self.__println("ReadTopic: {0}".format(readTopic)) self.__println("ReadTopic: {0}".format(readTopic))
self.__println("BitCount: {0}".format(bitCount))
if readTopic == 'None': if readTopic == 'None':
readTopic = None readTopic = None
@ -180,16 +233,22 @@ class CmdInterpreter(cmd.Cmd):
address = parseIntArbitraryBase(address) address = parseIntArbitraryBase(address)
count = parseIntArbitraryBase(count) count = parseIntArbitraryBase(count)
scanrate = float(scanrate) scanrate = float(scanrate)
if scanrate == 0.0: bitCount = int(bitCount)
raise CmdInterpreterException('scanRate must not be zero') r = RegisterDatapoint.DiscreteInputDatapoint(label=label,
r = RegisterDatapoint.DiscreteInputDatapoint(label, unit, address, count, datetime.timedelta(seconds=scanrate), updateOnly, readTopic) unit=unit,
address=address,
count=count,
scanRate=datetime.timedelta(seconds=scanrate),
updateOnly=updateOnly,
publishTopic=readTopic,
bitCount=bitCount)
self.registers.append(r) self.registers.append(r)
except ValueError as e: except ValueError as e:
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
def help_add_di(self): def help_add_di(self):
self.__println("Usage: add <Label> <Unit> <Address> <Count> <ScanRate>") self.__println("Usage: add_di <Label> <Unit> <Address> <Count> <ScanRate>")
self.__println(" <UpdateOnly> <ReadTopic>") self.__println(" <UpdateOnly> <ReadTopic> <bitCount>")
self.__println("Adds a discrete input") self.__println("Adds a discrete input")
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!") self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
self.__println("---------------------------------------------------------------------") self.__println("---------------------------------------------------------------------")
@ -200,11 +259,12 @@ class CmdInterpreter(cmd.Cmd):
self.__println("<ScanRate> Scanrate in seconds (float)") self.__println("<ScanRate> Scanrate in seconds (float)")
self.__println("<UpdateOnly> Publish only when value has changed") self.__println("<UpdateOnly> Publish only when value has changed")
self.__println("<ReadTopic> Topic to publish read data") self.__println("<ReadTopic> Topic to publish read data")
self.__println("<BitCount> Number of bit to be considered")
def do_list(self, arg): def do_list(self, arg):
for i, r in enumerate(self.registers): for i, r in enumerate(self.registers):
self.__println("#{0}: {1!s}".format(i, r)) self.__println("#{0}: {1!s}".format(i, r))
def help_list(self): def help_list(self):
self.__println("Usage: list") self.__println("Usage: list")
self.__println("-----------") self.__println("-----------")
@ -213,8 +273,9 @@ class CmdInterpreter(cmd.Cmd):
def do_reset(self, arg): def do_reset(self, arg):
for r in self.registers: for r in self.registers:
r.errorCount = 0 r.errorCount = 0
r.processCount = 0 r.writeCount = 0
r.readCount = 0
def help_reset(self): def help_reset(self):
self.__println("Usage: reset") self.__println("Usage: reset")
self.__println("-----------") self.__println("-----------")
@ -222,21 +283,21 @@ class CmdInterpreter(cmd.Cmd):
def do_stats(self, arg): def do_stats(self, arg):
for i, r in enumerate(self.registers): for i, r in enumerate(self.registers):
if r.processCount == 0: processCount = r.readCount + r.writeCount
if processCount == 0:
ratio = -1 ratio = -1
else: else:
ratio = float(r.errorCount) / float(r.processCount) ratio = float(r.errorCount) / float(processCount)
self.__println("#{0:2d}: {1:15s} ({2:2d}, {3:5d}), pc: {4:7d}, ec: {5:7d}, q: {6:1.4f}" self.__println("#{0:2d}: {1:15s} ({2:2d}, {3:5d}), rc: {4:7d}, "
.format(i, r.label, r.unit, r.address, "wc: {5:7d}, pc: {6:7d}, ec: {7:7d}, q: {8:1.4f}"
r.processCount, r.errorCount, ratio)) .format(i, r.label, r.unit, r.address, r.readCount, r.writeCount,
processCount, r.errorCount, ratio))
def help_stats(self): def help_stats(self):
self.__println("Usage: stats") self.__println("Usage: stats")
self.__println("-----------") self.__println("-----------")
self.__println("List the statistics of configured datapoints") self.__println("List the statistics of configured datapoints")
def do_change(self, arg): def do_change(self, arg):
(idx, key, typ, value) = self.splitterRe.split(arg) (idx, key, typ, value) = self.splitterRe.split(arg)
try: try:
@ -263,10 +324,10 @@ class CmdInterpreter(cmd.Cmd):
value = None value = None
else: else:
raise CmdInterpreterException('unknown type specifier, must be I, F, B, S or T') raise CmdInterpreterException('unknown type specifier, must be I, F, B, S or T')
if key not in r.__dict__: if key not in r.__dict__:
raise CmdInterpreterException('selected datapoint does not support key') raise CmdInterpreterException('selected datapoint does not support key')
r.__dict__[key] = value r.__dict__[key] = value
except ValueError as e: except ValueError as e:
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
@ -288,9 +349,6 @@ class CmdInterpreter(cmd.Cmd):
self.__println(" considered)") self.__println(" considered)")
self.__println("<value> New value") self.__println("<value> New value")
def do_del(self, arg): def do_del(self, arg):
try: try:
i = int(arg) i = int(arg)
@ -306,66 +364,75 @@ class CmdInterpreter(cmd.Cmd):
self.__println("Be aware: indexes have been changed, rerun list before removing the next item.") self.__println("Be aware: indexes have been changed, rerun list before removing the next item.")
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!") self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
def __notify(self):
self.notifier.notify()
def do_notify(self, arg):
self.__notify()
def help_notify(self):
self.__println("Notifies threads using the list of datapoints about changes in this list.")
self.__println("Call after modifications on the list.")
def do_quit(self, arg):
self.__println("Bye!")
return True
def __save(self):
RegisterDatapoint.saveRegisterList(self.registers, self.config.registerFile)
def do_save(self, arg): def do_save(self, arg):
with open(self.config.registerFile, 'wb') as f: self.__save()
pickle.dump(self.registers, f)
def help_save(self): def help_save(self):
self.__println("Usage: save") self.__println("Usage: save")
self.__println("Saves a modified register list into the register file.") self.__println("Saves a modified register list into the register file.")
def do_load(self, arg): def do_load(self, arg):
registers = None
with open(self.config.registerFile, 'rb') as f:
registers = pickle.load(f)
try: try:
RegisterDatapoint.checkRegisterList(registers) registers = RegisterDatapoint.loadRegisterList(self.config.registerFile)
self.registers = registers self.registers = registers
except ValueError as e: except Exception as e:
self.__println("Unable to load register list: {0!s}".format(e)) self.__println("Unable to load register list: {0!s}".format(e))
def help_load(self): def help_load(self):
self.__println("Usage: load") self.__println("Usage: load")
self.__println("Reload the register file, overwrite all unsaved changes.") self.__println("Reload the register file, overwrite all unsaved changes.")
#def do_shutdown(self, arg):
# sys.exit()
#def help_shutdown(self):
# self.__println("Usage: shutdown")
# self.__println("Shuts down the application")
class CmdHandle(socketserver.StreamRequestHandler): class CmdHandle(socketserver.StreamRequestHandler):
def handle(self): def handle(self):
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.config, logger = logging.getLogger('CmdHandle')
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.config,
self.server.userData.notifier, self.server.userData.registers) self.server.userData.notifier, self.server.userData.registers)
try: try:
cmd.cmdloop() cmd.cmdloop()
print("Cmd handle terminated") logger.info("Cmd handle terminated")
except ConnectionAbortedError as e: except ConnectionAbortedError as e:
print("Cmd handle externally interrupted") logger.info("Cmd handle externally interrupted")
class MyThreadingTCPServer(socketserver.ThreadingTCPServer): class MyThreadingTCPServer(socketserver.ThreadingTCPServer):
def __init__(self, host, handler, userData): def __init__(self, host, handler, userData):
super().__init__(host, handler) super().__init__(host, handler)
self.userData = userData self.userData = userData
class MyCmdUserData(object): class MyCmdUserData(object):
def __init__(self, config, notifier, registers): def __init__(self, config, notifier, registers):
self.config = config self.config = config
self.notifier = notifier self.notifier = notifier
self.registers = registers self.registers = registers
class CmdServer(threading.Thread): class CmdServer(threading.Thread):
def __init__(self, config, notifier, registers): def __init__(self, config, notifier, registers):
super().__init__() super().__init__()
self.config = config self.config = config
self.server = MyThreadingTCPServer((config.cmdAddress, config.cmdPort), CmdHandle, MyCmdUserData(config, notifier, registers)) self.server = MyThreadingTCPServer((config.cmdAddress, config.cmdPort),
self.daemon = True CmdHandle,
MyCmdUserData(config, notifier, registers))
# self.daemon = True
def start(self): def run(self):
self.server.serve_forever() self.server.serve_forever()

View File

@ -3,7 +3,8 @@ import datetime
# import RS485Ext # import RS485Ext
import RegisterDatapoint import RegisterDatapoint
from pymodbus.client.sync import ModbusSerialClient from pymodbus.client.sync import ModbusSerialClient
import wiringpi # import wiringpi
import Pins
import MyRS485 import MyRS485
import time import time
import logging import logging
@ -11,15 +12,16 @@ import logging
ERROR_PIN = 29 ERROR_PIN = 29
class CommunicationProcessor(threading.Thread): class CommunicationProcessor(threading.Thread):
def __init__(self, config, queue, pubQueue): def __init__(self, config, queue, pubQueue):
super().__init__() super().__init__()
self.config = config self.config = config
self.queue = queue self.queue = queue
self.pubQueue = pubQueue self.pubQueue = pubQueue
wiringpi.wiringPiSetup() # wiringpi.wiringPiSetup()
wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT) # wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT)
self.daemon = True # self.daemon = True
if self.config.modbusDebug: if self.config.modbusDebug:
logging.getLogger('pymodbus').setLevel(logging.DEBUG) logging.getLogger('pymodbus').setLevel(logging.DEBUG)
else: else:
@ -32,7 +34,6 @@ class CommunicationProcessor(threading.Thread):
return MyRS485.MyRS485(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1, return MyRS485.MyRS485(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1,
timeout=1) timeout=1)
def run(self): def run(self):
client = ModbusSerialClient(method='rtu') client = ModbusSerialClient(method='rtu')
client.socket = self.__getSerial() client.socket = self.__getSerial()
@ -41,19 +42,17 @@ class CommunicationProcessor(threading.Thread):
while True: while True:
r = self.queue.get() r = self.queue.get()
try: try:
wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW) # wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW)
Pins.pinsWrite('ERROR', False)
self.logger.debug("Dequeued: {0!s}".format(r)) self.logger.debug("Dequeued: {0!s}".format(r))
r.enqueued = False r.enqueued = False
r.process(client, self.pubQueue) r.process(client, self.pubQueue)
except RegisterDatapoint.DatapointException as e: except RegisterDatapoint.DatapointException as e:
wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH) # wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH)
Pins.pinsWrite('ERROR', True)
self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e)) self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e))
if client.socket is None: if client.socket is None:
self.logger.error("renew socket") self.logger.error("renew socket")
client.socket = self.__getSerial() client.socket = self.__getSerial()
finally: finally:
time.sleep(self.config.interCommDelay) time.sleep(self.config.interCommDelay)

View File

@ -9,7 +9,9 @@ class Config(object):
self.mqttPassword = '' self.mqttPassword = ''
self.cmdAddress = '127.0.0.1' self.cmdAddress = '127.0.0.1'
self.cmdPort = 9999 self.cmdPort = 9999
self.registerFile = 'registers.pkl' self.registerFile = 'registers.json'
self.serialPort = '/dev/ttyAMA0' self.serialPort = '/dev/ttyAMA0'
self.serialBaudRate = 9600 self.serialBaudRate = 9600
self.interCommDelay = 0.025 self.interCommDelay = 0.025
self.heartbeatTopic = 'IoT/Heartbeat/Modbus2'
self.heartbeatPeriod = 10.0

30
src/Converters.py Normal file
View File

@ -0,0 +1,30 @@
# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format
# to be sent in the MQTT message
# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of
# 16bit integers to be written to the Modbus slave
from struct import pack, unpack
def fix1twos(x):
x = x[0]
r = x
if x & 0x8000:
r = ((x - 1) ^ 0xffff) * -1
return r / 10
Converters = {
"dht20TOFloat": {
"in": lambda x: float(x[0]) / 10.0,
"out": None
},
"uint32": {
"in": lambda x: unpack('L', pack('HH', *x))[0],
"out": lambda x: unpack('HH', pack('L', int(x)))
},
"fix1twos": {
"in": lambda x: fix1twos(x),
"out": None
}
}

21
src/Heartbeat.py Normal file
View File

@ -0,0 +1,21 @@
import threading
import MqttProcessor
import logging
import time
class Heartbeat(threading.Thread):
def __init__(self, config, pubQueue):
super().__init__()
self.config = config
self.pubQueue = pubQueue
# self.daemon = True
self.logger = logging.getLogger('Heartbeat')
def run(self):
cnt = 0
while True:
cnt += 1
pubItem = MqttProcessor.PublishItem(self.config.heartbeatTopic, str(cnt))
self.pubQueue.put(pubItem)
time.sleep(self.config.heartbeatPeriod)

View File

@ -2,6 +2,7 @@ import threading
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from NotificationForwarder import AbstractNotificationReceiver from NotificationForwarder import AbstractNotificationReceiver
import logging import logging
import Pins
class PublishItem(object): class PublishItem(object):
@ -9,15 +10,22 @@ class PublishItem(object):
self.topic = topic self.topic = topic
self.payload = payload self.payload = payload
def __str__(self):
return 'Topic: {0}, Payload: {1}'.format(self.topic, self.payload)
def mqttOnConnectCallback(client, userdata, flags, rc): def mqttOnConnectCallback(client, userdata, flags, rc):
userdata.onConnect() userdata.onConnect()
def mqttOnMessageCallback(client, userdata, message): def mqttOnMessageCallback(client, userdata, message):
userdata.onMessage(message.topic, message.payload) userdata.onMessage(message.topic, message.payload)
def mqttOnDisconnectCallback(client, userdata, rc): def mqttOnDisconnectCallback(client, userdata, rc):
userdata.onDisconnect(rc) userdata.onDisconnect(rc)
class MqttProcessor(threading.Thread, AbstractNotificationReceiver): class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
def __init__(self, config, registers, queue, pubQueue): def __init__(self, config, registers, queue, pubQueue):
super().__init__() super().__init__()
@ -27,14 +35,15 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
self.pubQueue = pubQueue self.pubQueue = pubQueue
self.client = mqtt.Client(userdata=self) self.client = mqtt.Client(userdata=self)
self.subscriptions = [] self.subscriptions = []
self.topicRegisterMap ={} self.topicRegisterMap = {}
self.daemon = True # self.daemon = True
self.logger = logging.getLogger('MqttProcessor') self.logger = logging.getLogger('MqttProcessor')
def __processUpdatedRegisters(self, force=False): def __processUpdatedRegisters(self, force=False):
self.logger.debug("MqttProcessor.__updateSubscriptions") self.logger.debug("MqttProcessor.__updateSubscriptions")
subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] subscribeTopics = [r.subscribeTopic for r in self.registers if hasattr(r, 'subscribeTopic')
and r.subscribeTopic]
self.logger.debug("Topics: {0!s}".format(subscribeTopics)) self.logger.debug("Topics: {0!s}".format(subscribeTopics))
for subscribeTopic in subscribeTopics: for subscribeTopic in subscribeTopics:
@ -49,7 +58,8 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
self.client.unsubscribe(subscription) self.client.unsubscribe(subscription)
self.subscriptions.remove(subscription) self.subscriptions.remove(subscription)
self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } self.topicRegisterMap = {r.subscribeTopic: r for r in self.registers if hasattr(r, 'subscribeTopic')
and r.subscribeTopic}
def receiveNotification(self, arg): def receiveNotification(self, arg):
self.logger.info("MqttProcessor:registersChanged") self.logger.info("MqttProcessor:registersChanged")
@ -67,11 +77,12 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
while True: while True:
pubItem = self.pubQueue.get() pubItem = self.pubQueue.get()
if isinstance(pubItem, PublishItem): if isinstance(pubItem, PublishItem):
self.logger.debug('Publishing {0!s}'.format(pubItem))
self.client.publish(pubItem.topic, pubItem.payload) self.client.publish(pubItem.topic, pubItem.payload)
Pins.pinsWrite('MSG', False)
else: else:
self.logger.error("Invalid object in publish queue") self.logger.error("Invalid object in publish queue")
def onConnect(self): def onConnect(self):
# print("MqttProcessor.onConnect") # print("MqttProcessor.onConnect")
self.__processUpdatedRegisters(force=True) self.__processUpdatedRegisters(force=True)
@ -80,9 +91,9 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
self.logger.error("Disconnected from MQTT broker: {0}".format(rc)) self.logger.error("Disconnected from MQTT broker: {0}".format(rc))
def onMessage(self, topic, payload): def onMessage(self, topic, payload):
Pins.pinsWrite('MSG', True)
# print("MqttProcessor.onMessage") # print("MqttProcessor.onMessage")
r = self.topicRegisterMap[topic] r = self.topicRegisterMap[topic]
self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r)) self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
r.onMessage(payload) r.onMessage(payload)
self.queue.put(r) self.queue.put(r)

View File

@ -6,12 +6,18 @@ class MyPriorityQueueItem(object):
self.itemWithPriority = itemWithPriority self.itemWithPriority = itemWithPriority
def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority
def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority
def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority
def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority
def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority
def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority
class MyPriorityQueue(queue.PriorityQueue): class MyPriorityQueue(queue.PriorityQueue):
def _put(self, itemWithPriority): def _put(self, itemWithPriority):
i = MyPriorityQueueItem(itemWithPriority) i = MyPriorityQueueItem(itemWithPriority)

View File

@ -1,24 +1,27 @@
import serial import serial
import wiringpi # import wiringpi
import Pins
import array import array
import fcntl import fcntl
import termios import termios
DE_PIN = 0 DE_PIN = 0
class MyRS485(serial.Serial): class MyRS485(serial.Serial):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
wiringpi.wiringPiSetup() # wiringpi.wiringPiSetup()
wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT) # wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT)
self.buf = array.array('h', [0]) self.buf = array.array('h', [0])
def write(self, b): def write(self, b):
wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH) # wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH)
Pins.pinsWrite('DE', True)
super().write(b) super().write(b)
while True: while True:
fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1) fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1)
if self.buf[0] & termios.TIOCSER_TEMT: if self.buf[0] & termios.TIOCSER_TEMT:
break break
wiringpi.digitalWrite(DE_PIN, wiringpi.LOW) # wiringpi.digitalWrite(DE_PIN, wiringpi.LOW)
Pins.pinsWrite('DE', False)

View File

@ -3,6 +3,7 @@ class AbstractNotificationReceiver(object):
def receiveNotification(self, arg): def receiveNotification(self, arg):
raise NotImplementedError raise NotImplementedError
class NotificationForwarder(object): class NotificationForwarder(object):
def __init__(self): def __init__(self):
self.receivers = [] self.receivers = []

22
src/Pins.py Normal file
View File

@ -0,0 +1,22 @@
import wiringpi
PINS = {
'DE': 0,
'ERROR': 29,
'MSG': 28
}
def pinsInit():
wiringpi.wiringPiSetup()
for pin in PINS.values():
wiringpi.pinMode(pin, wiringpi.OUTPUT)
def pinsWrite(pinName, v):
if v:
pinState = wiringpi.HIGH
else:
pinState = wiringpi.LOW
wiringpi.digitalWrite(PINS[pinName], pinState)

View File

@ -3,23 +3,32 @@ from pymodbus.pdu import ExceptionResponse
from pymodbus.exceptions import ModbusIOException from pymodbus.exceptions import ModbusIOException
import MqttProcessor import MqttProcessor
import logging import logging
import pickle import json
import Converters
class DatapointException(Exception): pass class DatapointException(Exception):
pass
class AbstractModbusDatapoint(object): class AbstractModbusDatapoint(object):
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None): def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, converter=None):
self.argList = ['label', 'unit', 'address', 'count', 'scanRate', 'converter']
self.label = label self.label = label
self.unit = unit self.unit = unit
self.address = address self.address = address
self.count = count self.count = count
self.scanRate = scanRate self.converter = converter
if type(scanRate) == float:
self.scanRate = datetime.timedelta(seconds=scanRate)
else:
self.scanRate = scanRate
self.type = 'abstract data point' self.type = 'abstract data point'
self.enqueued = False self.enqueued = False
self.lastContact = None self.lastContact = None
self.errorCount = 0 self.errorCount = 0
self.processCount = 0 self.readCount = 0
self.writeCount = 0
if self.scanRate: if self.scanRate:
self.priority = 1 self.priority = 1
else: else:
@ -27,19 +36,26 @@ class AbstractModbusDatapoint(object):
def __str__(self): def __str__(self):
return ("{0}, {1}: unit: {2}, address: {3}, count: {4}, scanRate: {5}, " return ("{0}, {1}: unit: {2}, address: {3}, count: {4}, scanRate: {5}, "
"enqueued: {6}, lastContact: {7}, errorCount: {8}, processCount: {9}" "enqueued: {6}, lastContact: {7}, errorCount: {8}, readCount: {9}, "
"writeCount: {10}, converter: {11}"
.format(self.type, self.label, self.unit, self.address, self.count, .format(self.type, self.label, self.unit, self.address, self.count,
self.scanRate, self.enqueued, self.lastContact, self.scanRate, self.enqueued, self.lastContact,
self.errorCount, self.processCount)) self.errorCount, self.readCount, self.writeCount, self.converter))
def jsonify(self):
return {'type': self.__class__.__name__,
'args': {k: getattr(self, k) for k in self.argList}
}
def process(self, client): def process(self, client):
raise NotImplementedError raise NotImplementedError
class HoldingRegisterDatapoint(AbstractModbusDatapoint): class HoldingRegisterDatapoint(AbstractModbusDatapoint):
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None,
publishTopic=None, subscribeTopic=None, feedbackTopic=None): publishTopic=None, subscribeTopic=None, feedbackTopic=None, converter=None):
super().__init__(label, unit, address, count, scanRate) super().__init__(label, unit, address, count, scanRate, converter)
self.argList = self.argList + ['publishTopic', 'subscribeTopic', 'feedbackTopic']
self.publishTopic = publishTopic self.publishTopic = publishTopic
self.subscribeTopic = subscribeTopic self.subscribeTopic = subscribeTopic
self.feedbackTopic = feedbackTopic self.feedbackTopic = feedbackTopic
@ -53,32 +69,108 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
self.writeRequestValue)) self.writeRequestValue))
def process(self, client, pubQueue): def process(self, client, pubQueue):
self.logger = logging.getLogger('HoldingRegisterDatapoint') logger = logging.getLogger('HoldingRegisterDatapoint')
if self.writeRequestValue: if self.writeRequestValue:
# perform write operation # perform write operation
self.logger.debug("Holding register, perform write operation") logger.debug("Holding register, perform write operation")
self.writeCount += 1
values = None
logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue))
try:
if self.converter and Converters.Converters[self.converter]['out']:
values = Converters.Converters[self.converter]['out'](self.writeRequestValue)
logger.debug("{0}: converted: {1!s}".format(self.label, values))
else:
values = [int(self.writeRequestValue)]
except Exception as e:
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
result = client.write_registers(address=self.address,
unit=self.unit,
values=values)
logger.debug("Write result: {0!s}".format(result))
self.writeRequestValue = None self.writeRequestValue = None
else: else:
# perform read operation # perform read operation
self.logger.debug("Holding register, perform read operation") logger.debug("Holding register, perform read operation")
self.processCount += 1 self.readCount += 1
result = client.read_holding_registers(address=self.address, result = client.read_holding_registers(address=self.address,
count=self.count, count=self.count,
unit=self.unit) unit=self.unit)
if type(result) in [ExceptionResponse, ModbusIOException]: if type(result) in [ExceptionResponse, ModbusIOException]:
self.errorCount += 1 self.errorCount += 1
raise DatapointException(result) raise DatapointException(result)
self.logger.debug("{0}: {1!s}".format(self.label, result.registers)) logger.debug("{0}: {1!s}".format(self.label, result.registers))
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers))) value = None
try:
if self.converter and Converters.Converters[self.converter]['in']:
value = Converters.Converters[self.converter]['in'](result.registers)
logger.debug("{0}: converted: {1!s}".format(self.label, value))
else:
value = result.registers
except Exception as e:
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
if self.publishTopic:
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
self.lastContact = datetime.datetime.now() self.lastContact = datetime.datetime.now()
def onMessage(self, value): def onMessage(self, value):
self.writeRequestValue = value self.writeRequestValue = value
class CoilDatapoint(AbstractModbusDatapoint):
def __init__(self, label=None, unit=None, address=None, scanRate=None, subscribeTopic=None,
feedbackTopic=None):
super().__init__(label, unit, address, 1, scanRate, None)
self.argList = ['label', 'unit', 'address', 'scanRate', 'subscribeTopic', 'feedbackTopic']
self.subscribeTopic = subscribeTopic
self.feedbackTopic = feedbackTopic
self.writeRequestValue = None
self.type = 'coil'
def __str__(self):
return ("{0}, {1}: unit: {2}, address: {3}, scanRate: {4}, "
"enqueued: {5}, lastContact: {6}, errorCount: {7}, readCount: {8}, "
"writeCount: {9}, subscribeTopic: {10}, feedbackTopic: {11}"
.format(self.type, self.label, self.unit, self.address,
self.scanRate, self.enqueued, self.lastContact,
self.errorCount, self.readCount, self.writeCount,
self.subscribeTopic, self.feedbackTopic))
def onMessage(self, value):
self.writeRequestValue = value.decode()
def process(self, client, pubQueue):
logger = logging.getLogger('CoilDatapoint')
if self.writeRequestValue:
# perform write operation
logger.debug("Coil, perform write operation")
self.writeCount += 1
logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue))
value = None
if self.writeRequestValue in ['true', 'True', 'yes', 'Yes', 'On', 'on']:
value = True
elif self.writeRequestValue in ['false', 'False', 'no', 'No', 'Off', 'off']:
value = False
else:
self.writeRequestValue = None
raise DatapointException('illegal value {0!s} for coil write'.format(self.writeRequestValue))
result = client.write_coil(address=self.address,
unit=self.unit,
value=value)
logger.debug("Write result: {0!s}".format(result))
if self.feedbackTopic:
pubQueue.put(MqttProcessor.PublishItem(self.feedbackTopic, str(value)))
self.writeRequestValue = None
else:
# no write op, strange
logger.debug("Coil, process call but no write value available, strange")
class ReadOnlyDatapoint(AbstractModbusDatapoint): class ReadOnlyDatapoint(AbstractModbusDatapoint):
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None): def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
super().__init__(label, unit, address, count, scanRate) publishTopic=None, converter=None):
super().__init__(label, unit, address, count, scanRate, converter)
self.argList = self.argList + ['updateOnly', 'publishTopic']
self.updateOnly = updateOnly self.updateOnly = updateOnly
self.lastValue = None self.lastValue = None
self.publishTopic = publishTopic self.publishTopic = publishTopic
@ -89,17 +181,17 @@ class ReadOnlyDatapoint(AbstractModbusDatapoint):
self.lastValue)) self.lastValue))
class InputRegisterDatapoint(ReadOnlyDatapoint): class InputRegisterDatapoint(ReadOnlyDatapoint):
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None): def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic) publishTopic=None, converter=None):
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
self.type = 'input register' self.type = 'input register'
def process(self, client, pubQueue): def process(self, client, pubQueue):
self.logger = logging.getLogger('InputRegisterDatapoint') logger = logging.getLogger('InputRegisterDatapoint')
# perform read operation # perform read operation
self.logger.debug("Input register, perform read operation") logger.debug("Input register, perform read operation")
self.processCount += 1 self.readCount += 1
result = client.read_input_registers(address=self.address, result = client.read_input_registers(address=self.address,
count=self.count, count=self.count,
unit=self.unit) unit=self.unit)
@ -108,68 +200,86 @@ class InputRegisterDatapoint(ReadOnlyDatapoint):
raise DatapointException(result) raise DatapointException(result)
if not self.updateOnly or (result.registers != self.lastValue): if not self.updateOnly or (result.registers != self.lastValue):
self.lastValue = result.registers self.lastValue = result.registers
self.logger.debug("{0}: {1!s}".format(self.label, result.registers)) logger.debug("{0}: raw: {1!s}".format(self.label, result.registers))
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers))) value = None
try:
if self.converter and Converters.Converters[self.converter]['in']:
value = Converters.Converters[self.converter]['in'](result.registers)
logger.debug("{0}: converted: {1!s}".format(self.label, value))
else:
value = result.registers
except Exception as e:
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
if self.publishTopic:
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
self.lastContact = datetime.datetime.now() self.lastContact = datetime.datetime.now()
class DiscreteInputDatapoint(ReadOnlyDatapoint): class DiscreteInputDatapoint(ReadOnlyDatapoint):
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None): def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic) publishTopic=None, converter=None, bitCount=8):
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
self.argList = self.argList + ['bitCount']
self.type = 'discrete input' self.type = 'discrete input'
self.bitCount = bitCount
self.lastValues = [None] * self.bitCount
def __str__(self):
return ("[{0!s}, bitCount: {1}"
.format(super().__str__(), self.bitCount))
def process(self, client, pubQueue): def process(self, client, pubQueue):
self.logger = logging.getLogger('DiscreteInputDatapoint') logger = logging.getLogger('DiscreteInputDatapoint')
# perform read operation # perform read operation
self.logger.debug("Discrete input, perform read operation") logger.debug("Discrete input, perform read operation")
self.processCount += 1 self.readCount += 1
result = client.read_discrete_inputs(address=self.address, result = client.read_discrete_inputs(address=self.address,
count=self.count, count=self.count,
unit=self.unit) unit=self.unit)
if type(result) in [ExceptionResponse, ModbusIOException]: if type(result) in [ExceptionResponse, ModbusIOException]:
self.errorCount += 1 self.errorCount += 1
raise DatapointException(result) raise DatapointException(result)
if not self.updateOnly or (result.bits != self.lastValue): logger.debug("{0}: raw: {1!s}".format(self.label, result.bits))
self.lastValue = result.bits for i in range(self.bitCount):
self.logger.debug("{0}: {1!s}".format(self.label, result.bits)) if not self.updateOnly or (result.getBit(i) != self.lastValues[i]):
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.bits))) self.lastValues[i] = result.getBit(i)
logger.debug("{0}, {1}: changed: {2!s}".format(self.label, i, result.getBit(i)))
if self.publishTopic:
pubQueue.put(MqttProcessor.PublishItem("{0}/{1}"
.format(self.publishTopic, i), str(result.getBit(i))))
self.lastContact = datetime.datetime.now() self.lastContact = datetime.datetime.now()
class JsonifyEncoder(json.JSONEncoder):
def loadRegisterList(registerList): def default(self, o):
# Load, check and auto-update registers file res = None
try:
with open(registerList, 'rb') as f: res = o.jsonify()
datapoints = pickle.load(f) except (TypeError, AttributeError):
if type(o) == datetime.timedelta:
checkRegisterList(datapoints) res = o.total_seconds()
else:
newDatapoints = [] res = super().default(o)
for dp in datapoints: return res
ndp = type(dp)()
for k,v in dp.__dict__.items():
ndp.__dict__[k] = v
newDatapoints.append(ndp)
logging.getLogger('loadRegisterList').debug("Datapoint loaded: {0!s}".format(ndp))
checkRegisterList(newDatapoints, reset=True)
with open(registerList, 'wb') as f:
pickle.dump(newDatapoints, f)
return newDatapoints
def checkRegisterList(registers, reset=False): def datapointObjectHook(j):
for r in registers: if type(j) == dict and 'type' in j and 'args' in j:
if not isinstance(r, AbstractModbusDatapoint): klass = eval(j['type'])
raise ValueError('Entry in register list {0!s} is not derived from class AbstractModbusDatapoint'.format(r)) o = klass(**j['args'])
else: return o
if reset: else:
r.errorCount = 0 return j
r.processCount = 0
r.enqueued = False
def saveRegisterList(registerList, registerListFile):
js = json.dumps(registerList, cls=JsonifyEncoder, sort_keys=True, indent=4)
with open(registerListFile, 'w') as f:
f.write(js)
def loadRegisterList(registerListFile):
with open(registerListFile, 'r') as f:
js = f.read()
registerList = json.loads(js, object_hook=datapointObjectHook)
return registerList

View File

@ -3,6 +3,7 @@ import datetime
from NotificationForwarder import AbstractNotificationReceiver from NotificationForwarder import AbstractNotificationReceiver
import logging import logging
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
def __init__(self, config, registers, queue): def __init__(self, config, registers, queue):
super().__init__() super().__init__()
@ -10,11 +11,11 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
self.registers = registers self.registers = registers
self.queue = queue self.queue = queue
self.delayEvent = threading.Event() self.delayEvent = threading.Event()
self.daemon = True # self.daemon = True
self.logger = logging.getLogger('ScanRateConsideringQueueFeeder') self.logger = logging.getLogger('ScanRateConsideringQueueFeeder')
def getMinimalScanrate(self): def getMinimalScanrate(self):
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
def receiveNotification(self, arg): def receiveNotification(self, arg):
self.logger.info("ScanRateConsideringQueueFeeder:registersChanged") self.logger.info("ScanRateConsideringQueueFeeder:registersChanged")
@ -26,10 +27,10 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
registersToBeHandled = [ registersToBeHandled = [
r for r in self.registers if ((not r.enqueued) and r for r in self.registers if ((not r.enqueued) and
(r.scanRate) and (r.scanRate) and
((not r.lastContact) or ((not r.lastContact) or
(r.lastContact + r.scanRate < datetime.datetime.now()))) (r.lastContact + r.scanRate < datetime.datetime.now())))
] ]
registersToBeHandled.sort(key=lambda x : x.scanRate) registersToBeHandled.sort(key=lambda x: x.scanRate)
for r in registersToBeHandled: for r in registersToBeHandled:
self.queue.put(r) self.queue.put(r)
r.enqueued = True r.enqueued = True

View File

@ -4,12 +4,14 @@ import pickle
datapoints = [ datapoints = [
RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1, datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'), RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1,
RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'), datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'),
RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Switches'), RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1,
datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'),
RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1,
datetime.timedelta(seconds=1.0), True, 'Pub/Switches'),
] ]
with open('registers.pkl', 'wb') as f: with open('registers.pkl', 'wb') as f:
pickle.dump(datapoints, f) pickle.dump(datapoints, f)

6
src/loadRegisterFile.py Normal file
View File

@ -0,0 +1,6 @@
import RegisterDatapoint
registers = RegisterDatapoint.loadRegisterList('registers.json')
for r in registers:
print("{0!s}".format(r))

View File

@ -10,17 +10,17 @@ import datetime
import RegisterDatapoint import RegisterDatapoint
import pickle import pickle
import logging import logging
import Pins
import Heartbeat
if __name__ == "__main__": if __name__ == "__main__":
config = Config.Config() config = Config.Config()
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.DEBUG) logger.setLevel(logging.INFO)
fh = logging.FileHandler(config.logFile) fh = logging.FileHandler(config.logFile)
fh.setLevel(logging.DEBUG) fh.setLevel(logging.INFO)
ch = logging.StreamHandler() ch = logging.StreamHandler()
ch.setLevel(logging.ERROR) ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@ -29,12 +29,13 @@ if __name__ == "__main__":
logger.addHandler(fh) logger.addHandler(fh)
logger.addHandler(ch) logger.addHandler(ch)
Pins.pinsInit()
queue = MyPriorityQueue.MyPriorityQueue() queue = MyPriorityQueue.MyPriorityQueue()
pubQueue = Queue() pubQueue = Queue()
nf = NotificationForwarder.NotificationForwarder() nf = NotificationForwarder.NotificationForwarder()
logger.debug('infrastructure prepared') logger.debug('infrastructure prepared')
datapoints = RegisterDatapoint.loadRegisterList(config.registerFile) datapoints = RegisterDatapoint.loadRegisterList(config.registerFile)
logger.debug('datapoints read') logger.debug('datapoints read')
@ -47,11 +48,15 @@ if __name__ == "__main__":
mp.start() mp.start()
logger.debug('MqttProcessor started') logger.debug('MqttProcessor started')
cs = CmdServer.CmdServer(config, nf, datapoints)
cs.start()
logger.debug('CmdServer started')
hb = Heartbeat.Heartbeat(config, pubQueue)
hb.start()
logger.debug('Heartbeat started')
qf = ScanRateConsideringQueueFeeder.ScanRateConsideringQueueFeeder(config, datapoints, queue) qf = ScanRateConsideringQueueFeeder.ScanRateConsideringQueueFeeder(config, datapoints, queue)
nf.register(qf) nf.register(qf)
qf.start() qf.start()
logger.debug('ScanRateConsideringQueueFeeder started') logger.debug('ScanRateConsideringQueueFeeder started')
cs = CmdServer.CmdServer(config, nf, datapoints)
cs.start()
logger.debug('CmdServer started')

19
src/modbusMaster.service Normal file
View File

@ -0,0 +1,19 @@
[Unit]
Description=ModbusMaster
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
GuessMainPID=yes
ExecStart=/usr/bin/python master.py
ExecStop=kill -SIGINT $mainpid
Restart=on-failure
WorkingDirectory=/opt/services/modbusMaster
[Install]
Alias=ModbusMaster
WantedBy=multi-user.target

116
src/registers.json Normal file
View File

@ -0,0 +1,116 @@
[
{
"args": {
"address": 1,
"converter": "dht20TOFloat",
"count": 1,
"label": "Temperature",
"publishTopic": "IoT/Measurment/Modbus2/Laundry/Temperature",
"scanRate": 60.0,
"unit": 5,
"updateOnly": false
},
"type": "InputRegisterDatapoint"
},
{
"args": {
"address": 2,
"converter": "dht20TOFloat",
"count": 1,
"label": "Humidity",
"publishTopic": "IoT/Measurment/Modbus2/Laundry/Humidity",
"scanRate": 60.0,
"unit": 5,
"updateOnly": false
},
"type": "InputRegisterDatapoint"
},
{
"args": {
"address": 0,
"bitCount": 8,
"converter": null,
"count": 1,
"label": "Switches",
"publishTopic": "IoT/Status/Modbus2/Switches",
"scanRate": 0.5,
"unit": 4,
"updateOnly": true
},
"type": "DiscreteInputDatapoint"
},
{
"args": {
"address": 0,
"feedbackTopic": "IoT/Feedback/Modbus2/Coils/0",
"label": "Coil0",
"publishTopic": "IoT/Status/Modbus2/Coils/0",
"scanRate": 0.0,
"subscribeTopic": "IoT/Action/Modbus2/Coils/0",
"unit": 4
},
"type": "CoilDatapoint"
},
{
"args": {
"address": 1,
"feedbackTopic": "IoT/Feedback/Modbus2/Coils/1",
"label": "Coil1",
"publishTopic": "IoT/Status/Modbus2/Coils/1",
"scanRate": 0.0,
"subscribeTopic": "IoT/Action/Modbus2/Coils/1",
"unit": 4
},
"type": "CoilDatapoint"
},
{
"args": {
"address": 2,
"feedbackTopic": "IoT/Feedback/Modbus2/Coils/2",
"label": "Coil2",
"publishTopic": "IoT/Status/Modbus2/Coils/2",
"scanRate": 0.0,
"subscribeTopic": "IoT/Action/Modbus2/Coils/2",
"unit": 4
},
"type": "CoilDatapoint"
},
{
"args": {
"address": 3,
"feedbackTopic": "IoT/Feedback/Modbus2/Coils/3",
"label": "Coil3",
"publishTopic": "IoT/Status/Modbus2/Coils/3",
"scanRate": 0.0,
"subscribeTopic": "IoT/Action/Modbus2/Coils/3",
"unit": 4
},
"type": "CoilDatapoint"
},
{
"args": {
"address": 1,
"converter": "fix1twos",
"count": 1,
"label": "wago1",
"publishTopic": "IoT/Measurement/Modbus2/Wago1",
"scanRate": 1.0,
"unit": 11,
"updateOnly": false
},
"type": "InputRegisterDatapoint"
},
{
"args": {
"address": 0,
"converter": "fix1twos",
"count": 1,
"label": "Freezer",
"publishTopic": "IoT/Measurement/Modbus2/Freezer",
"scanRate": 1.0,
"unit": 11,
"updateOnly": false
},
"type": "InputRegisterDatapoint"
}
]

Binary file not shown.

View File

@ -1,24 +1,21 @@
import datetime import datetime
import RegisterDatapoint import RegisterDatapoint
import pickle import pickle
import json
with open('registers.pkl', 'rb') as f: with open('registers.pkl', 'rb') as f:
datapoints = pickle.load(f) datapoints = pickle.load(f)
RegisterDatapoint.checkRegisterList(datapoints, reset=True)
newDatapoints = [] newDatapoints = []
for dp in datapoints: for dp in datapoints:
ndp = type(dp)() ndp = type(dp)()
for k,v in dp.__dict__.items(): for k, v in dp.__dict__.items():
ndp.__dict__[k] = v if k != 'logger':
ndp.__dict__[k] = v
newDatapoints.append(ndp) newDatapoints.append(ndp)
RegisterDatapoint.checkRegisterList(newDatapoints, reset=True) js = json.dumps(newDatapoints, cls=RegisterDatapoint.JsonifyEncoder, sort_keys=True, indent=4)
print(js)
with open('registers.pkl', 'wb') as f: RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json')
pickle.dump(newDatapoints, f)