# Wolfgang Hottgenroth 24c09c0ed8 initial
# 2022-02-03 14:45:29 +01:00
#
# 222 lines
# 7.2 KiB
# Python

import asyncio
import sys
import argparse
from asyncua import Client, ua,Node
from asyncua.ua.uaerrors import UaStatusCodeError
import logging
import concurrent.futures
def parse_args(parser, requirenodeid=False):
    """Parse the command line and apply the shared post-processing.

    Configures the root logger from ``args.loglevel`` and prefixes
    ``args.url`` with the default OPC UA scheme when the URL has none.

    Args:
        parser: ``argparse.ArgumentParser`` used to read ``sys.argv``. It must
            define the ``loglevel`` and ``url`` destinations.
        requirenodeid: When true, abort with a usage error unless the user
            supplied a node ID or a browse path.

    Returns:
        The parsed ``argparse.Namespace``.

    Raises:
        SystemExit: On invalid arguments, or when ``requirenodeid`` is set and
            neither a node ID nor a browse path was given.
    """
    args = parser.parse_args()
    logging.basicConfig(level=getattr(logging, args.loglevel))
    if args.url and "://" not in args.url:
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
        args.url = ua.OPC_TCP_SCHEME + "://" + args.url
    if requirenodeid:
        # The check is done inline because this module defines no separate
        # helper for it. A node ID that still equals the parser default,
        # combined with an empty browse path, means the user named no node.
        nodeid = getattr(args, "nodeid", None)
        path = getattr(args, "path", "")
        if nodeid == parser.get_default("nodeid") and not path:
            parser.error("A NodeId or BrowsePath is required")
    return args
async def _lsprint_0(node, depth, indent=""):
    """Print the display name and node ID of every child of *node*.

    A column header is emitted only when *indent* is empty, i.e. on the
    outermost call. While *depth* is non-zero each child is browsed in turn
    with ``depth - 1`` and a longer indent.
    """
    if not indent:
        header = "{0:30} {1:25}".format("DisplayName", "NodeId")
        print(header)
        print("")
    descriptions = await node.get_children_descriptions()
    for child in descriptions:
        row = "{0}{1:30} {2:25}".format(
            indent, child.DisplayName.to_string(), child.NodeId.to_string()
        )
        print(row)
        if not depth:
            continue
        child_node = Node(node.server, child.NodeId)
        await _lsprint_0(child_node, depth - 1, indent + " ")
async def _lsprint_1(node, depth, indent=""):
    """Print display name, node ID and browse name of every child of *node*.

    For children whose node class is ``Variable`` the current value is read
    and appended; a failed read is shown as ``Bad (0x<status code>)``. The
    column header is printed only when *indent* is empty. While *depth* is
    non-zero each child is browsed in turn with ``depth - 1``.
    """
    if not indent:
        header = "{0:30} {1:25} {2:25} {3:25}".format(
            "DisplayName", "NodeId", "BrowseName", "Value"
        )
        print(header)
        print("")
    for child in await node.get_children_descriptions():
        is_variable = child.NodeClass == ua.NodeClass.Variable
        if is_variable:
            try:
                value = await Node(node.server, child.NodeId).read_value()
            except UaStatusCodeError as exc:
                # Show the status code instead of aborting the listing.
                value = "Bad (0x{0:x})".format(exc.code)
            row = "{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(
                indent,
                child.DisplayName.to_string(),
                child.NodeId.to_string(),
                child.BrowseName.to_string(),
                value,
            )
        else:
            row = "{0}{1:30} {2!s:25} {3!s:25}".format(
                indent,
                child.DisplayName.to_string(),
                child.NodeId.to_string(),
                child.BrowseName.to_string(),
            )
        print(row)
        if not depth:
            continue
        await _lsprint_1(Node(node.server, child.NodeId), depth - 1, indent + " ")
async def _lsprint_long(pnode, depth, indent=""):
    """Print a long listing of the children of *pnode*.

    Columns are DisplayName, NodeId, BrowseName and, for nodes whose class is
    ``Variable``, additionally DataType, Timestamp and Value. The timestamp
    is the ``ServerTimestamp`` of the Value attribute read. The column header
    is printed only when *indent* is empty. While *depth* is non-zero each
    child is browsed in turn with ``depth - 1``.

    Args:
        pnode: Parent node whose children are listed.
        depth: Remaining number of levels to descend below the children.
        indent: Prefix printed before every row of this level.
    """
    if not indent:
        print(
            "{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format(
                "DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"
            )
        )
        print("")
    for node in await pnode.get_children():
        attrs = await node.read_attributes(
            [
                ua.AttributeIds.DisplayName,
                ua.AttributeIds.BrowseName,
                ua.AttributeIds.NodeClass,
                ua.AttributeIds.WriteMask,
                ua.AttributeIds.UserWriteMask,
                ua.AttributeIds.DataType,
                ua.AttributeIds.Value,
            ]
        )
        # WriteMask and UserWriteMask are read but not displayed.
        name, bname, nclass, _mask, _umask, dtype, val = [attr.Value.Value for attr in attrs]
        # The last requested attribute is Value; its server timestamp fills
        # the "Timestamp" column.
        update = attrs[-1].ServerTimestamp
        if nclass == ua.NodeClass.Variable:
            print(
                "{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(
                    indent,
                    name.to_string(),
                    node.nodeid.to_string(),
                    bname.to_string(),
                    dtype.to_string(),
                    update,
                    val,
                )
            )
        else:
            # Keep the same column order as the header and the variable rows:
            # DisplayName, NodeId, BrowseName.
            print(
                "{0}{1:30} {2:25} {3:25}".format(
                    indent, name.to_string(), node.nodeid.to_string(), bname.to_string()
                )
            )
        if depth:
            await _lsprint_long(node, depth - 1, indent + " ")
def add_minimum_args(parser):
    """Register the URL, log-level and socket-timeout options on *parser*."""
    option_table = (
        (
            ("-u", "--url"),
            {
                "help": "URL of OPC UA server (for example: opc.tcp://example.org:4840)",
                "default": "opc.tcp://localhost:4840",
                "metavar": "URL",
            },
        ),
        (
            ("-v", "--verbose"),
            {
                "dest": "loglevel",
                "choices": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                "default": "WARNING",
                "help": "Set log level",
            },
        ),
        (
            ("--timeout",),
            {
                "dest": "timeout",
                "type": int,
                "default": 1,
                "help": "Set socket timeout (NOT the diverse UA timeouts)",
            },
        ),
    )
    # Registration order is kept so the generated help lists the options in
    # the same sequence.
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
def add_common_args(parser, default_node="i=84", require_node=False):
    """Register the options shared by the node-oriented tools on *parser*.

    Adds the minimum options (via ``add_minimum_args``) plus node selection
    (``--nodeid``, ``--path``, ``--namespace``), ``--security`` and the
    ``--user`` / ``--password`` credentials.

    Args:
        parser: The ``argparse.ArgumentParser`` to extend.
        default_node: Default value of ``--nodeid``.
        require_node: Whether ``--nodeid`` is a required option.
    """
    add_minimum_args(parser)
    parser.add_argument(
        "-n",
        "--nodeid",
        help="Fully-qualified node ID (for example: i=85). Default: root node",
        default=default_node,
        required=require_node,
        metavar="NODE",
    )
    parser.add_argument(
        "-p",
        "--path",
        help="Comma separated browse path to the node starting at NODE (for example: 3:MyObject,3:MyVariable)",
        default="",
        metavar="BROWSEPATH",
    )
    parser.add_argument(
        "-i", "--namespace", help="Default namespace", type=int, default=0, metavar="NAMESPACE"
    )
    parser.add_argument(
        "--security",
        help="Security settings, for example:"
        " Basic256Sha256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
        default="",
    )
    parser.add_argument(
        "--user", help="User name for authentication. Overrides the user name given in the URL."
    )
    parser.add_argument(
        "--password",
        help="Password for authentication. Overrides the password given in the URL.",
    )
async def get_node(client, args):
    """Resolve the node selected by ``args.nodeid`` and ``args.path``.

    Without a browse path the node for ``args.nodeid`` is returned directly.
    Otherwise the comma-separated path is followed from that node; a leading
    ``0:Root`` segment is dropped when the start node is ``NodeId(84, 0)``.
    """
    start = client.get_node(args.nodeid)
    if not args.path:
        return start
    segments = args.path.split(",")
    starts_at_root = start.nodeid == ua.NodeId(84, 0)
    if starts_at_root and segments[0] == "0:Root":
        # Allow the path to name the root explicitly when no other start
        # node was given.
        segments = segments[1:]
    return await start.get_child(segments)
async def _configure_client_with_args(client, args):
    """Apply the user, password and security options from *args* to *client*.

    User name and password are set only when they are truthy; the security
    string is always passed on.
    """
    username = args.user
    if username:
        client.set_user(username)
    secret = args.password
    if secret:
        client.set_password(secret)
    security_string = args.security
    await client.set_security_string(security_string)
async def _uals():
    """Browse an OPC UA node and print its children, then exit the process.

    Command-line options are the common ones plus ``-l`` (listing format: 0,
    1, or any other value for the long format; ``-l`` without a value means
    3, no ``-l`` means 1) and ``-d/--depth``.

    Raises:
        SystemExit: Always. Status 0 after a successful listing, status 1
            when connecting or browsing fails with an OS error or a timeout.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument(
        "-l", dest="long_format", const=3, nargs="?", type=int, help="use a long listing format"
    )
    parser.add_argument("-d", "--depth", default=1, type=int, help="Browse depth")
    args = parse_args(parser)
    if args.long_format is None:
        args.long_format = 1
    # The printers recurse while their depth counter is non-zero, so a depth
    # below 1 would produce a negative counter that never reaches zero.
    remaining_depth = max(args.depth - 1, 0)
    if args.long_format == 0:
        lsprint = _lsprint_0
    elif args.long_format == 1:
        lsprint = _lsprint_1
    else:
        lsprint = _lsprint_long
    client = Client(args.url, timeout=args.timeout)
    await _configure_client_with_args(client, args)
    try:
        async with client:
            node = await get_node(client, args)
            print(f"Browsing node {node} at {args.url}\n")
            await lsprint(node, remaining_depth)
    except (OSError, asyncio.TimeoutError, concurrent.futures.TimeoutError) as e:
        # asyncio.TimeoutError is listed explicitly: on Python 3.8-3.10 it is
        # a class of its own and is not covered by the other two entries.
        print(e)
        sys.exit(1)
    sys.exit(0)
# Entry point: runs the browse coroutine as soon as this module is executed.
# _uals() ends by calling sys.exit(), which sets the process exit status.
asyncio.run(_uals())