"""Command-line helper that browses an OPC UA server and prints its nodes."""

import argparse
import asyncio
import concurrent.futures
import logging
import sys

from asyncua import Client, Node, ua
from asyncua.ua.uaerrors import UaStatusCodeError


def _require_nodeid(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
    """Exit with a usage error unless a node id or browse path was supplied.

    argparse cannot tell "user passed the default" from "user passed nothing",
    so the check compares against the default root id ``i=84`` and an empty
    browse path.
    """
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage()
        print(f"{parser.prog}: error: A NodeId or BrowsePath is required")
        sys.exit(1)


def parse_args(parser: argparse.ArgumentParser, requirenodeid: bool = False) -> argparse.Namespace:
    """Parse the command line, configure logging and normalise the server URL.

    Args:
        parser: Parser already populated with (at least) the ``--url`` and
            ``--verbose`` options from :func:`add_minimum_args`.
        requirenodeid: When true, abort unless a node id or browse path
            was given explicitly.

    Returns:
        The parsed namespace; ``args.url`` is prefixed with the default
        OPC UA scheme when it contained no ``://``.
    """
    args = parser.parse_args()
    logging.basicConfig(level=getattr(logging, args.loglevel))
    if args.url and "://" not in args.url:
        logging.info(f"Adding default scheme {ua.OPC_TCP_SCHEME} to URL {args.url}")
        args.url = ua.OPC_TCP_SCHEME + "://" + args.url
    if requirenodeid:
        _require_nodeid(parser, args)
    return args


async def _lsprint_0(node, depth, indent=""):
    """Print DisplayName and NodeId of the children of *node*, recursively.

    The column header is printed only on the outermost call (empty *indent*).
    Recursion continues while *depth* is non-zero.
    """
    if not indent:
        print("{0:30} {1:25}".format("DisplayName", "NodeId"))
        print("")
    for desc in await node.get_children_descriptions():
        print(
            "{0}{1:30} {2:25}".format(
                indent, desc.DisplayName.to_string(), desc.NodeId.to_string()
            )
        )
        if depth:
            await _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + " ")


async def _lsprint_1(node, depth, indent=""):
    """Print DisplayName, NodeId, BrowseName (and Value for variables).

    A variable whose value cannot be read is shown as ``Bad (0x<code>)``
    instead of aborting the listing.
    """
    if not indent:
        print("{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
        print("")
    for desc in await node.get_children_descriptions():
        if desc.NodeClass == ua.NodeClass.Variable:
            try:
                val = await Node(node.server, desc.NodeId).read_value()
            except UaStatusCodeError as err:
                val = "Bad (0x{0:x})".format(err.code)
            print(
                "{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(
                    indent,
                    desc.DisplayName.to_string(),
                    desc.NodeId.to_string(),
                    desc.BrowseName.to_string(),
                    val,
                )
            )
        else:
            print(
                "{0}{1:30} {2!s:25} {3!s:25}".format(
                    indent,
                    desc.DisplayName.to_string(),
                    desc.NodeId.to_string(),
                    desc.BrowseName.to_string(),
                )
            )
        if depth:
            await _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + " ")


async def _lsprint_long(pnode, depth, indent=""):
    """Print the long listing: name, ids, data type, timestamp and value.

    Every child's attributes are fetched in a single ``read_attributes``
    call. Data type, timestamp and value columns are printed for variable
    nodes only.
    """
    if not indent:
        print(
            "{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format(
                "DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"
            )
        )
        print("")
    for node in await pnode.get_children():
        attrs = await node.read_attributes(
            [
                ua.AttributeIds.DisplayName,
                ua.AttributeIds.BrowseName,
                ua.AttributeIds.NodeClass,
                ua.AttributeIds.WriteMask,
                ua.AttributeIds.UserWriteMask,
                ua.AttributeIds.DataType,
                ua.AttributeIds.Value,
            ]
        )
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
        # The Value attribute is requested last, so its data value carries
        # the timestamp shown in the "Timestamp" column.
        update = attrs[-1].ServerTimestamp
        if nclass == ua.NodeClass.Variable:
            print(
                "{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(
                    indent,
                    name.to_string(),
                    node.nodeid.to_string(),
                    bname.to_string(),
                    dtype.to_string(),
                    update,
                    val,
                )
            )
        else:
            # Same column order as the header and the variable rows:
            # DisplayName, NodeId, BrowseName.
            print(
                "{0}{1:30} {2:25} {3:25}".format(
                    indent, name.to_string(), node.nodeid.to_string(), bname.to_string()
                )
            )
        if depth:
            await _lsprint_long(node, depth - 1, indent + " ")


def add_minimum_args(parser: argparse.ArgumentParser) -> None:
    """Register the options every tool needs: URL, log level and timeout."""
    parser.add_argument(
        "-u",
        "--url",
        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
        default="opc.tcp://localhost:4840",
        metavar="URL",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        dest="loglevel",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="WARNING",
        help="Set log level",
    )
    parser.add_argument(
        "--timeout",
        dest="timeout",
        type=int,
        default=1,
        help="Set socket timeout (NOT the diverse UA timeouts)",
    )


def add_common_args(parser: argparse.ArgumentParser, default_node: str = "i=84", require_node: bool = False) -> None:
    """Register the minimum options plus node selection and authentication.

    Args:
        parser: Parser to extend.
        default_node: Default value of ``--nodeid``.
        require_node: Passed to argparse as ``required`` for ``--nodeid``.
    """
    add_minimum_args(parser)
    parser.add_argument(
        "-n",
        "--nodeid",
        help="Fully-qualified node ID (for example: i=85). Default: root node",
        default=default_node,
        required=require_node,
        metavar="NODE",
    )
    parser.add_argument(
        "-p",
        "--path",
        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
        default="",
        metavar="BROWSEPATH",
    )
    parser.add_argument(
        "-i", "--namespace", help="Default namespace", type=int, default=0, metavar="NAMESPACE"
    )
    parser.add_argument(
        "--security",
        help="Security settings, for example:"
        " Basic256Sha256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
        default="",
    )
    parser.add_argument(
        "--user", help="User name for authentication. Overrides the user name given in the URL."
    )
    parser.add_argument(
        "--password",
        help="Password name for authentication. Overrides the password given in the URL.",
    )


async def get_node(client, args):
    """Resolve the node selected by ``--nodeid`` and the optional ``--path``.

    When the starting node is the root (``i=84`` in namespace 0) a leading
    ``0:Root`` path element is dropped, so the user may spell the root
    explicitly in the browse path.
    """
    node = client.get_node(args.nodeid)
    if args.path:
        path = args.path.split(",")
        if node.nodeid == ua.NodeId(84, 0) and path[0] == "0:Root":
            # let user specify root if not node given
            path = path[1:]
        node = await node.get_child(path)
    return node


async def _configure_client_with_args(client, args):
    """Apply the user name, password and security string from *args* to *client*."""
    if args.user:
        client.set_user(args.user)
    if args.password:
        client.set_password(args.password)
    await client.set_security_string(args.security)


async def _uals():
    """Entry point of the ``uals`` tool: browse a node and print its children.

    Exits the process with status 0 on success, or 1 after printing the
    error when the connection fails or times out.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument(
        "-l", dest="long_format", const=3, nargs="?", type=int, help="use a long listing format"
    )
    parser.add_argument("-d", "--depth", default=1, type=int, help="Browse depth")

    args = parse_args(parser)
    if args.long_format is None:
        args.long_format = 1

    client = Client(args.url, timeout=args.timeout)
    await _configure_client_with_args(client, args)
    try:
        async with client:
            node = await get_node(client, args)
            print(f"Browsing node {node} at {args.url}\n")
            # NOTE: the printers recurse while the remaining depth is non-zero,
            # so "-d 0" (remaining depth -1) never reaches zero and browses
            # the complete subtree.
            if args.long_format == 0:
                await _lsprint_0(node, args.depth - 1)
            elif args.long_format == 1:
                await _lsprint_1(node, args.depth - 1)
            else:
                await _lsprint_long(node, args.depth - 1)
    except (OSError, asyncio.TimeoutError, concurrent.futures.TimeoutError) as e:
        # asyncio.TimeoutError is a distinct class on Python 3.8-3.10; from
        # 3.11 on all three names resolve to (subclasses of) OSError.
        print(e)
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    asyncio.run(_uals())