Compare commits
139 Commits
WORKS1
...
refactor_p
Author | SHA1 | Date | |
---|---|---|---|
b6077b0ec8 | |||
cab0aec533
|
|||
35c282b6ea | |||
4b860ee43a
|
|||
d3eae2cd8d
|
|||
768ded37a6
|
|||
2cadccf59c | |||
c779f07cda
|
|||
84f4ecf5a1 | |||
d8ce7c8bc2 | |||
080b22646d | |||
2a704738b2
|
|||
5c8b2599a6
|
|||
31cf5ceb5a
|
|||
a0efe1129b | |||
7a8a3c661d
|
|||
122fce519c | |||
4200aaf304
|
|||
0f037b02ea
|
|||
ac47ff0ebe | |||
264d1cab14
|
|||
ab91feafd0 | |||
2460f570d8
|
|||
f6d4218e57 | |||
bf3475a796
|
|||
0bae0f4bb2 | |||
b9e0fefe17
|
|||
34ca87f734 | |||
de01ec20e2
|
|||
6821364273
|
|||
e297149772 | |||
6243cad505
|
|||
8fa69d1610
|
|||
2fc6fe0830 | |||
d97b7469fe
|
|||
2665fdd5e3
|
|||
3e1b9acb86
|
|||
21555736f6 | |||
ab31aea3e0 | |||
90cdde8bfe | |||
b7193d8d58 | |||
8c87460632 | |||
454632ec36 | |||
d89d705b76
|
|||
9f6f449c5a
|
|||
5a9a6d67f0 | |||
21c9bd0eb1
|
|||
cefebd69c6
|
|||
a251015a33
|
|||
c1bb481cac
|
|||
c3cd27261d
|
|||
1cf6378ca0 | |||
34766a0d32 | |||
c6552317db
|
|||
870bbcb66b
|
|||
2c4b42850e | |||
b48f56260a
|
|||
048dceeb02
|
|||
3b61437da7
|
|||
782a4a296b
|
|||
7c6ebd8d0c
|
|||
b63e513200
|
|||
57c2c7251d
|
|||
b179b4de76
|
|||
8540bd6da5
|
|||
338d289fa6
|
|||
5982486940
|
|||
3c6c420178
|
|||
edeb4ea0f0
|
|||
06733b0507
|
|||
fc0f7b0a1a
|
|||
b1e0e700d3
|
|||
39a1b18234
|
|||
46e40500f5
|
|||
a1acf04dbe
|
|||
87e2e65ce2 | |||
22b8ee6404 | |||
9bdd889bc4
|
|||
e9e2e41491
|
|||
55344b158b
|
|||
80a8304986
|
|||
6229989dd6 | |||
e1750e5387
|
|||
99238720c9 | |||
3024cadb5d
|
|||
1a1ba21c45
|
|||
7c8663d539 | |||
37aa84d0f5
|
|||
58467d9c9d | |||
12f83d21ff
|
|||
93b3333356
|
|||
303f4b50f1
|
|||
25bb774a5d
|
|||
c6a340746a
|
|||
f7290b3ef2
|
|||
04c1d777e4
|
|||
756ba2175d | |||
75ddb6069a
|
|||
55f875e27e | |||
761a1b35e9 | |||
851cfd76d8
|
|||
8d6d8e5901
|
|||
54d33007fc
|
|||
a2a5a924bd
|
|||
c30acfabdb
|
|||
44f82937d3
|
|||
7674aac137
|
|||
37548cfd53
|
|||
4a090c5a73
|
|||
7990567378
|
|||
04be7219c2
|
|||
146f8df8e8
|
|||
186b1c5adb
|
|||
2ba9f83569 | |||
3c582ca833
|
|||
3fbcedc7ed | |||
344b68e3e3 | |||
5e15f1cda6 | |||
eee1db510c
|
|||
d19bc80783
|
|||
fcefd538d8
|
|||
faa8b82236
|
|||
3fd0ed27b4 | |||
bf12960dc4 | |||
f8fe6485d3 | |||
8fd1367f24
|
|||
fa405d539c
|
|||
9ebb9eb403
|
|||
754812155f | |||
ad9c6291c0 | |||
66ef2124d3 | |||
5c726a000a | |||
eb455916ae | |||
10365339df | |||
2b84200f9e | |||
9fcd37006f | |||
4b812106ad | |||
b4ba3e7868 | |||
fef904e761
|
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
**/*.o
|
||||
**/*.so
|
||||
**/*.pyc
|
||||
.pymodhis
|
||||
**/__pycache__
|
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -1,3 +0,0 @@
|
||||
[submodule "rpirtscts"]
|
||||
path = rpirtscts
|
||||
url = https://github.com/wollud1969/rpirtscts.git
|
||||
|
15
.pymodhis
15
.pymodhis
@ -1,15 +0,0 @@
|
||||
|
||||
# 2019-06-14 22:50:36.107575
|
||||
+help
|
||||
|
||||
# 2019-06-14 22:50:54.780566
|
||||
+q
|
||||
|
||||
# 2019-06-14 22:52:53.061353
|
||||
+help
|
||||
|
||||
# 2019-06-14 22:53:29.359029
|
||||
+client.read_ho
|
||||
|
||||
# 2019-06-14 22:53:48.878000
|
||||
+client.read_holding_registers unit=3 address=0 count=3
|
BIN
docs/modbus-not-ok.png
Normal file
BIN
docs/modbus-not-ok.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 8.8 KiB |
BIN
docs/modbus-ok.png
Normal file
BIN
docs/modbus-ok.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 9.0 KiB |
BIN
docs/modbus-really-ok.png
Normal file
BIN
docs/modbus-really-ok.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 9.6 KiB |
BIN
docs/nice-signals-with-fail-safe-resistors.png
Normal file
BIN
docs/nice-signals-with-fail-safe-resistors.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 8.6 KiB |
BIN
docs/rs485_should_be_ok_now.JPG
Normal file
BIN
docs/rs485_should_be_ok_now.JPG
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.7 MiB |
@ -1,13 +0,0 @@
|
||||
CFLAGS=-I/opt/bcm2835/include
|
||||
|
||||
writec.so: writec.o /opt/bcm2835/lib/libbcm2835.a
|
||||
$(LD) -shared -o $@ $^
|
||||
|
||||
writec.o: writec.c
|
||||
|
||||
.c.o:
|
||||
$(CC) $(CFLAGS) -c $<
|
||||
|
||||
.PHONY: clean
|
||||
clean:
|
||||
-rm -f *.so *.o
|
@ -1,19 +0,0 @@
|
||||
import serial.rs485
|
||||
import serial.serialutil
|
||||
import ctypes
|
||||
|
||||
|
||||
class RS485Ext(serial.rs485.RS485):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RS485Ext, self).__init__(*args, **kwargs)
|
||||
self.writec = ctypes.cdll.LoadLibrary('writec.so')
|
||||
fd = self.fileno()
|
||||
r = self.writec.set_rs485_mode(fd)
|
||||
|
||||
def write(self, b):
|
||||
d = serial.serialutil.to_bytes(b)
|
||||
l = len(d)
|
||||
fd = self.fileno()
|
||||
r = self.writec.writec(fd, d, l)
|
||||
return r
|
||||
|
Binary file not shown.
@ -1,31 +0,0 @@
|
||||
#include <unistd.h>
|
||||
#include <termios.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <stdio.h>
|
||||
#include <linux/serial.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <bcm2835.h>
|
||||
|
||||
|
||||
const uint8_t DE_PIN = RPI_V2_GPIO_P1_11;
|
||||
|
||||
int set_rs485_mode(int fd) {
|
||||
bcm2835_init();
|
||||
bcm2835_gpio_fsel(DE_PIN, BCM2835_GPIO_FSEL_OUTP);
|
||||
bcm2835_gpio_write(DE_PIN, LOW);
|
||||
}
|
||||
|
||||
ssize_t writec(int fd, char *buf, size_t count) {
|
||||
bcm2835_gpio_write(DE_PIN, LOW);
|
||||
ssize_t r = write(fd, buf, count);
|
||||
uint8_t lsr;
|
||||
do {
|
||||
int r = ioctl(fd, TIOCSERGETLSR, &lsr);
|
||||
} while (!(lsr & TIOCSER_TEMT));
|
||||
bcm2835_gpio_write(DE_PIN, HIGH);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
Binary file not shown.
Binary file not shown.
123
readme.md
123
readme.md
@ -1,5 +1,8 @@
|
||||
# modbusmaster
|
||||
|
||||
|
||||

|
||||
|
||||
## Disable Bluetooth on RPi3
|
||||
|
||||
Add at the end of `/boot/config`:
|
||||
@ -11,6 +14,7 @@ Remove mentions of `serial0` from `/boot/cmdline`.
|
||||
|
||||
## Enable rs485 mode
|
||||
|
||||
<del>
|
||||
Use the submodule rpirtscts to enable to alternate functions of the related
|
||||
pins at the RPi MCU. It is submoduled here, can be found directly at https://github.com/mholling/rpirtscts
|
||||
|
||||
@ -23,6 +27,10 @@ This needs to be done at every boot.
|
||||
Kudos to danjperron, cmp. https://www.raspberrypi.org/forums/viewtopic.php?f=98&t=224533&hilit=rs+485#p1383709
|
||||
|
||||
(Note, please: This software is under the GPL 3.0 license. However, I do not derive from this software, I use it in an unchanged way. It is not integrated into my sources, it just needs to be called once the RPi has booted.)
|
||||
</del>
|
||||
|
||||
The approach to enable the transmitter has been changed due to timing issues with disabling the transmitter after the TX phase when talking to Modbus devices (see below), so this step is not longer required.
|
||||
|
||||
|
||||
## Pinout
|
||||
|
||||
@ -38,6 +46,10 @@ TX is at GPIO14, RX is at GPIO15 and RTS (control line for transmitter enable) i
|
||||
|
||||
## Python snippet to test
|
||||
|
||||
### Pure RS485 test
|
||||
|
||||
#### Test 1: Transmit only
|
||||
|
||||
(See also `./snippets/test1.py`.)
|
||||
|
||||
import serial.rs485
|
||||
@ -52,4 +64,115 @@ Find an signal screenshot here:
|
||||
Channel 1 in yellow has the TX line (GPIO14), channel 3 in purple has the RTS (GPIO17, transmitter enable line).
|
||||
|
||||
|
||||
#### Test 2: Transmit and receive
|
||||
|
||||
(See also `./snippets/test2.py`.)
|
||||
|
||||
Here in contrast to the last script first an octet is received and then echoed.
|
||||
|
||||
import serial.rs485
|
||||
ser=serial.rs485.RS485(port='/dev/ttyAMA0',baudrate=2400)
|
||||
ser.rs485_mode = serial.rs485.RS485Settings(False,True)
|
||||
ser.write('a test'.encode('utf-8'))
|
||||
|
||||
while True:
|
||||
c = ser.read(1)
|
||||
ser.write(c)
|
||||
print(c, end='')
|
||||
|
||||
The signal graph is here:
|
||||
|
||||

|
||||
|
||||
Channel 1 in yellow has the TX line (GPIO14), channel 2 in blue has the RX line and channel 3 in purple has the RTS (GPIO17, transmitter enable line).
|
||||
|
||||
It is obvious that the DE (transmitter enable line, RTS controlled by `pyserial` in RS485 mode) is hold active a good while after all data already have been transmitted.
|
||||
|
||||
|
||||
#### Test 3: First Modbus communication
|
||||
|
||||
(See also `./snippets/test3.py`.)
|
||||
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import serial.rs485
|
||||
|
||||
ser=serial.rs485.RS485(port='/dev/ttyAMA0',baudrate=1200)
|
||||
ser.rs485_mode = serial.rs485.RS485Settings(rts_level_for_tx=False,
|
||||
rts_level_for_rx=True,
|
||||
delay_before_tx=0.005,
|
||||
delay_before_rx=-0.0)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
result = client.read_holding_registers(address=0x2000, count=2, unit=1)
|
||||
print(result)
|
||||
print(result.registers)
|
||||
client.close()
|
||||
|
||||
Here, the port is initialized in RS485 mode and a device is queried.
|
||||
|
||||
Sometimes this works:
|
||||
|
||||

|
||||
|
||||
But sometimes is does not work:
|
||||
|
||||

|
||||
|
||||
The long hold time of about 18ms of the DE (transmitter enable, RTS line) becomes a problem, the response of the device already starts when the transmitter of the master is still enabled and thus the receiver of the master is still disabled.
|
||||
|
||||
A couple of experiments with deriving from the `RS485` class of `pyserial` and moving the time critical code (disabling the transmitter after the transmit) into C code failed. It wasn't faster at all. It became obvious that the system call `tcdrain`, which waits for all octets in the buffer to be transmitted returns very late.
|
||||
|
||||
Finally, the solution was to get away from the RS485 mode in `pyserial` and instead use the line status register of the UART via a system call to see whether the transmit register is empty and switch the DE line of the transmitter not longer with the RTS functionality but directly using `wiringPi`.
|
||||
|
||||
Derived class from `RS485` in `pyserial` (maybe this can be switched to the regular `Serial` class ...)
|
||||
|
||||
class RS485Ext(serial.rs485.RS485):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RS485Ext, self).__init__(*args, **kwargs)
|
||||
self.writec = ctypes.cdll.LoadLibrary('writec.so')
|
||||
r = self.writec.init()
|
||||
|
||||
def write(self, b):
|
||||
d = serial.serialutil.to_bytes(b)
|
||||
r = self.writec.writec(self.fileno(), d, len(d))
|
||||
return r
|
||||
|
||||
|
||||
C code of the function `writec` in the library `writec`:
|
||||
|
||||
ssize_t writec(int fd, char *buf, size_t count) {
|
||||
digitalWrite(DE_PIN, HIGH);
|
||||
ssize_t r = write(fd, buf, count);
|
||||
uint8_t lsr;
|
||||
do {
|
||||
int r = ioctl(fd, TIOCSERGETLSR, &lsr);
|
||||
} while (!(lsr & TIOCSER_TEMT));
|
||||
digitalWrite(DE_PIN, LOW);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
This change brought the break through:
|
||||
|
||||

|
||||
|
||||
The delta between TX end and DE disable is now only 2.2ms (instead of nearly 20ms before).
|
||||
|
||||
And finally Modbus communication works a lot more reliable:
|
||||
|
||||

|
||||
|
||||
However, still a lot of errors in the Modbus communication, certainly because of the dirty RX signal (blue). Furthermore it appears that the communication fails completely as soon as the termination resistor of 120 Ohms was placed.
|
||||
|
||||
A hint from the application AN-960 from Analog Devices (https://www.analog.com/media/en/technical-documentation/application-notes/AN-960.pdf) helped here. In chapter "Fail-Safe Biasing" it is explained that, when no transmitter is active at all the lines are completely floating and thus the RX signal behind the transceiver becomes dirty, as in the image above.
|
||||
|
||||
Pulling the A line to `Vcc` and the B line to `Gnd` using 1.5kOhm resistors, as proposed, solved this problem. And now also the termination resistor worked as expected.
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Submodule rpirtscts deleted from 612b065e38
@ -1,18 +1,99 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import serial.rs485
|
||||
from pymodbus.pdu import ExceptionResponse
|
||||
import RS485Ext
|
||||
import struct
|
||||
import time
|
||||
|
||||
#ser=serial.rs485.RS485(port='/dev/ttyAMA0',baudrate=1200)
|
||||
#ser.rs485_mode = serial.rs485.RS485Settings(rts_level_for_tx=False,
|
||||
# rts_level_for_rx=True,
|
||||
# delay_before_tx=0.005,
|
||||
# delay_before_rx=-0.0)
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=1200)
|
||||
def registersToIeeeFloat(i):
|
||||
return struct.unpack('f', bytes(
|
||||
[((x & 0xff00) >> 8) if y else (x & 0x00ff)
|
||||
for x in i[::-1]
|
||||
for y in [False, True]
|
||||
]
|
||||
)
|
||||
)[0]
|
||||
|
||||
def registersToIeeeFloatReverse(i):
|
||||
return struct.unpack('f', bytes(
|
||||
[((x & 0xff00) >> 8) if y else (x & 0x00ff)
|
||||
for x in i
|
||||
for y in [False, True]
|
||||
]
|
||||
)
|
||||
)[0]
|
||||
|
||||
class ModbusException(Exception):
|
||||
def __init__(self, resp):
|
||||
self.msg = str(result)
|
||||
|
||||
def __str__(self):
|
||||
return self.msg
|
||||
|
||||
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=1200, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
result = client.read_holding_registers(address=0x2000, count=2, unit=1)
|
||||
print(result)
|
||||
print(result.registers)
|
||||
client.close()
|
||||
|
||||
delay = 0.05
|
||||
|
||||
try:
|
||||
# BG-Tech, Voltage
|
||||
result = client.read_holding_registers(address=0x2000, count=2, unit=1)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Voltage: {:.2f}".format(registersToIeeeFloat(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# BG-Tech, Frequency
|
||||
result = client.read_holding_registers(address=0x2020, count=2, unit=1)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Frequency: {:.2f}".format(registersToIeeeFloat(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# BG-Tech, Current
|
||||
result = client.read_holding_registers(address=0x2060, count=2, unit=1)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Current: {:.2f}".format(registersToIeeeFloat(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# Hottis Thermometer, Resistance Channel 1
|
||||
result = client.read_holding_registers(address=0x0004, count=2, unit=3)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Resistance Channel 1: {:.2f}".format(registersToIeeeFloatReverse(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# Hottis Thermometer, Temperature Channel 1
|
||||
result = client.read_holding_registers(address=0x000c, count=2, unit=3)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Temperature Channel 1: {:.2f}".format(registersToIeeeFloatReverse(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# Hottis Thermometer, Resistance Channel 2
|
||||
result = client.read_holding_registers(address=0x0014, count=2, unit=3)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Resistance Channel 2: {:.2f}".format(registersToIeeeFloatReverse(result.registers)))
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
# Hottis Thermometer, Temperature Channel 2
|
||||
result = client.read_holding_registers(address=0x001c, count=2, unit=3)
|
||||
if type(result) == ExceptionResponse:
|
||||
raise ModbusException(result)
|
||||
print("Temperature Channel 2: {:.2f}".format(registersToIeeeFloatReverse(result.registers)))
|
||||
except ModbusException as e:
|
||||
print("ERROR: %s" % e)
|
||||
finally:
|
||||
client.close()
|
||||
|
21
snippets/test6.py
Normal file
21
snippets/test6.py
Normal file
@ -0,0 +1,21 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import RS485Ext
|
||||
import time
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=9600, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
|
||||
while True:
|
||||
try:
|
||||
result = client.read_holding_registers(address=0x9c42, count=1, unit=1)
|
||||
print(result)
|
||||
print(result.registers)
|
||||
except Exception as e:
|
||||
print("ERROR: %s" % str(e))
|
||||
time.sleep(2)
|
||||
|
||||
client.close()
|
||||
|
23
snippets/test6b.py
Normal file
23
snippets/test6b.py
Normal file
@ -0,0 +1,23 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import RS485Ext
|
||||
import time
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=9600, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
|
||||
try:
|
||||
result = client.read_holding_registers(address=0x9c43, count=1, unit=4)
|
||||
# result = client.read_holding_registers(address=0x0102, count=1, unit=5)
|
||||
# result = client.read_input_registers(address=0x0002, count=1, unit=5)
|
||||
# result = client.read_discrete_inputs(address=0x0000, count=1, unit=4)
|
||||
print(result)
|
||||
print(result.registers)
|
||||
# print(result.bits)
|
||||
except Exception as e:
|
||||
print("ERROR: %s" % str(e))
|
||||
|
||||
client.close()
|
||||
|
19
snippets/test6c.py
Normal file
19
snippets/test6c.py
Normal file
@ -0,0 +1,19 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import RS485Ext
|
||||
import time
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=9600, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
|
||||
try:
|
||||
result = client.write_register(address=0x9c48, unit=4, value=0x000F)
|
||||
# result = client.write_coil(address=0x0000, unit=4, value=1)
|
||||
print(result)
|
||||
except Exception as e:
|
||||
print("ERROR: %s" % str(e))
|
||||
|
||||
client.close()
|
||||
|
25
snippets/test6d.py
Normal file
25
snippets/test6d.py
Normal file
@ -0,0 +1,25 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import RS485Ext
|
||||
import time
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=1200, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
|
||||
v = 0
|
||||
while True:
|
||||
if v == 0:
|
||||
v = 1
|
||||
else:
|
||||
v = 0
|
||||
try:
|
||||
result = client.write_coil(address=0x0000, unit=4, value=v)
|
||||
print(result)
|
||||
except Exception as e:
|
||||
print("ERROR: %s" % str(e))
|
||||
time.sleep(0.5)
|
||||
|
||||
client.close()
|
||||
|
21
snippets/test6e.py
Normal file
21
snippets/test6e.py
Normal file
@ -0,0 +1,21 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
import RS485Ext
|
||||
import time
|
||||
|
||||
ser=RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=1200, stopbits=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = ser
|
||||
client.connect()
|
||||
|
||||
while True:
|
||||
try:
|
||||
result = client.read_discrete_inputs(address=0x0000, count=1, unit=4)
|
||||
print(result)
|
||||
print(result.bits)
|
||||
except Exception as e:
|
||||
print("ERROR: %s" % str(e))
|
||||
time.sleep(0.5)
|
||||
|
||||
client.close()
|
||||
|
117
snippets/test7.py
Normal file
117
snippets/test7.py
Normal file
@ -0,0 +1,117 @@
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
from pymodbus.pdu import ExceptionResponse
|
||||
from pymodbus.exceptions import ModbusIOException
|
||||
import RS485Ext
|
||||
import struct
|
||||
import time
|
||||
|
||||
def registersToIeeeFloat(i):
|
||||
return struct.unpack('f', bytes(
|
||||
[((x & 0xff00) >> 8) if y else (x & 0x00ff)
|
||||
for x in i[::-1]
|
||||
for y in [False, True]
|
||||
]
|
||||
)
|
||||
)[0]
|
||||
|
||||
def registersToIeeeFloatReverse(i):
|
||||
return struct.unpack('f', bytes(
|
||||
[((x & 0xff00) >> 8) if y else (x & 0x00ff)
|
||||
for x in i
|
||||
for y in [False, True]
|
||||
]
|
||||
)
|
||||
)[0]
|
||||
|
||||
def dataConverter(t, d):
|
||||
if t == 'F':
|
||||
return registersToIeeeFloat(d)
|
||||
elif t == 'RF':
|
||||
return registersToIeeeFloatReverse(d)
|
||||
else:
|
||||
raise Exception("Converter '{0}' is not supported".format(t))
|
||||
|
||||
|
||||
class ModbusException(Exception):
|
||||
def __init__(self, resp):
|
||||
self.msg = str(result)
|
||||
|
||||
def __str__(self):
|
||||
return self.msg
|
||||
|
||||
class ModbusRequestDefinition(object):
|
||||
def __init__(self, kind, unit, address, count, converter, label):
|
||||
self.kind = kind
|
||||
self.unit = unit
|
||||
self.address = address
|
||||
self.count = count
|
||||
self.converter = converter
|
||||
self.label = label
|
||||
|
||||
reqs = [
|
||||
# ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'),
|
||||
# ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'),
|
||||
# ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'),
|
||||
# ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'),
|
||||
# ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'),
|
||||
# ModbusRequestDefinition('H', 3, 0x0004, 2, 'RF', 'Resistance Channel 1'),
|
||||
# ModbusRequestDefinition('H', 3, 0x000C, 2, 'RF', 'Temperature Channel 1'),
|
||||
# ModbusRequestDefinition('H', 3, 0x0014, 2, 'RF', 'Resistance Channel 2'),
|
||||
# ModbusRequestDefinition('H', 3, 0x001C, 2, 'RF', 'Temperature Channel 2'),
|
||||
ModbusRequestDefinition('D', 4, 0x0000, 1, '', 'Discrete Input'),
|
||||
ModbusRequestDefinition('I', 5, 0x0001, 1, '', 'Temperature'),
|
||||
ModbusRequestDefinition('I', 5, 0x0002, 1, '', 'Humidity'),
|
||||
]
|
||||
|
||||
|
||||
def getSerial():
|
||||
return RS485Ext.RS485Ext(port='/dev/ttyAMA0', baudrate=9600, stopbits=1,
|
||||
timeout=1)
|
||||
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = getSerial()
|
||||
client.connect()
|
||||
|
||||
delay = 0.05
|
||||
period = 0.5
|
||||
|
||||
while True:
|
||||
for req in reqs:
|
||||
try:
|
||||
time.sleep(delay)
|
||||
if req.kind == 'H':
|
||||
# print("Trying to read: {0} {1} {2}".format(req.address, req.count, req.unit))
|
||||
result = client.read_holding_registers(address=req.address,
|
||||
count=req.count,
|
||||
unit=req.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
raise ModbusException(result)
|
||||
print("{0}: {1:.2f}".format(req.label,
|
||||
dataConverter(req.converter,
|
||||
result.registers)))
|
||||
elif req.kind == 'D':
|
||||
result = client.read_discrete_inputs(address=req.address,
|
||||
count=req.count,
|
||||
unit=req.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
raise ModbusException(result)
|
||||
print("{0}: {1!s}".format(req.label, result.bits))
|
||||
elif req.kind == 'I':
|
||||
result = client.read_input_registers(address=req.address,
|
||||
count=req.count,
|
||||
unit=req.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
raise ModbusException(result)
|
||||
print("{0}: {1}".format(req.label, result.registers))
|
||||
except ModbusException as e:
|
||||
print("ERROR when querying '{0}': {1!s}".format(req.label, e))
|
||||
if client.socket is None:
|
||||
print("renew socket")
|
||||
client.socket = getSerial()
|
||||
|
||||
print("-------------")
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
client.close()
|
||||
|
105
snippets/test8.py
Normal file
105
snippets/test8.py
Normal file
@ -0,0 +1,105 @@
|
||||
import unittest
|
||||
import json
|
||||
|
||||
class A(object):
|
||||
def __init__(self, arg1=None, arg2=None):
|
||||
self.arg1 = arg1
|
||||
self.arg2 = arg2
|
||||
self.arg3 = self.arg1 + self.arg2
|
||||
|
||||
def __str__(self):
|
||||
return "A: {0!s} {1!s} {2!s}".format(self.arg1, self.arg2, self.arg3)
|
||||
|
||||
def jsonify(self):
|
||||
return {'type':self.__class__.__name__, 'args': {'arg1':self.arg1, 'arg2':self.arg2}}
|
||||
|
||||
class MyEncoder(json.JSONEncoder):
|
||||
def default(self, o):
|
||||
try:
|
||||
return o.jsonify()
|
||||
except TypeError or AttributeError:
|
||||
return super().default(o)
|
||||
|
||||
|
||||
def objectFactory(j):
|
||||
klass = eval(j['type'])
|
||||
o = klass(**j['args'])
|
||||
return o
|
||||
|
||||
def MyDecoder(j):
|
||||
if type(j) == dict and 'type' in j:
|
||||
return objectFactory(j)
|
||||
else:
|
||||
return j
|
||||
|
||||
class Tests(unittest.TestCase):
|
||||
def test_a1(self):
|
||||
a1 = A(1, 2)
|
||||
self.assertEqual(a1.arg1, 1)
|
||||
self.assertEqual(a1.arg2, 2)
|
||||
self.assertEqual(a1.arg3, 3)
|
||||
|
||||
def test_a2(self):
|
||||
a2 = A(**{'arg1':2, 'arg2':4})
|
||||
self.assertEqual(a2.arg1, 2)
|
||||
self.assertEqual(a2.arg2, 4)
|
||||
self.assertEqual(a2.arg3, 6)
|
||||
|
||||
def test_a3(self):
|
||||
j = '{ "type": "A", "args": { "arg1": 3, "arg2": 5 } }'
|
||||
jj = json.loads(j)
|
||||
klass = eval(jj['type'])
|
||||
self.assertEqual(A, klass)
|
||||
a3 = klass(**jj['args'])
|
||||
self.assertEqual(a3.arg1, 3)
|
||||
self.assertEqual(a3.arg2, 5)
|
||||
self.assertEqual(a3.arg3, 8)
|
||||
|
||||
def test_a4(self):
|
||||
j = '{ "type": "A", "args": { "arg1": 3, "arg2": 5 } }'
|
||||
jj = json.loads(j)
|
||||
klass = eval(jj['type'])
|
||||
self.assertEqual(A, klass)
|
||||
a3 = klass(**jj['args'])
|
||||
self.assertEqual(a3.arg1, 3)
|
||||
self.assertEqual(a3.arg2, 5)
|
||||
self.assertEqual(a3.arg3, 8)
|
||||
|
||||
jjjj = json.dumps(a3, cls=MyEncoder)
|
||||
|
||||
jjj = json.loads(jjjj)
|
||||
klass = eval(jjj['type'])
|
||||
self.assertEqual(A, klass)
|
||||
a3 = klass(**jjj['args'])
|
||||
self.assertEqual(a3.arg1, 3)
|
||||
self.assertEqual(a3.arg2, 5)
|
||||
self.assertEqual(a3.arg3, 8)
|
||||
|
||||
def test_a5(self):
|
||||
jList = []
|
||||
jList.append(objectFactory({'type':'A', 'args': {'arg1':1, 'arg2':2}}))
|
||||
jList.append(objectFactory({'type':'A', 'args': {'arg1':2, 'arg2':3}}))
|
||||
jList.append(objectFactory({'type':'A', 'args': {'arg1':3, 'arg2':4}}))
|
||||
|
||||
js = json.dumps(jList, cls=MyEncoder, sort_keys=True, indent=4)
|
||||
|
||||
print(js)
|
||||
|
||||
jResultList = json.loads(js, object_hook=MyDecoder)
|
||||
|
||||
self.assertEqual(jResultList[0].arg1, 1)
|
||||
self.assertEqual(jResultList[0].arg2, 2)
|
||||
self.assertEqual(jResultList[0].arg3, 3)
|
||||
|
||||
self.assertEqual(jResultList[1].arg1, 2)
|
||||
self.assertEqual(jResultList[1].arg2, 3)
|
||||
self.assertEqual(jResultList[1].arg3, 5)
|
||||
|
||||
self.assertEqual(jResultList[2].arg1, 3)
|
||||
self.assertEqual(jResultList[2].arg2, 4)
|
||||
self.assertEqual(jResultList[2].arg3, 7)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
14
snippets/test9.py
Normal file
14
snippets/test9.py
Normal file
@ -0,0 +1,14 @@
|
||||
class A(object):
|
||||
def __init__(self):
|
||||
self.a = 1
|
||||
def x(self):
|
||||
return self.a
|
||||
|
||||
class B(A):
|
||||
def __init__(self):
|
||||
self.a = 2
|
||||
def x(self):
|
||||
return self.a
|
||||
def y(self):
|
||||
return super().x()
|
||||
|
436
src/CmdServer.py
Normal file
436
src/CmdServer.py
Normal file
@ -0,0 +1,436 @@
|
||||
import threading
|
||||
import socketserver
|
||||
import cmd
|
||||
import re
|
||||
import io
|
||||
import datetime
|
||||
import RegisterDatapoint
|
||||
import logging
|
||||
import Converters
|
||||
|
||||
class CmdInterpreterException(ValueError): pass
|
||||
|
||||
def parseIntArbitraryBase(s):
|
||||
i = 0
|
||||
if s.startswith('0x'):
|
||||
i = int(s, 16)
|
||||
elif s.startswith('0b'):
|
||||
i = int(s, 2)
|
||||
else:
|
||||
i = int(s, 10)
|
||||
return i
|
||||
|
||||
class CmdInterpreter(cmd.Cmd):
|
||||
def __init__(self, infile, outfile, config, notifier, registers):
|
||||
super().__init__(stdin=infile, stdout=outfile)
|
||||
self.use_rawinput = False
|
||||
self.config = config
|
||||
self.notifier = notifier
|
||||
self.registers = registers
|
||||
self.prompt = "test8> "
|
||||
self.intro = "test8 admin interface"
|
||||
self.splitterRe = re.compile('\s+')
|
||||
self.logger = logging.getLogger('CmdInterpreter')
|
||||
|
||||
def onecmd(self, line):
|
||||
try:
|
||||
return super().onecmd(line)
|
||||
except Exception as e:
|
||||
msg = 'Caught exception in cmd "{0}": {1!s}'.format(line, e)
|
||||
self.__println(msg)
|
||||
self.logger.error(msg)
|
||||
|
||||
def __print(self, text):
|
||||
self.stdout.write(text)
|
||||
|
||||
def __println(self, text):
|
||||
self.stdout.write(text)
|
||||
self.stdout.write("\n\r")
|
||||
|
||||
|
||||
|
||||
def __listConverterNames(self):
|
||||
return [ name for name in Converters.Converters ]
|
||||
|
||||
def do_add_hr(self, arg):
|
||||
try:
|
||||
(label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic, converter) = self.splitterRe.split(arg)
|
||||
self.__println("Label: {0}".format(label))
|
||||
self.__println("Unit: {0}".format(unit))
|
||||
self.__println("Address: {0}".format(address))
|
||||
self.__println("Count: {0}".format(count))
|
||||
self.__println("ScanRate: {0}".format(scanrate))
|
||||
self.__println("ReadTopic: {0}".format(readTopic))
|
||||
self.__println("WriteTopic: {0}".format(writeTopic))
|
||||
self.__println("FeedbackTopic: {0}".format(feedbackTopic))
|
||||
self.__println("Converter: {0}".format(converter))
|
||||
|
||||
if readTopic == 'None':
|
||||
readTopic = None
|
||||
if writeTopic == 'None':
|
||||
writeTopic = None
|
||||
if feedbackTopic == 'None':
|
||||
feedbackTopic = None
|
||||
if converter == 'None':
|
||||
converter = None
|
||||
unit = parseIntArbitraryBase(unit)
|
||||
address = parseIntArbitraryBase(address)
|
||||
count = parseIntArbitraryBase(count)
|
||||
scanrate = float(scanrate)
|
||||
r = RegisterDatapoint.HoldingRegisterDatapoint(label=label,
|
||||
unit=unit,
|
||||
address=address,
|
||||
count=count,
|
||||
scanRate=datetime.timedelta(seconds=scanrate),
|
||||
publishTopic=readTopic,
|
||||
subscribe=writeTopic,
|
||||
feedbackTopic=feedbackTopic,
|
||||
converter=converter)
|
||||
self.registers.append(r)
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
|
||||
|
||||
def help_add_hr(self):
|
||||
# HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None),
|
||||
self.__println("Usage: add_hr <Label> <Unit> <Address> <Count> <ScanRate>")
|
||||
self.__println(" <ReadTopic> <WriteTopic> <FeedbackTopic>")
|
||||
self.__println(" <Converter>")
|
||||
self.__println("Adds a holding register")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
self.__println("---------------------------------------------------------------------")
|
||||
self.__println("<Label> Descriptive label")
|
||||
self.__println("<Unit> Modbus address of the device")
|
||||
self.__println("<Address> Register address within the device")
|
||||
self.__println("<Count> Count of registers to be read or write in words")
|
||||
self.__println("<ScanRate> Scanrate in seconds (float), for write datapoints")
|
||||
self.__println(" set to zero (0)")
|
||||
self.__println("<ReadTopic> Topic to publish read data")
|
||||
self.__println("<WriteTopic> Topic to be subscribe to receive data to be")
|
||||
self.__println(" written")
|
||||
self.__println("<FeedbackTopic> Topic to publish feedback after a write process,")
|
||||
self.__println("<Converter> Converter for data, one of {0}".format(', '.join(self.__listConverterNames())))
|
||||
|
||||
|
||||
def do_add_coil(self, arg):
|
||||
try:
|
||||
(label, unit, address, scanrate, readTopic, writeTopic, feedbackTopic) = self.splitterRe.split(arg)
|
||||
self.__println("Label: {0}".format(label))
|
||||
self.__println("Unit: {0}".format(unit))
|
||||
self.__println("Address: {0}".format(address))
|
||||
self.__println("ScanRate: {0}".format(scanrate))
|
||||
self.__println("ReadTopic: {0}".format(readTopic))
|
||||
self.__println("WriteTopic: {0}".format(writeTopic))
|
||||
self.__println("FeedbackTopic: {0}".format(feedbackTopic))
|
||||
|
||||
if readTopic == 'None':
|
||||
readTopic = None
|
||||
if writeTopic == 'None':
|
||||
writeTopic = None
|
||||
if feedbackTopic == 'None':
|
||||
feedbackTopic = None
|
||||
unit = parseIntArbitraryBase(unit)
|
||||
address = parseIntArbitraryBase(address)
|
||||
scanrate = float(scanrate)
|
||||
r = RegisterDatapoint.CoilDatapoint(label=label,
|
||||
unit=unit,
|
||||
address=address,
|
||||
scanRate=datetime.timedelta(seconds=scanrate),
|
||||
publishTopic=readTopic,
|
||||
subscribeTopic=writeTopic,
|
||||
feedbackTopic=feedbackTopic)
|
||||
self.registers.append(r)
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
|
||||
|
||||
def help_add_coil(self):
|
||||
self.__println("Usage: add_coil <Label> <Unit> <Address> <ScanRate>")
|
||||
self.__println(" <ReadTopic> <WriteTopic> <FeedbackTopic>")
|
||||
self.__println("Adds a coil")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
self.__println("---------------------------------------------------------------------")
|
||||
self.__println("<Label> Descriptive label")
|
||||
self.__println("<Unit> Modbus address of the device")
|
||||
self.__println("<Address> Register address within the device")
|
||||
self.__println("<ScanRate> Scanrate in seconds (float), for write datapoints")
|
||||
self.__println(" set to zero (0)")
|
||||
self.__println("<ReadTopic> Topic to publish read data")
|
||||
self.__println("<WriteTopic> Topic to be subscribe to receive data to be")
|
||||
self.__println(" written")
|
||||
self.__println("<FeedbackTopic> Topic to publish feedback after a write process,")
|
||||
|
||||
def do_add_ir(self, arg):
|
||||
try:
|
||||
(label, unit, address, count, scanrate, updateOnly, readTopic, converter) = self.splitterRe.split(arg)
|
||||
self.__println("Label: {0}".format(label))
|
||||
self.__println("Unit: {0}".format(unit))
|
||||
self.__println("Address: {0}".format(address))
|
||||
self.__println("Count: {0}".format(count))
|
||||
self.__println("ScanRate: {0}".format(scanrate))
|
||||
self.__println("UpdateOnly: {0}".format(updateOnly))
|
||||
self.__println("ReadTopic: {0}".format(readTopic))
|
||||
self.__println("Converter: {0}".format(converter))
|
||||
|
||||
if readTopic == 'None':
|
||||
readTopic = None
|
||||
if converter == 'None':
|
||||
converter = None
|
||||
if updateOnly in ['true', 'True', 'yes', 'Yes']:
|
||||
updateOnly = True
|
||||
elif updateOnly in ['false', 'False', 'no', 'No']:
|
||||
updateOnly = False
|
||||
else:
|
||||
raise CmdInterpreterException('updateOnly must be true or false, yes or no')
|
||||
unit = parseIntArbitraryBase(unit)
|
||||
address = parseIntArbitraryBase(address)
|
||||
count = parseIntArbitraryBase(count)
|
||||
scanrate = float(scanrate)
|
||||
r = RegisterDatapoint.InputRegisterDatapoint(label=label,
|
||||
unit=unit,
|
||||
address=address,
|
||||
count=count, scanRate=datetime.timedelta(seconds=scanrate),
|
||||
updateOnly=updateOnly,
|
||||
publishTopic=readTopic,
|
||||
converter=converter)
|
||||
self.registers.append(r)
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
|
||||
|
||||
def help_add_ir(self):
|
||||
self.__println("Usage: add_ir <Label> <Unit> <Address> <Count> <ScanRate>")
|
||||
self.__println(" <UpdateOnly> <ReadTopic> <Converter>")
|
||||
self.__println("Adds an input register")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
self.__println("---------------------------------------------------------------------")
|
||||
self.__println("<Label> Descriptive label")
|
||||
self.__println("<Unit> Modbus address of the device")
|
||||
self.__println("<Address> Register address within the device")
|
||||
self.__println("<Count> Count of registers to be read in words")
|
||||
self.__println("<ScanRate> Scanrate in seconds (float)")
|
||||
self.__println("<UpdateOnly> Publish only when value has changed")
|
||||
self.__println("<ReadTopic> Topic to publish read data")
|
||||
self.__println("<Converter> Converter for data, one of {0}".format(', '.join(self.__listConverterNames())))
|
||||
|
||||
def do_add_di(self, arg):
|
||||
try:
|
||||
(label, unit, address, count, scanrate, updateOnly, readTopic, bitCount) = self.splitterRe.split(arg)
|
||||
self.__println("Label: {0}".format(label))
|
||||
self.__println("Unit: {0}".format(unit))
|
||||
self.__println("Address: {0}".format(address))
|
||||
self.__println("Count: {0}".format(count))
|
||||
self.__println("ScanRate: {0}".format(scanrate))
|
||||
self.__println("UpdateOnly: {0}".format(updateOnly))
|
||||
self.__println("ReadTopic: {0}".format(readTopic))
|
||||
self.__println("BitCount: {0}".format(bitCount))
|
||||
|
||||
if readTopic == 'None':
|
||||
readTopic = None
|
||||
if updateOnly in ['true', 'True', 'yes', 'Yes']:
|
||||
updateOnly = True
|
||||
elif updateOnly in ['false', 'False', 'no', 'No']:
|
||||
updateOnly = False
|
||||
else:
|
||||
raise CmdInterpreterException('updateOnly must be true or false, yes or no')
|
||||
unit = parseIntArbitraryBase(unit)
|
||||
address = parseIntArbitraryBase(address)
|
||||
count = parseIntArbitraryBase(count)
|
||||
scanrate = float(scanrate)
|
||||
bitCount = int(bitCount)
|
||||
r = RegisterDatapoint.DiscreteInputDatapoint(label=label,
|
||||
unit=unit,
|
||||
address=address,
|
||||
count=count,
|
||||
scanRate=datetime.timedelta(seconds=scanrate),
|
||||
updateOnly=updateOnly,
|
||||
publishTopic=readTopic,
|
||||
bitCount=bitCount)
|
||||
self.registers.append(r)
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
|
||||
|
||||
def help_add_di(self):
|
||||
self.__println("Usage: add_di <Label> <Unit> <Address> <Count> <ScanRate>")
|
||||
self.__println(" <UpdateOnly> <ReadTopic> <bitCount>")
|
||||
self.__println("Adds a discrete input")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
self.__println("---------------------------------------------------------------------")
|
||||
self.__println("<Label> Descriptive label")
|
||||
self.__println("<Unit> Modbus address of the device")
|
||||
self.__println("<Address> Register address within the device")
|
||||
self.__println("<Count> Count of registers to be read in words")
|
||||
self.__println("<ScanRate> Scanrate in seconds (float)")
|
||||
self.__println("<UpdateOnly> Publish only when value has changed")
|
||||
self.__println("<ReadTopic> Topic to publish read data")
|
||||
self.__println("<BitCount> Number of bit to be considered")
|
||||
|
||||
def do_list(self, arg):
|
||||
for i, r in enumerate(self.registers):
|
||||
self.__println("#{0}: {1!s}".format(i, r))
|
||||
|
||||
def help_list(self):
|
||||
self.__println("Usage: list")
|
||||
self.__println("-----------")
|
||||
self.__println("List the configured datapoints")
|
||||
|
||||
def do_reset(self, arg):
|
||||
for r in self.registers:
|
||||
r.errorCount = 0
|
||||
r.writeCount = 0
|
||||
r.readCount = 0
|
||||
|
||||
def help_reset(self):
|
||||
self.__println("Usage: reset")
|
||||
self.__println("-----------")
|
||||
self.__println("Resets the statistics of configured datapoints")
|
||||
|
||||
def do_stats(self, arg):
|
||||
for i, r in enumerate(self.registers):
|
||||
processCount = r.readCount + r.writeCount
|
||||
if processCount == 0:
|
||||
ratio = -1
|
||||
else:
|
||||
ratio = float(r.errorCount) / float(processCount)
|
||||
self.__println("#{0:2d}: {1:15s} ({2:2d}, {3:5d}), rc: {4:7d}, wc: {5:7d}, pc: {6:7d}, ec: {7:7d}, q: {8:1.4f}"
|
||||
.format(i, r.label, r.unit, r.address, r.readCount, r.writeCount,
|
||||
processCount, r.errorCount, ratio))
|
||||
|
||||
def help_stats(self):
|
||||
self.__println("Usage: stats")
|
||||
self.__println("-----------")
|
||||
self.__println("List the statistics of configured datapoints")
|
||||
|
||||
def do_change(self, arg):
|
||||
(idx, key, typ, value) = self.splitterRe.split(arg)
|
||||
try:
|
||||
i = int(idx)
|
||||
r = self.registers[i]
|
||||
|
||||
if typ == 'I':
|
||||
value = parseIntArbitraryBase(value)
|
||||
elif typ == 'F':
|
||||
value = float(value)
|
||||
elif typ == 'B':
|
||||
if value in ['true', 'True', 'yes', 'Yes']:
|
||||
value = True
|
||||
elif value in ['false', 'False', 'no', 'No']:
|
||||
value = False
|
||||
else:
|
||||
raise CmdInterpreterException('boolean value must be true or false, yes or no')
|
||||
elif typ == 'S':
|
||||
# string
|
||||
pass
|
||||
elif typ == 'T':
|
||||
value = datetime.timedelta(seconds=float(value))
|
||||
elif typ == 'N':
|
||||
value = None
|
||||
else:
|
||||
raise CmdInterpreterException('unknown type specifier, must be I, F, B, S or T')
|
||||
|
||||
if key not in r.__dict__:
|
||||
raise CmdInterpreterException('selected datapoint does not support key')
|
||||
|
||||
r.__dict__[key] = value
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e))
|
||||
|
||||
def help_change(self):
|
||||
self.__println("Usage: change <idx> <key> <type> <value>")
|
||||
self.__println("Changes on attribute of a datapoint")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
self.__println("---------------------------------------------------------------------")
|
||||
self.__println("<idx> Index, use list command to find")
|
||||
self.__println("<key> Name of attribute")
|
||||
self.__println("<type> Type of attribute")
|
||||
self.__println(" I .. Integer")
|
||||
self.__println(" F .. Float")
|
||||
self.__println(" B .. Boolean")
|
||||
self.__println(" T .. Timedelta, give in seconds")
|
||||
self.__println(" S .. String")
|
||||
self.__println(" N .. None (Value must be given but is not")
|
||||
self.__println(" considered)")
|
||||
self.__println("<value> New value")
|
||||
|
||||
def do_del(self, arg):
|
||||
try:
|
||||
i = int(arg)
|
||||
r = self.registers[i]
|
||||
self.registers.remove(r)
|
||||
self.__println("{0!s} removed".format(r))
|
||||
except ValueError as e:
|
||||
self.__println("ERROR: {0!s}".format(e))
|
||||
|
||||
def help_del(self):
|
||||
self.__println("Usage: del <idx>")
|
||||
self.__println("Removes an item from the list of datapoints by its index, see list command.")
|
||||
self.__println("Be aware: indexes have been changed, rerun list before removing the next item.")
|
||||
self.__println("DO NOT FORGET TO SAVE AFTERWARDS!")
|
||||
|
||||
def __notify(self):
|
||||
self.notifier.notify()
|
||||
|
||||
def do_notify(self, arg):
|
||||
self.__notify()
|
||||
|
||||
def help_notify(self):
|
||||
self.__println("Notifies threads using the list of datapoints about changes in this list.")
|
||||
self.__println("Call after modifications on the list.")
|
||||
|
||||
def do_quit(self, arg):
|
||||
self.__println("Bye!")
|
||||
return True
|
||||
|
||||
def __save(self):
|
||||
RegisterDatapoint.saveRegisterList(self.registers, self.config.registerFile)
|
||||
|
||||
def do_save(self, arg):
|
||||
self.__save()
|
||||
|
||||
def help_save(self):
|
||||
self.__println("Usage: save")
|
||||
self.__println("Saves a modified register list into the register file.")
|
||||
|
||||
def do_load(self, arg):
|
||||
try:
|
||||
registers = RegisterDatapoint.loadRegisterList(self.config.registerFile)
|
||||
self.registers = registers
|
||||
except Exception as e:
|
||||
self.__println("Unable to load register list: {0!s}".format(e))
|
||||
|
||||
def help_load(self):
|
||||
self.__println("Usage: load")
|
||||
self.__println("Reload the register file, overwrite all unsaved changes.")
|
||||
|
||||
|
||||
|
||||
class CmdHandle(socketserver.StreamRequestHandler):
|
||||
def handle(self):
|
||||
logger = logging.getLogger('CmdHandle')
|
||||
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.config,
|
||||
self.server.userData.notifier, self.server.userData.registers)
|
||||
try:
|
||||
cmd.cmdloop()
|
||||
logger.info("Cmd handle terminated")
|
||||
except ConnectionAbortedError as e:
|
||||
logger.info("Cmd handle externally interrupted")
|
||||
|
||||
class MyThreadingTCPServer(socketserver.ThreadingTCPServer):
|
||||
def __init__(self, host, handler, userData):
|
||||
super().__init__(host, handler)
|
||||
self.userData = userData
|
||||
|
||||
class MyCmdUserData(object):
|
||||
def __init__(self, config, notifier, registers):
|
||||
self.config = config
|
||||
self.notifier = notifier
|
||||
self.registers = registers
|
||||
|
||||
class CmdServer(threading.Thread):
|
||||
def __init__(self, config, notifier, registers):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.server = MyThreadingTCPServer((config.cmdAddress, config.cmdPort), CmdHandle, MyCmdUserData(config, notifier, registers))
|
||||
# self.daemon = True
|
||||
|
||||
def run(self):
|
||||
self.server.serve_forever()
|
||||
|
||||
|
62
src/CommunicationProcessor.py
Normal file
62
src/CommunicationProcessor.py
Normal file
@ -0,0 +1,62 @@
|
||||
import threading
|
||||
import datetime
|
||||
# import RS485Ext
|
||||
import RegisterDatapoint
|
||||
from pymodbus.client.sync import ModbusSerialClient
|
||||
# import wiringpi
|
||||
import Pins
|
||||
import MyRS485
|
||||
import time
|
||||
import logging
|
||||
|
||||
|
||||
ERROR_PIN = 29
|
||||
|
||||
class CommunicationProcessor(threading.Thread):
|
||||
def __init__(self, config, queue, pubQueue):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.queue = queue
|
||||
self.pubQueue = pubQueue
|
||||
# wiringpi.wiringPiSetup()
|
||||
# wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT)
|
||||
# self.daemon = True
|
||||
if self.config.modbusDebug:
|
||||
logging.getLogger('pymodbus').setLevel(logging.DEBUG)
|
||||
else:
|
||||
logging.getLogger('pymodbus').setLevel(logging.ERROR)
|
||||
self.logger = logging.getLogger('CommunicationProcessor')
|
||||
|
||||
def __getSerial(self):
|
||||
# return RS485Ext.RS485Ext(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1,
|
||||
# timeout=1)
|
||||
return MyRS485.MyRS485(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1,
|
||||
timeout=1)
|
||||
|
||||
|
||||
def run(self):
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = self.__getSerial()
|
||||
client.connect()
|
||||
|
||||
while True:
|
||||
r = self.queue.get()
|
||||
try:
|
||||
# wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW)
|
||||
Pins.pinsWrite('ERROR', False)
|
||||
self.logger.debug("Dequeued: {0!s}".format(r))
|
||||
r.enqueued = False
|
||||
r.process(client, self.pubQueue)
|
||||
except RegisterDatapoint.DatapointException as e:
|
||||
# wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH)
|
||||
Pins.pinsWrite('ERROR', True)
|
||||
self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e))
|
||||
if client.socket is None:
|
||||
self.logger.error("renew socket")
|
||||
client.socket = self.__getSerial()
|
||||
finally:
|
||||
time.sleep(self.config.interCommDelay)
|
||||
|
||||
|
||||
|
||||
|
17
src/Config.py
Normal file
17
src/Config.py
Normal file
@ -0,0 +1,17 @@
|
||||
|
||||
class Config(object):
|
||||
def __init__(self):
|
||||
self.logFile = '/tmp/mbm.log'
|
||||
self.modbusDebug = False
|
||||
self.mqttBrokerHost = '172.16.2.16'
|
||||
self.mqttBrokerPort = 1883
|
||||
self.mqttLogin = ''
|
||||
self.mqttPassword = ''
|
||||
self.cmdAddress = '127.0.0.1'
|
||||
self.cmdPort = 9999
|
||||
self.registerFile = 'registers.json'
|
||||
self.serialPort = '/dev/ttyAMA0'
|
||||
self.serialBaudRate = 9600
|
||||
self.interCommDelay = 0.025
|
||||
self.heartbeatTopic = 'IoT/Heartbeat/Modbus2'
|
||||
self.heartbeatPeriod = 10.0
|
18
src/Converters.py
Normal file
18
src/Converters.py
Normal file
@ -0,0 +1,18 @@
|
||||
# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format
|
||||
# to be sent in the MQTT message
|
||||
# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of
|
||||
# 16bit integers to be written to the Modbus slave
|
||||
|
||||
from struct import pack, unpack
|
||||
|
||||
|
||||
Converters = {
|
||||
"dht20TOFloat": {
|
||||
"in": lambda x : float(x[0]) / 10.0,
|
||||
"out": None
|
||||
},
|
||||
"uint32": {
|
||||
"in": lambda x : unpack('L', pack('HH', *x))[0],
|
||||
"out": lambda x : unpack('HH', pack('L', int(x)))
|
||||
}
|
||||
}
|
20
src/Heartbeat.py
Normal file
20
src/Heartbeat.py
Normal file
@ -0,0 +1,20 @@
|
||||
import threading
|
||||
import MqttProcessor
|
||||
import logging
|
||||
import time
|
||||
|
||||
class Heartbeat(threading.Thread):
|
||||
def __init__(self, config, pubQueue):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.pubQueue = pubQueue
|
||||
# self.daemon = True
|
||||
self.logger = logging.getLogger('Heartbeat')
|
||||
|
||||
def run(self):
|
||||
cnt = 0
|
||||
while True:
|
||||
cnt += 1
|
||||
pubItem = MqttProcessor.PublishItem(self.config.heartbeatTopic, str(cnt))
|
||||
self.pubQueue.put(pubItem)
|
||||
time.sleep(self.config.heartbeatPeriod)
|
94
src/MqttProcessor.py
Normal file
94
src/MqttProcessor.py
Normal file
@ -0,0 +1,94 @@
|
||||
import threading
|
||||
import paho.mqtt.client as mqtt
|
||||
from NotificationForwarder import AbstractNotificationReceiver
|
||||
import logging
|
||||
import Pins
|
||||
|
||||
class PublishItem(object):
|
||||
def __init__(self, topic, payload):
|
||||
self.topic = topic
|
||||
self.payload = payload
|
||||
|
||||
def __str__(self):
|
||||
return 'Topic: {0}, Payload: {1}'.format(self.topic, self.payload)
|
||||
|
||||
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||
userdata.onConnect()
|
||||
|
||||
def mqttOnMessageCallback(client, userdata, message):
|
||||
userdata.onMessage(message.topic, message.payload)
|
||||
|
||||
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||
userdata.onDisconnect(rc)
|
||||
|
||||
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
def __init__(self, config, registers, queue, pubQueue):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.registers = registers
|
||||
self.queue = queue
|
||||
self.pubQueue = pubQueue
|
||||
self.client = mqtt.Client(userdata=self)
|
||||
self.subscriptions = []
|
||||
self.topicRegisterMap ={}
|
||||
# self.daemon = True
|
||||
self.logger = logging.getLogger('MqttProcessor')
|
||||
|
||||
def __processUpdatedRegisters(self, force=False):
|
||||
self.logger.debug("MqttProcessor.__updateSubscriptions")
|
||||
|
||||
subscribeTopics = [ r.subscribeTopic for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic]
|
||||
self.logger.debug("Topics: {0!s}".format(subscribeTopics))
|
||||
|
||||
for subscribeTopic in subscribeTopics:
|
||||
if (subscribeTopic not in self.subscriptions) or force:
|
||||
self.logger.debug("Subscribe to {0}".format(subscribeTopic))
|
||||
self.client.subscribe(subscribeTopic)
|
||||
self.subscriptions.append(subscribeTopic)
|
||||
|
||||
for subscription in self.subscriptions:
|
||||
if (subscription not in subscribeTopics) and not force:
|
||||
self.logger.debug("Unsubscribe from {0}".format(subscription))
|
||||
self.client.unsubscribe(subscription)
|
||||
self.subscriptions.remove(subscription)
|
||||
|
||||
self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic }
|
||||
|
||||
def receiveNotification(self, arg):
|
||||
self.logger.info("MqttProcessor:registersChanged")
|
||||
self.__processUpdatedRegisters()
|
||||
|
||||
def run(self):
|
||||
self.client.on_message = mqttOnMessageCallback
|
||||
self.client.on_connect = mqttOnConnectCallback
|
||||
self.client.on_disconnect = mqttOnDisconnectCallback
|
||||
if self.config.mqttLogin and self.config.mqttPassword:
|
||||
self.client.username_pw_set(self.config.mqttLogin, self.config.mqttPassword)
|
||||
self.client.connect(self.config.mqttBrokerHost, self.config.mqttBrokerPort)
|
||||
self.client.loop_start()
|
||||
|
||||
while True:
|
||||
pubItem = self.pubQueue.get()
|
||||
if isinstance(pubItem, PublishItem):
|
||||
self.logger.debug('Publishing {0!s}'.format(pubItem))
|
||||
self.client.publish(pubItem.topic, pubItem.payload)
|
||||
Pins.pinsWrite('MSG', False)
|
||||
else:
|
||||
self.logger.error("Invalid object in publish queue")
|
||||
|
||||
|
||||
def onConnect(self):
|
||||
# print("MqttProcessor.onConnect")
|
||||
self.__processUpdatedRegisters(force=True)
|
||||
|
||||
def onDisconnect(self, rc):
|
||||
self.logger.error("Disconnected from MQTT broker: {0}".format(rc))
|
||||
|
||||
def onMessage(self, topic, payload):
|
||||
Pins.pinsWrite('MSG', True)
|
||||
# print("MqttProcessor.onMessage")
|
||||
r = self.topicRegisterMap[topic]
|
||||
self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
|
||||
r.onMessage(payload)
|
||||
self.queue.put(r)
|
||||
|
22
src/MyPriorityQueue.py
Normal file
22
src/MyPriorityQueue.py
Normal file
@ -0,0 +1,22 @@
|
||||
import queue
|
||||
|
||||
|
||||
class MyPriorityQueueItem(object):
|
||||
def __init__(self, itemWithPriority):
|
||||
self.itemWithPriority = itemWithPriority
|
||||
|
||||
def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority
|
||||
def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority
|
||||
def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority
|
||||
def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority
|
||||
def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority
|
||||
def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority
|
||||
|
||||
class MyPriorityQueue(queue.PriorityQueue):
|
||||
def _put(self, itemWithPriority):
|
||||
i = MyPriorityQueueItem(itemWithPriority)
|
||||
super()._put(i)
|
||||
|
||||
def _get(self):
|
||||
i = super()._get()
|
||||
return i.itemWithPriority
|
27
src/MyRS485.py
Normal file
27
src/MyRS485.py
Normal file
@ -0,0 +1,27 @@
|
||||
import serial
|
||||
# import wiringpi
|
||||
import Pins
|
||||
import array
|
||||
import fcntl
|
||||
import termios
|
||||
|
||||
DE_PIN = 0
|
||||
|
||||
class MyRS485(serial.Serial):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
# wiringpi.wiringPiSetup()
|
||||
# wiringpi.pinMode(DE_PIN, wiringpi.OUTPUT)
|
||||
self.buf = array.array('h', [0])
|
||||
|
||||
def write(self, b):
|
||||
# wiringpi.digitalWrite(DE_PIN, wiringpi.HIGH)
|
||||
Pins.pinsWrite('DE', True)
|
||||
super().write(b)
|
||||
while True:
|
||||
fcntl.ioctl(self.fileno(), termios.TIOCSERGETLSR, self.buf, 1)
|
||||
if self.buf[0] & termios.TIOCSER_TEMT:
|
||||
break
|
||||
# wiringpi.digitalWrite(DE_PIN, wiringpi.LOW)
|
||||
Pins.pinsWrite('DE', False)
|
||||
|
15
src/NotificationForwarder.py
Normal file
15
src/NotificationForwarder.py
Normal file
@ -0,0 +1,15 @@
|
||||
|
||||
class AbstractNotificationReceiver(object):
|
||||
def receiveNotification(self, arg):
|
||||
raise NotImplementedError
|
||||
|
||||
class NotificationForwarder(object):
|
||||
def __init__(self):
|
||||
self.receivers = []
|
||||
|
||||
def register(self, receiver):
|
||||
self.receivers.append(receiver)
|
||||
|
||||
def notify(self, arg=None):
|
||||
for r in self.receivers:
|
||||
r.receiveNotification(arg)
|
24
src/Pins.py
Normal file
24
src/Pins.py
Normal file
@ -0,0 +1,24 @@
|
||||
import wiringpi
|
||||
|
||||
|
||||
PINS = {
|
||||
'DE': 0,
|
||||
'ERROR': 29,
|
||||
'MSG': 28
|
||||
}
|
||||
|
||||
|
||||
|
||||
def pinsInit():
|
||||
wiringpi.wiringPiSetup()
|
||||
for pin in PINS.values():
|
||||
wiringpi.pinMode(pin, wiringpi.OUTPUT)
|
||||
|
||||
|
||||
def pinsWrite(pinName, v):
|
||||
if v:
|
||||
pinState = wiringpi.HIGH
|
||||
else:
|
||||
pinState = wiringpi.LOW
|
||||
wiringpi.digitalWrite(PINS[pinName], pinState)
|
||||
|
292
src/RegisterDatapoint.py
Normal file
292
src/RegisterDatapoint.py
Normal file
@ -0,0 +1,292 @@
|
||||
import datetime
|
||||
from pymodbus.pdu import ExceptionResponse
|
||||
from pymodbus.exceptions import ModbusIOException
|
||||
import MqttProcessor
|
||||
import logging
|
||||
import json
|
||||
import Converters
|
||||
|
||||
class DatapointException(Exception): pass
|
||||
|
||||
class AbstractModbusDatapoint(object):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, converter=None):
|
||||
self.argList = ['label', 'unit', 'address', 'count', 'scanRate', 'converter']
|
||||
self.label = label
|
||||
self.unit = unit
|
||||
self.address = address
|
||||
self.count = count
|
||||
self.converter = converter
|
||||
if type(scanRate) == float:
|
||||
self.scanRate = datetime.timedelta(seconds=scanRate)
|
||||
else:
|
||||
self.scanRate = scanRate
|
||||
self.type = 'abstract data point'
|
||||
self.enqueued = False
|
||||
self.lastContact = None
|
||||
self.errorCount = 0
|
||||
self.readCount = 0
|
||||
self.writeCount = 0
|
||||
if self.scanRate:
|
||||
self.priority = 1
|
||||
else:
|
||||
self.priority = 0
|
||||
|
||||
def __str__(self):
|
||||
return ("{0}, {1}: unit: {2}, address: {3}, count: {4}, scanRate: {5}, "
|
||||
"enqueued: {6}, lastContact: {7}, errorCount: {8}, readCount: {9}, "
|
||||
"writeCount: {10}, converter: {11}"
|
||||
.format(self.type, self.label, self.unit, self.address, self.count,
|
||||
self.scanRate, self.enqueued, self.lastContact,
|
||||
self.errorCount, self.readCount, self.writeCount, self.converter))
|
||||
|
||||
def jsonify(self):
|
||||
return {'type':self.__class__.__name__,
|
||||
'args': { k: getattr(self, k) for k in self.argList }
|
||||
}
|
||||
|
||||
def process(self, client):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
||||
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None,
|
||||
publishTopic=None, subscribeTopic=None, feedbackTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, converter)
|
||||
self.argList = self.argList + ['publishTopic', 'subscribeTopic', 'feedbackTopic']
|
||||
self.publishTopic = publishTopic
|
||||
self.subscribeTopic = subscribeTopic
|
||||
self.feedbackTopic = feedbackTopic
|
||||
self.writeRequestValue = None
|
||||
self.type = 'holding register'
|
||||
|
||||
def __str__(self):
|
||||
return ("[{0!s}, publishTopic: {1}, subscribeTopic: {2}, feedbackTopic: {3}, "
|
||||
"writeRequestValue: {4!s}"
|
||||
.format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic,
|
||||
self.writeRequestValue))
|
||||
|
||||
def process(self, client, pubQueue):
|
||||
logger = logging.getLogger('HoldingRegisterDatapoint')
|
||||
if self.writeRequestValue:
|
||||
# perform write operation
|
||||
logger.debug("Holding register, perform write operation")
|
||||
self.writeCount += 1
|
||||
values = None
|
||||
logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue))
|
||||
try:
|
||||
if self.converter and Converters.Converters[self.converter]['out']:
|
||||
values = Converters.Converters[self.converter]['out'](self.writeRequestValue)
|
||||
logger.debug("{0}: converted: {1!s}".format(self.label, values))
|
||||
else:
|
||||
values = [int(self.writeRequestValue)]
|
||||
except Exception as e:
|
||||
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
|
||||
result = client.write_registers(address=self.address,
|
||||
unit=self.unit,
|
||||
values=values)
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
self.writeRequestValue = None
|
||||
else:
|
||||
# perform read operation
|
||||
logger.debug("Holding register, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_holding_registers(address=self.address,
|
||||
count=self.count,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
raise DatapointException(result)
|
||||
logger.debug("{0}: {1!s}".format(self.label, result.registers))
|
||||
value = None
|
||||
try:
|
||||
if self.converter and Converters.Converters[self.converter]['in']:
|
||||
value = Converters.Converters[self.converter]['in'](result.registers)
|
||||
logger.debug("{0}: converted: {1!s}".format(self.label, value))
|
||||
else:
|
||||
value = result.registers
|
||||
except Exception as e:
|
||||
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
def onMessage(self, value):
|
||||
self.writeRequestValue = value
|
||||
|
||||
|
||||
class CoilDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, scanRate=None, publishTopic=None, subscribeTopic=None,
|
||||
feedbackTopic=None):
|
||||
super().__init__(label, unit, address, 1, scanRate, None)
|
||||
self.argList = ['label', 'unit','address','scanRate','publishTopic', 'subscribeTopic', 'feedbackTopic']
|
||||
self.publishTopic = publishTopic
|
||||
self.subscribeTopic = subscribeTopic
|
||||
self.feedbackTopic = feedbackTopic
|
||||
self.writeRequestValue = None
|
||||
self.type = 'coil'
|
||||
|
||||
def __str__(self):
|
||||
return ("{0}, {1}: unit: {2}, address: {3}, scanRate: {4}, "
|
||||
"enqueued: {5}, lastContact: {6}, errorCount: {7}, readCount: {8}, "
|
||||
"writeCount: {9}, publishTopic: {10}, subscribeTopic: {11}, feedbackTopic: {12}"
|
||||
.format(self.type, self.label, self.unit, self.address,
|
||||
self.scanRate, self.enqueued, self.lastContact,
|
||||
self.errorCount, self.readCount, self.writeCount,
|
||||
self.publishTopic, self.subscribeTopic, self.feedbackTopic))
|
||||
|
||||
def onMessage(self, value):
|
||||
self.writeRequestValue = value.decode()
|
||||
|
||||
def process(self, client, pubQueue):
|
||||
logger = logging.getLogger('CoilDatapoint')
|
||||
if self.writeRequestValue:
|
||||
# perform write operation
|
||||
logger.debug("Coil, perform write operation")
|
||||
self.writeCount += 1
|
||||
logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue))
|
||||
value=None
|
||||
if self.writeRequestValue in ['true', 'True', 'yes', 'Yes', 'On', 'on']:
|
||||
value = True
|
||||
elif self.writeRequestValue in ['false', 'False', 'no', 'No', 'Off', 'off']:
|
||||
value = False
|
||||
else:
|
||||
self.writeRequestValue = None
|
||||
raise DatapointException('illegal value {0!s} for coil write'.format(self.writeRequestValue))
|
||||
result = client.write_coil(address=self.address,
|
||||
unit=self.unit,
|
||||
value=value)
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
self.writeRequestValue = None
|
||||
else:
|
||||
# perform read operation
|
||||
logger.debug("Coil, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_coils(address=self.address,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
raise DatapointException(result)
|
||||
logger.debug("{0}: {1!s}".format(self.label, result.getBit(0)))
|
||||
value = result.getBit(0)
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
|
||||
class ReadOnlyDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, converter)
|
||||
self.argList = self.argList + ['updateOnly', 'publishTopic']
|
||||
self.updateOnly = updateOnly
|
||||
self.lastValue = None
|
||||
self.publishTopic = publishTopic
|
||||
|
||||
def __str__(self):
|
||||
return ("[{0!s}, updateOnly: {1}, publishTopic: {2}, lastValue: {3!s}"
|
||||
.format(super().__str__(), self.updateOnly, self.publishTopic,
|
||||
self.lastValue))
|
||||
|
||||
|
||||
|
||||
class InputRegisterDatapoint(ReadOnlyDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
publishTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
|
||||
self.type = 'input register'
|
||||
|
||||
def process(self, client, pubQueue):
|
||||
logger = logging.getLogger('InputRegisterDatapoint')
|
||||
# perform read operation
|
||||
logger.debug("Input register, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_input_registers(address=self.address,
|
||||
count=self.count,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
raise DatapointException(result)
|
||||
if not self.updateOnly or (result.registers != self.lastValue):
|
||||
self.lastValue = result.registers
|
||||
logger.debug("{0}: raw: {1!s}".format(self.label, result.registers))
|
||||
value = None
|
||||
try:
|
||||
if self.converter and Converters.Converters[self.converter]['in']:
|
||||
value = Converters.Converters[self.converter]['in'](result.registers)
|
||||
logger.debug("{0}: converted: {1!s}".format(self.label, value))
|
||||
else:
|
||||
value = result.registers
|
||||
except Exception as e:
|
||||
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
|
||||
class DiscreteInputDatapoint(ReadOnlyDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
publishTopic=None, converter=None, bitCount=8):
|
||||
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
|
||||
self.argList = self.argList + ['bitCount']
|
||||
self.type = 'discrete input'
|
||||
self.bitCount = bitCount
|
||||
self.lastValues = [None] * self.bitCount
|
||||
|
||||
def __str__(self):
|
||||
return ("[{0!s}, bitCount: {1}"
|
||||
.format(super().__str__(), self.bitCount))
|
||||
|
||||
def process(self, client, pubQueue):
|
||||
logger = logging.getLogger('DiscreteInputDatapoint')
|
||||
# perform read operation
|
||||
logger.debug("Discrete input, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_discrete_inputs(address=self.address,
|
||||
count=self.count,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
raise DatapointException(result)
|
||||
logger.debug("{0}: raw: {1!s}".format(self.label, result.bits))
|
||||
for i in range(self.bitCount):
|
||||
if not self.updateOnly or (result.getBit(i) != self.lastValues[i]):
|
||||
self.lastValues[i] = result.getBit(i)
|
||||
logger.debug("{0}, {1}: changed: {2!s}".format(self.label, i, result.getBit(i)))
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem("{0}/{1}".format(self.publishTopic, i), str(result.getBit(i))))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
|
||||
|
||||
|
||||
class JsonifyEncoder(json.JSONEncoder):
|
||||
def default(self, o):
|
||||
res = None
|
||||
try:
|
||||
res = o.jsonify()
|
||||
except (TypeError, AttributeError):
|
||||
if type(o) == datetime.timedelta:
|
||||
res = o.total_seconds()
|
||||
else:
|
||||
res = super().default(o)
|
||||
return res
|
||||
|
||||
def datapointObjectHook(j):
|
||||
if type(j) == dict and 'type' in j and 'args' in j:
|
||||
klass = eval(j['type'])
|
||||
o = klass(**j['args'])
|
||||
return o
|
||||
else:
|
||||
return j
|
||||
|
||||
def saveRegisterList(registerList, registerListFile):
|
||||
js = json.dumps(registerList, cls=JsonifyEncoder, sort_keys=True, indent=4)
|
||||
with open(registerListFile, 'w') as f:
|
||||
f.write(js)
|
||||
|
||||
def loadRegisterList(registerListFile):
|
||||
with open(registerListFile, 'r') as f:
|
||||
js = f.read()
|
||||
registerList = json.loads(js, object_hook=datapointObjectHook)
|
||||
return registerList
|
||||
|
36
src/ScanRateConsideringQueueFeeder.py
Normal file
36
src/ScanRateConsideringQueueFeeder.py
Normal file
@ -0,0 +1,36 @@
|
||||
import threading
|
||||
import datetime
|
||||
from NotificationForwarder import AbstractNotificationReceiver
|
||||
import logging
|
||||
|
||||
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
|
||||
def __init__(self, config, registers, queue):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.registers = registers
|
||||
self.queue = queue
|
||||
self.delayEvent = threading.Event()
|
||||
# self.daemon = True
|
||||
self.logger = logging.getLogger('ScanRateConsideringQueueFeeder')
|
||||
|
||||
def getMinimalScanrate(self):
|
||||
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
|
||||
|
||||
def receiveNotification(self, arg):
|
||||
self.logger.info("ScanRateConsideringQueueFeeder:registersChanged")
|
||||
self.delay = self.getMinimalScanrate()
|
||||
|
||||
def run(self):
|
||||
self.delay = self.getMinimalScanrate()
|
||||
while True:
|
||||
registersToBeHandled = [
|
||||
r for r in self.registers if ((not r.enqueued) and
|
||||
(r.scanRate) and
|
||||
((not r.lastContact) or
|
||||
(r.lastContact + r.scanRate < datetime.datetime.now())))
|
||||
]
|
||||
registersToBeHandled.sort(key=lambda x : x.scanRate)
|
||||
for r in registersToBeHandled:
|
||||
self.queue.put(r)
|
||||
r.enqueued = True
|
||||
self.delayEvent.wait(self.delay)
|
15
src/initialRegisterFile.py
Normal file
15
src/initialRegisterFile.py
Normal file
@ -0,0 +1,15 @@
|
||||
import datetime
|
||||
import RegisterDatapoint
|
||||
import pickle
|
||||
|
||||
|
||||
datapoints = [
|
||||
RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1, datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'),
|
||||
RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'),
|
||||
RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Switches'),
|
||||
]
|
||||
|
||||
|
||||
with open('registers.pkl', 'wb') as f:
|
||||
pickle.dump(datapoints, f)
|
||||
|
6
src/loadRegisterFile.py
Normal file
6
src/loadRegisterFile.py
Normal file
@ -0,0 +1,6 @@
|
||||
import RegisterDatapoint
|
||||
|
||||
registers = RegisterDatapoint.loadRegisterList('registers.json')
|
||||
|
||||
for r in registers:
|
||||
print("{0!s}".format(r))
|
63
src/master.py
Normal file
63
src/master.py
Normal file
@ -0,0 +1,63 @@
|
||||
import CmdServer
|
||||
import MqttProcessor
|
||||
import CommunicationProcessor
|
||||
import MyPriorityQueue
|
||||
from queue import Queue
|
||||
import NotificationForwarder
|
||||
import Config
|
||||
import ScanRateConsideringQueueFeeder
|
||||
import datetime
|
||||
import RegisterDatapoint
|
||||
import pickle
|
||||
import logging
|
||||
import Pins
|
||||
import Heartbeat
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
config = Config.Config()
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
fh = logging.FileHandler(config.logFile)
|
||||
fh.setLevel(logging.DEBUG)
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.ERROR)
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
fh.setFormatter(formatter)
|
||||
ch.setFormatter(formatter)
|
||||
logger.addHandler(fh)
|
||||
logger.addHandler(ch)
|
||||
|
||||
Pins.pinsInit()
|
||||
|
||||
queue = MyPriorityQueue.MyPriorityQueue()
|
||||
pubQueue = Queue()
|
||||
nf = NotificationForwarder.NotificationForwarder()
|
||||
logger.debug('infrastructure prepared')
|
||||
|
||||
|
||||
datapoints = RegisterDatapoint.loadRegisterList(config.registerFile)
|
||||
logger.debug('datapoints read')
|
||||
|
||||
cp = CommunicationProcessor.CommunicationProcessor(config, queue, pubQueue)
|
||||
cp.start()
|
||||
logger.debug('CommunicationProcessor started')
|
||||
|
||||
mp = MqttProcessor.MqttProcessor(config, datapoints, queue, pubQueue)
|
||||
nf.register(mp)
|
||||
mp.start()
|
||||
logger.debug('MqttProcessor started')
|
||||
|
||||
cs = CmdServer.CmdServer(config, nf, datapoints)
|
||||
cs.start()
|
||||
logger.debug('CmdServer started')
|
||||
|
||||
hb = Heartbeat.Heartbeat(config, pubQueue)
|
||||
hb.start()
|
||||
logger.debug('Heartbeat started')
|
||||
|
||||
qf = ScanRateConsideringQueueFeeder.ScanRateConsideringQueueFeeder(config, datapoints, queue)
|
||||
nf.register(qf)
|
||||
qf.start()
|
||||
logger.debug('ScanRateConsideringQueueFeeder started')
|
122
src/registers.json
Normal file
122
src/registers.json
Normal file
@ -0,0 +1,122 @@
|
||||
[
|
||||
{
|
||||
"args": {
|
||||
"address": 1,
|
||||
"converter": "dht20TOFloat",
|
||||
"count": 1,
|
||||
"label": "Temperature",
|
||||
"publishTopic": "IoT/Measurment/Modbus2/Laundry/Temperature",
|
||||
"scanRate": 60.0,
|
||||
"unit": 5,
|
||||
"updateOnly": false
|
||||
},
|
||||
"type": "InputRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 2,
|
||||
"converter": "dht20TOFloat",
|
||||
"count": 1,
|
||||
"label": "Humidity",
|
||||
"publishTopic": "IoT/Measurment/Modbus2/Laundry/Humidity",
|
||||
"scanRate": 60.0,
|
||||
"unit": 5,
|
||||
"updateOnly": false
|
||||
},
|
||||
"type": "InputRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 0,
|
||||
"bitCount": 8,
|
||||
"converter": null,
|
||||
"count": 1,
|
||||
"label": "Switches",
|
||||
"publishTopic": "Pub/Switches",
|
||||
"scanRate": 0.5,
|
||||
"unit": 4,
|
||||
"updateOnly": false
|
||||
},
|
||||
"type": "DiscreteInputDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 40010,
|
||||
"converter": "uint32",
|
||||
"count": 2,
|
||||
"feedbackTopic": "FB/Counter1",
|
||||
"label": "Counter1",
|
||||
"publishTopic": "Pub/Counter1",
|
||||
"scanRate": 1.0,
|
||||
"subscribeTopic": "Sub/Counter1",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "HoldingRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 40012,
|
||||
"converter": "uint32",
|
||||
"count": 2,
|
||||
"feedbackTopic": "FB/Counter2",
|
||||
"label": "Counter2",
|
||||
"publishTopic": "Pub/Counter2",
|
||||
"scanRate": null,
|
||||
"subscribeTopic": "Pub/Counter2",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "HoldingRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 40014,
|
||||
"converter": "uint32",
|
||||
"count": 2,
|
||||
"feedbackTopic": "FB/Counter3",
|
||||
"label": "Counter3",
|
||||
"publishTopic": "Pub/Counter3",
|
||||
"scanRate": null,
|
||||
"subscribeTopic": "FB/Counter3",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "HoldingRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 40016,
|
||||
"converter": "uint32",
|
||||
"count": 2,
|
||||
"feedbackTopic": "FB/Counter4",
|
||||
"label": "Counter4",
|
||||
"publishTopic": "Pub/Counter4",
|
||||
"scanRate": 1.0,
|
||||
"subscribeTopic": "Sub/Counter4",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "HoldingRegisterDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 0,
|
||||
"feedbackTopic": "FB/Coil1",
|
||||
"label": "Coil1",
|
||||
"publishTopic": "Pub/Coil1",
|
||||
"scanRate": 1.0,
|
||||
"subscribeTopic": "Sub/Coil1",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "CoilDatapoint"
|
||||
},
|
||||
{
|
||||
"args": {
|
||||
"address": 1,
|
||||
"feedbackTopic": "FB/Coil2",
|
||||
"label": "Coil2",
|
||||
"publishTopic": "Pub/Coil2",
|
||||
"scanRate": 1.0,
|
||||
"subscribeTopic": "Sub/Coil2",
|
||||
"unit": 4
|
||||
},
|
||||
"type": "CoilDatapoint"
|
||||
}
|
||||
]
|
24
src/updateRegisterFile.py
Normal file
24
src/updateRegisterFile.py
Normal file
@ -0,0 +1,24 @@
|
||||
import datetime
|
||||
import RegisterDatapoint
|
||||
import pickle
|
||||
import json
|
||||
|
||||
|
||||
with open('registers.pkl', 'rb') as f:
|
||||
datapoints = pickle.load(f)
|
||||
|
||||
newDatapoints = []
|
||||
for dp in datapoints:
|
||||
ndp = type(dp)()
|
||||
for k,v in dp.__dict__.items():
|
||||
if k != 'logger':
|
||||
ndp.__dict__[k] = v
|
||||
newDatapoints.append(ndp)
|
||||
|
||||
|
||||
|
||||
js = json.dumps(newDatapoints, cls=RegisterDatapoint.JsonifyEncoder, sort_keys=True, indent=4)
|
||||
print(js)
|
||||
|
||||
|
||||
RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json')
|
Reference in New Issue
Block a user