import time import connexion import json import werkzeug import os from loguru import logger from pymodbus.client.sync import ModbusTcpClient from pymodbus.exceptions import ModbusIOException #with open('/opt/app/config/authservice.key', 'r') as f: # JWT_PRIV_KEY = f.read() clients = { "modbus1": "172.16.2.157" } clientConn = {} class UnknownClientException (Exception): pass class UnableToConnectException (Exception): pass class ConnectionException (Exception): pass class IllegalValueException (Exception): pass def connect(name): try: if name not in clientConn: clientConn[name] = ModbusTcpClient(clients[name]) if not clientConn[name].connect(): raise UnableToConnectException() return clientConn[name] except KeyError as e: raise UnknownClientException(e) def close(name): try: clientConn[name].close() except KeyError as e: raise UnknownClientException(e) def listClients(): return [ {"name":k, "address":v} for k,v in clients.items() ] def createClient(**args): try: body = args["body"] name = body["name"] address = body["address"] return "Ok" except KeyError as e: raise werkzeug.exceptions.BadRequest("name or address missing") except Exception as e: raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e))) def readCoil(client, address): try: conn = connect(client) res = conn.read_coils(address) if isinstance(res, ModbusIOException): raise ConnectionException() return int(res.bits[0]) except Exception as e: raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e))) finally: close(client) def writeCoil(client, address, **args): try: value = args["body"] if value not in [ 0, 1 ]: raise IllegalValueException("Value in body must be 0 or 1") conn = connect(client) res = conn.write_coil(address, value) if isinstance(res, ModbusIOException): raise ConnectionException() return "Ok" except Exception as e: raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e))) finally: close(client) def readDiscreteRegister(client, address): try: conn = connect(client) res = conn.read_discrete_inputs(address) if isinstance(res, ModbusIOException): raise ConnectionException() return int(res.bits[0]) except Exception as e: raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e))) finally: close(client) def readInputRegister(client, address): try: conn = connect(client) res = conn.read_input_registers(address) if isinstance(res, ModbusIOException): raise ConnectionException() return res.registers except Exception as e: raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e))) finally: close(client)