112 lines
3.1 KiB
Python
112 lines
3.1 KiB
Python
import time
|
|
import connexion
|
|
import json
|
|
import werkzeug
|
|
import os
|
|
from loguru import logger
|
|
from pymodbus.client.sync import ModbusTcpClient
|
|
from pymodbus.exceptions import ModbusIOException
|
|
|
|
|
|
#with open('/opt/app/config/authservice.key', 'r') as f:
|
|
# JWT_PRIV_KEY = f.read()
|
|
|
|
clients = {
|
|
"modbus1": "172.16.2.157"
|
|
}
|
|
|
|
clientConn = {}
|
|
|
|
class UnknownClientException (Exception): pass
|
|
|
|
class UnableToConnectException (Exception): pass
|
|
|
|
class ConnectionException (Exception): pass
|
|
|
|
class IllegalValueException (Exception): pass
|
|
|
|
|
|
def connect(name):
|
|
try:
|
|
if name not in clientConn:
|
|
clientConn[name] = ModbusTcpClient(clients[name])
|
|
if not clientConn[name].connect():
|
|
raise UnableToConnectException()
|
|
return clientConn[name]
|
|
except KeyError as e:
|
|
raise UnknownClientException(e)
|
|
|
|
def close(name):
|
|
try:
|
|
clientConn[name].close()
|
|
except KeyError as e:
|
|
raise UnknownClientException(e)
|
|
|
|
|
|
def listClients():
|
|
return [ {"name":k, "address":v} for k,v in clients.items() ]
|
|
|
|
def createClient(**args):
|
|
try:
|
|
body = args["body"]
|
|
|
|
name = body["name"]
|
|
address = body["address"]
|
|
|
|
return "Ok"
|
|
except KeyError as e:
|
|
raise werkzeug.exceptions.BadRequest("name or address missing")
|
|
except Exception as e:
|
|
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
|
|
|
|
def readCoil(client, address):
|
|
try:
|
|
conn = connect(client)
|
|
res = conn.read_coils(address)
|
|
if isinstance(res, ModbusIOException):
|
|
raise ConnectionException()
|
|
return int(res.bits[0])
|
|
except Exception as e:
|
|
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
|
|
finally:
|
|
close(client)
|
|
|
|
def writeCoil(client, address, **args):
|
|
try:
|
|
value = args["body"]
|
|
if value not in [ 0, 1 ]:
|
|
raise IllegalValueException("Value in body must be 0 or 1")
|
|
conn = connect(client)
|
|
res = conn.write_coil(address, value)
|
|
if isinstance(res, ModbusIOException):
|
|
raise ConnectionException()
|
|
return "Ok"
|
|
except Exception as e:
|
|
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
|
|
finally:
|
|
close(client)
|
|
|
|
def readDiscreteRegister(client, address):
|
|
try:
|
|
conn = connect(client)
|
|
res = conn.read_discrete_inputs(address)
|
|
if isinstance(res, ModbusIOException):
|
|
raise ConnectionException()
|
|
return int(res.bits[0])
|
|
except Exception as e:
|
|
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
|
|
finally:
|
|
close(client)
|
|
|
|
def readInputRegister(client, address):
|
|
try:
|
|
conn = connect(client)
|
|
res = conn.read_input_registers(address)
|
|
if isinstance(res, ModbusIOException):
|
|
raise ConnectionException()
|
|
return res.registers
|
|
except Exception as e:
|
|
raise werkzeug.exceptions.BadRequest("{}: {}".format(e.__class__.__name__, str(e)))
|
|
finally:
|
|
close(client)
|