112 lines
3.1 KiB
Python

import time
import connexion
import json
import werkzeug
import os
from loguru import logger
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
#with open('/opt/app/config/authservice.key', 'r') as f:
# JWT_PRIV_KEY = f.read()
clients = {
"modbus1": "172.16.2.157"
}
clientConn = {}
class UnknownClientException (Exception): pass
class UnableToConnectException (Exception): pass
class ConnectionException (Exception): pass
class IllegalValueException (Exception): pass
def connect(name):
try:
if name not in clientConn:
clientConn[name] = ModbusTcpClient(clients[name])
if not clientConn[name].connect():
raise UnableToConnectException()
return clientConn[name]
except KeyError as e:
raise UnknownClientException(e)
def close(name):
try:
clientConn[name].close()
except KeyError as e:
raise UnknownClientException(e)
def listClients():
return [ {"name":k, "address":v} for k,v in clients.items() ]
def createClient(**args):
try:
body = args["body"]
name = body["name"]
address = body["address"]
return "Ok"
except KeyError as e:
raise werkzeug.exceptions.BadRequest("name or address missing")
except Exception as e:
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
def readCoil(client, address):
try:
conn = connect(client)
res = conn.read_coils(address)
if isinstance(res, ModbusIOException):
raise ConnectionException()
return int(res.bits[0])
except Exception as e:
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
finally:
close(client)
def writeCoil(client, address, **args):
try:
value = args["body"]
if value not in [ 0, 1 ]:
raise IllegalValueException("Value in body must be 0 or 1")
conn = connect(client)
res = conn.write_coil(address, value)
if isinstance(res, ModbusIOException):
raise ConnectionException()
return "Ok"
except Exception as e:
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
finally:
close(client)
def readDiscreteRegister(client, address):
try:
conn = connect(client)
res = conn.read_discrete_inputs(address)
if isinstance(res, ModbusIOException):
raise ConnectionException()
return int(res.bits[0])
except Exception as e:
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
finally:
close(client)
def readInputRegister(client, address):
try:
conn = connect(client)
res = conn.read_input_registers(address)
if isinstance(res, ModbusIOException):
raise ConnectionException()
return res.registers
except Exception as e:
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
finally:
close(client)