90 lines
2.5 KiB
Python

from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from pymodbus.constants import Endian
from loguru import logger
import sys
import logging
from time import sleep
class LocalException(Exception):
    """Error raised by this script's Modbus helpers.

    Attributes (set in ``__init__``):
        msg: Human-readable description of what went wrong.
        cause: The underlying error object, or ``None`` when there is none.
    """

    def __init__(self, msg='', cause=None):
        """Store ``msg`` and ``cause`` and initialise the base ``Exception``."""
        # Hand the message (not ``self``) to Exception: passing ``self`` made
        # ``args`` a self-referencing tuple, so ``repr()`` of the exception
        # recursed without end and ``args`` never contained the message.
        super().__init__(msg)
        self.msg = msg
        self.cause = cause

    def __str__(self):
        """Return the message and cause in the script's fixed log format."""
        return f"LocalException: Msg:{self.msg}, Cause:{self.cause}"
def readInputRegister(client, addr, slave=2):
    """Read two input registers starting at ``addr`` and decode them as one float.

    The client is connected before the request and closed afterwards,
    whether the request succeeded or not.

    Args:
        client: Object providing ``connect()``, ``read_input_registers()``
            and ``close()``; the script passes its ``ModbusTcpClient``.
        addr: Address of the first of the two registers to read.
        slave: Unit id forwarded to the request. Defaults to 2, the value
            that used to be hard-coded here.

    Returns:
        The 32-bit float decoded from the two registers using big-endian
        byte order and big-endian word order.

    Raises:
        LocalException: If the request result is a ``ModbusIOException``
            (kept as ``cause``) or is any type other than
            ``ReadInputRegistersResponse``.
    """
    client.connect()
    try:
        res = client.read_input_registers(addr, 2, slave=slave)
        if isinstance(res, ReadInputRegistersResponse):
            decoder = BinaryPayloadDecoder.fromRegisters(
                res.registers, byteorder=Endian.Big, wordorder=Endian.Big
            )
            return decoder.decode_32bit_float()
        if isinstance(res, ModbusIOException):
            msg = f"Error: {type(res)}, Content: {res}"
            logger.warning(msg)
            # The I/O error arrives here as a return value rather than being
            # raised; chain it so it also shows up as __cause__ in tracebacks.
            raise LocalException(msg=msg, cause=res) from res
        # Anything else (neither a register response nor an I/O error).
        msg = f"Unknown type: {type(res)}, Content: {res}"
        logger.warning(msg)
        raise LocalException(msg=msg)
    finally:
        # Always release the connection, including on the error paths above.
        client.close()
def writeCoil(client, addr, value, slave=1):
    """Write ``value`` to the coil at ``addr`` and return ``value``.

    The client is connected before the request and closed afterwards,
    whether the request succeeded or not.

    Args:
        client: Object providing ``connect()``, ``write_coil()`` and
            ``close()``; the script passes its ``ModbusTcpClient``.
        addr: Address of the coil to write.
        value: State to write to the coil.
        slave: Unit id forwarded to the request. Defaults to 1, the value
            that used to be hard-coded here.

    Returns:
        ``value`` unchanged. The result object of the write is only logged
        at debug level; it is not inspected, so a returned value does not
        by itself show that the device accepted the write.
    """
    client.connect()
    try:
        res = client.write_coil(addr, value, slave=slave)
        logger.debug(f"write coil result {res}")
        return value
    finally:
        # Always release the connection, even if write_coil raised.
        client.close()
# Detach every handler currently installed on the stdlib root logger.
l = logging.getLogger()
# Iterate over a copy: removeHandler() mutates l.handlers, and looping over
# the live list skipped every second handler, leaving some attached.
for h in list(l.handlers):
    l.removeHandler(h)
# Modbus/TCP client shared by the whole script; readInputRegister() and
# writeCoil() connect and close it around every single request.
client = ModbusTcpClient("172.16.2.42")
# Input registers polled by the main loop. Each entry carries the register
# address ("addr"), a display label ("name") and the unit string appended to
# the logged value ("unit"). Order here is the order they are read in.
registers = [
    {"addr": address, "name": label, "unit": unit}
    for address, label, unit in (
        (0x0046, "Frequency", "Hz"),
        (0x0048, "Import active energy", "kWh"),
        (0x004a, "Export active energy", "kWh"),
        (0x004c, "Import reactive energy", "kvarh"),
        (0x004e, "Export reactive energy", "kvarh"),
        (0x000c, "Active power", "W"),
        (0x0058, "Positive power", "W"),
        (0x005c, "Reverse power", "W"),
        (0x0000, "Voltage", "V"),
        (0x0006, "Current", "A"),
    )
]
# Poll loop: every cycle reads and logs all entries of `registers`; on every
# second completed cycle the coil at address 0 is toggled.
# NOTE(review): the source had lost its indentation, so the nesting below is
# reconstructed -- confirm that `cnt += 1` is meant to run once per cycle
# (after the register loop) rather than once per register.
cnt = 0  # number of cycles whose register reads all succeeded
coilState = True  # last state chosen for the coil; flipped before each write
while True:
    try:
        for reg in registers:
            v = readInputRegister(client, reg['addr'])
            logger.info(f"{reg['name']}: {v} {reg['unit']}")
        cnt += 1
        if cnt % 2 == 0:
            coilState = not coilState
            logger.info(f"Write {coilState} to coil")
            writeCoil(client, 0, coilState)
    except Exception as e:
        # Top-level boundary: log and keep polling rather than exit.
        logger.error(f"Caught exception: {str(e)}")
    # Pause after failed cycles too. With the sleep inside the try block an
    # exception skipped it, so a persistent fault (e.g. device unreachable)
    # made the loop retry immediately and flood the log.
    sleep(5)