mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 21:11:12 +00:00
* 初始提交,只保留工作区当前状态 * remove redundant arm_slider meshes --------- Co-authored-by: Junhan Chang <changjh@dp.tech>
180 lines
6.1 KiB
Python
180 lines
6.1 KiB
Python
# coding=utf-8
|
|
from enum import Enum
|
|
from abc import ABC, abstractmethod
|
|
from typing import Tuple, Union, Optional, Any, List
|
|
|
|
from opcua import Client, Node
|
|
from opcua.ua import NodeId, NodeClass, VariantType
|
|
|
|
|
|
class DataType(Enum):
|
|
BOOLEAN = VariantType.Boolean
|
|
SBYTE = VariantType.SByte
|
|
BYTE = VariantType.Byte
|
|
INT16 = VariantType.Int16
|
|
UINT16 = VariantType.UInt16
|
|
INT32 = VariantType.Int32
|
|
UINT32 = VariantType.UInt32
|
|
INT64 = VariantType.Int64
|
|
UINT64 = VariantType.UInt64
|
|
FLOAT = VariantType.Float
|
|
DOUBLE = VariantType.Double
|
|
STRING = VariantType.String
|
|
DATETIME = VariantType.DateTime
|
|
BYTESTRING = VariantType.ByteString
|
|
|
|
|
|
class NodeType(Enum):
|
|
VARIABLE = NodeClass.Variable
|
|
OBJECT = NodeClass.Object
|
|
METHOD = NodeClass.Method
|
|
OBJECTTYPE = NodeClass.ObjectType
|
|
VARIABLETYPE = NodeClass.VariableType
|
|
REFERENCETYPE = NodeClass.ReferenceType
|
|
DATATYPE = NodeClass.DataType
|
|
VIEW = NodeClass.View
|
|
|
|
|
|
class Base(ABC):
|
|
def __init__(self, client: Client, name: str, node_id: str, typ: NodeType, data_type: DataType):
|
|
self._node_id: str = node_id
|
|
self._client = client
|
|
self._name = name
|
|
self._type = typ
|
|
self._data_type = data_type
|
|
self._node: Optional[Node] = None
|
|
|
|
def _get_node(self) -> Node:
|
|
if self._node is None:
|
|
try:
|
|
# 检查是否是NumericNodeId(ns=X;i=Y)格式
|
|
if "NumericNodeId" in self._node_id:
|
|
# 从字符串中提取命名空间和标识符
|
|
import re
|
|
match = re.search(r'ns=(\d+);i=(\d+)', self._node_id)
|
|
if match:
|
|
ns = int(match.group(1))
|
|
identifier = int(match.group(2))
|
|
node_id = NodeId(identifier, ns)
|
|
self._node = self._client.get_node(node_id)
|
|
else:
|
|
raise ValueError(f"无法解析节点ID: {self._node_id}")
|
|
else:
|
|
# 直接使用节点ID字符串
|
|
self._node = self._client.get_node(self._node_id)
|
|
except Exception as e:
|
|
print(f"获取节点失败: {self._node_id}, 错误: {e}")
|
|
raise
|
|
return self._node
|
|
|
|
@abstractmethod
|
|
def read(self) -> Tuple[Any, bool]:
|
|
"""读取节点值,返回(值, 是否出错)"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def write(self, value: Any) -> bool:
|
|
"""写入节点值,返回是否出错"""
|
|
pass
|
|
|
|
@property
|
|
def type(self) -> NodeType:
|
|
return self._type
|
|
|
|
@property
|
|
def node_id(self) -> str:
|
|
return self._node_id
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self._name
|
|
|
|
|
|
class Variable(Base):
|
|
def __init__(self, client: Client, name: str, node_id: str, data_type: DataType):
|
|
super().__init__(client, name, node_id, NodeType.VARIABLE, data_type)
|
|
|
|
def read(self) -> Tuple[Any, bool]:
|
|
try:
|
|
value = self._get_node().get_value()
|
|
return value, False
|
|
except Exception as e:
|
|
print(f"读取变量 {self._name} 失败: {e}")
|
|
return None, True
|
|
|
|
def write(self, value: Any) -> bool:
|
|
try:
|
|
self._get_node().set_value(value)
|
|
return False
|
|
except Exception as e:
|
|
print(f"写入变量 {self._name} 失败: {e}")
|
|
return True
|
|
|
|
|
|
class Method(Base):
|
|
def __init__(self, client: Client, name: str, node_id: str, parent_node_id: str, data_type: DataType):
|
|
super().__init__(client, name, node_id, NodeType.METHOD, data_type)
|
|
self._parent_node_id = parent_node_id
|
|
self._parent_node = None
|
|
|
|
def _get_parent_node(self) -> Node:
|
|
if self._parent_node is None:
|
|
try:
|
|
# 检查是否是NumericNodeId(ns=X;i=Y)格式
|
|
if "NumericNodeId" in self._parent_node_id:
|
|
# 从字符串中提取命名空间和标识符
|
|
import re
|
|
match = re.search(r'ns=(\d+);i=(\d+)', self._parent_node_id)
|
|
if match:
|
|
ns = int(match.group(1))
|
|
identifier = int(match.group(2))
|
|
node_id = NodeId(identifier, ns)
|
|
self._parent_node = self._client.get_node(node_id)
|
|
else:
|
|
raise ValueError(f"无法解析父节点ID: {self._parent_node_id}")
|
|
else:
|
|
# 直接使用节点ID字符串
|
|
self._parent_node = self._client.get_node(self._parent_node_id)
|
|
except Exception as e:
|
|
print(f"获取父节点失败: {self._parent_node_id}, 错误: {e}")
|
|
raise
|
|
return self._parent_node
|
|
|
|
def read(self) -> Tuple[Any, bool]:
|
|
"""方法节点不支持读取操作"""
|
|
return None, True
|
|
|
|
def write(self, value: Any) -> bool:
|
|
"""方法节点不支持写入操作"""
|
|
return True
|
|
|
|
def call(self, *args) -> Tuple[Any, bool]:
|
|
"""调用方法,返回(返回值, 是否出错)"""
|
|
try:
|
|
result = self._get_parent_node().call_method(self._get_node(), *args)
|
|
return result, False
|
|
except Exception as e:
|
|
print(f"调用方法 {self._name} 失败: {e}")
|
|
return None, True
|
|
|
|
|
|
class Object(Base):
|
|
def __init__(self, client: Client, name: str, node_id: str):
|
|
super().__init__(client, name, node_id, NodeType.OBJECT, None)
|
|
|
|
def read(self) -> Tuple[Any, bool]:
|
|
"""对象节点不支持直接读取操作"""
|
|
return None, True
|
|
|
|
def write(self, value: Any) -> bool:
|
|
"""对象节点不支持直接写入操作"""
|
|
return True
|
|
|
|
def get_children(self) -> Tuple[List[Node], bool]:
|
|
"""获取子节点列表,返回(子节点列表, 是否出错)"""
|
|
try:
|
|
children = self._get_node().get_children()
|
|
return children, False
|
|
except Exception as e:
|
|
print(f"获取对象 {self._name} 的子节点失败: {e}")
|
|
return [], True |