from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusIOException from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder from pymodbus.constants import Endian from loguru import logger import sys import logging from time import sleep class LocalModbusException(Exception): def __init__(self, msg='', cause=None): super().__init__(self) self.msg = msg self.cause = cause def __str__(self): return f"LocalModbusException: Msg:{self.msg}, Cause:{self.cause}" class ModbusHandler: def __init__(self, config): self.config = config['modbus'] self.client = ModbusTcpClient(self.config['gateway']) self.client.connect() def readRegister(self, typ, slave, addr): if typ == 'input': return self.readInputRegister(slave, addr) elif typ == 'holding': return self.readHoldingRegister(slave, addr) else: raise LocalModbusException('unsupported read type') def readInputRegister(self, slave, addr): res = self.client.read_input_registers(addr, 2, slave=slave) if (isinstance(res, ReadInputRegistersResponse)): v = BinaryPayloadDecoder.fromRegisters(res.registers, byteorder=Endian.BIG, wordorder=Endian.BIG).decode_32bit_float() return v elif (isinstance(res, LocalModbusException)): msg = f"Error: {type(res)}, Content: {res}" logger.warning(msg) raise LocalModbusException(msg=msg, cause=res) else: msg = f"Unknown type: {type(res)}, Content: {res}" logger.warning(msg) raise LocalModbusException(msg=msg) def readHoldingRegister(self, slave, addr): res = self.client.read_holding_registers(addr, 2, slave=slave) if (isinstance(res, ReadHoldingRegistersResponse)): v = res.registers return v elif (isinstance(res, LocalModbusException)): msg = f"Error: {type(res)}, Content: {res}" logger.warning(msg) raise LocalModbusException(msg=msg, cause=res) else: msg = f"Unknown type: {type(res)}, Content: {res}" logger.warning(msg) raise LocalModbusException(msg=msg) def writeCoil(self, slave, addr, value): res = self.client.write_coil(addr, value, slave=slave) logger.debug(f"write coil result {res}") return value #client = ModbusTcpClient("172.16.2.42") #registers = [ # { "addr":0x0046, "name":"Frequency", "unit":"Hz" }, # { "addr":0x0048, "name":"Import active energy", "unit":"kWh" }, # { "addr":0x004a, "name":"Export active energy", "unit":"kWh" }, # { "addr":0x004c, "name":"Import reactive energy", "unit":"kvarh" }, # { "addr":0x004e, "name":"Export reactive energy", "unit":"kvarh" }, # { "addr":0x000c, "name":"Active power", "unit":"W" }, # { "addr":0x0058, "name":"Positive power", "unit":"W" }, # { "addr":0x005c, "name":"Reverse power", "unit":"W" }, # { "addr":0x0000, "name":"Voltage", "unit":"V" }, # { "addr":0x0006, "name":"Current", "unit":"A" } #] # # #cnt = 0 #coilState = True #per = 2 #while True: # try: # for reg in registers: # v = readInputRegister(client, reg['addr']) # logger.info(f"{reg['name']}: {v} {reg['unit']}") # # cnt += 1 # if ((cnt == per) == 0): # cnt = 0 # coilState = not coilState # per = 2 if coilState else 10 # logger.info(f"Write {coilState} to coil") # writeCoil(client, 0, coilState) # # # # sleep(2) # except Exception as e: # logger.error(f"Caught exception: {str(e)}")