Merge branch 'dev' into pr/169

# Conflicts:
#	unilabos/device_comms/opcua_client/client.py
#	unilabos/device_comms/opcua_client/node/uniopcua.py
#	unilabos/registry/devices/post_process_station.yaml
This commit is contained in:
ZiWei
2025-12-25 13:55:22 +08:00
70 changed files with 233242 additions and 712 deletions

View File

@@ -5,7 +5,6 @@ from typing import Any, Union, List, Dict, Callable, Optional, Tuple
from pydantic import BaseModel
from opcua import Client, ua
from opcua.ua import NodeClass
import pandas as pd
import os
@@ -13,7 +12,7 @@ from unilabos.device_comms.opcua_client.node.uniopcua import Base as OpcUaNodeBa
from unilabos.device_comms.opcua_client.node.uniopcua import Variable, Method, NodeType, DataType
from unilabos.device_comms.universal_driver import UniversalDriver
from unilabos.utils.log import logger
from unilabos.devices.workstation.post_process.decks import post_process_deck
class OpcUaNode(BaseModel):
name: str
@@ -117,6 +116,8 @@ class BaseClient(UniversalDriver):
_variables_to_find: Dict[str, Dict[str, Any]] = {}
_name_mapping: Dict[str, str] = {} # 英文名到中文名的映射
_reverse_mapping: Dict[str, str] = {} # 中文名到英文名的映射
# 直接缓存已找到的 ua.Node 对象,避免因字符串 NodeId 格式导致订阅失败
_found_node_objects: Dict[str, Any] = {}
def __init__(self):
super().__init__()
@@ -210,6 +211,8 @@ class BaseClient(UniversalDriver):
if node_type == NodeType.VARIABLE:
self._node_registry[node_name] = Variable(self.client, node_name, node_id_str, data_type)
logger.info(f"✓ 找到变量节点: '{node_name}', NodeId: {node_id_str}, DataType: {data_type}")
# 缓存真实的 ua.Node 对象用于订阅
self._found_node_objects[node_name] = node
elif node_type == NodeType.METHOD:
# 对于方法节点需要获取父节点ID
parent_node = node.get_parent()
@@ -499,6 +502,8 @@ class BaseClient(UniversalDriver):
err = result_dict.get("error")
print(f"读取 {node_name} 返回值 = {val} (类型: {type(val).__name__}), 错误 = {err}")
print(f"读取 {node_name} 返回值 = {val} (类型: {type(val).__name__}, 错误 = {err}")
return val, err
except Exception as e:
print(f"解析读取结果失败: {e}, 原始结果: {result_str}")
@@ -1171,13 +1176,51 @@ class BaseClient(UniversalDriver):
class OpcUaClient(BaseClient):
def __init__(self, url: str, config_path: str = None, username: str = None, password: str = None, refresh_interval: float = 1.0):
def __init__(
self,
url: str,
deck: Optional[Union[post_process_deck, Dict[str, Any]]] = None,
config_path: str = None,
username: str = None,
password: str = None,
use_subscription: bool = True,
cache_timeout: float = 5.0,
subscription_interval: int = 500,
*args,
**kwargs,
):
# 降低OPCUA库的日志级别
import logging
logging.getLogger("opcua").setLevel(logging.WARNING)
super().__init__()
# ===== 关键修改:参照 BioyondWorkstation 处理 deck =====
super().__init__()
# 处理 deck 参数
if deck is None:
self.deck = post_process_deck(setup=True)
elif isinstance(deck, dict):
self.deck = post_process_deck(setup=True)
elif hasattr(deck, 'children'):
self.deck = deck
else:
raise ValueError(f"deck 参数类型不支持: {type(deck)}")
if self.deck is None:
raise ValueError("Deck 配置不能为空")
# 统计仓库信息
warehouse_count = 0
if hasattr(self.deck, 'children'):
warehouse_count = len(self.deck.children)
logger.info(f"Deck 初始化完成,加载 {warehouse_count} 个资源")
# OPC UA 客户端初始化
client = Client(url)
if username and password:
@@ -1185,92 +1228,152 @@ class OpcUaClient(BaseClient):
client.set_password(password)
self._set_client(client)
self._connect()
# 节点值缓存和刷新相关属性
self._node_values = {} # 缓存节点值
self._refresh_interval = refresh_interval # 刷新间隔(秒)
self._refresh_running = False
self._refresh_thread = None
# 订阅相关属性
self._use_subscription = use_subscription
self._subscription = None
self._subscription_handles = {}
self._subscription_interval = subscription_interval
# 缓存相关属性
self._node_values = {} # 修改为支持时间戳的缓存结构
self._cache_timeout = cache_timeout
# 连接状态监控
self._connection_check_interval = 30.0 # 连接检查间隔(秒)
self._connection_monitor_running = False
self._connection_monitor_thread = None
# 添加线程锁保护OPC UA客户端的并发访问
import threading
self._client_lock = threading.RLock()
# 连接到服务器
self._connect()
# 如果提供了配置文件路径,则加载配置并注册工作流
if config_path:
self.load_config(config_path)
# 延迟启动节点值刷新线程,确保节点查找完成
# 注意刷新线程会在所有节点注册后由load_config调用
# 暂时不在这里启动,避免在节点未找到时就开始刷新
# self.start_node_refresh()
# 启动连接监控
self._start_connection_monitor()
def _register_nodes_as_attributes(self):
"""将所有节点注册为实例属性可以通过self.node_name访问"""
for node_name, node in self._node_registry.items():
# 检查node_id是否有效
if not node.node_id or node.node_id == "":
logger.warning(f"⚠ 节点 '{node_name}' 的 node_id 为空,跳过注册为属性")
continue
# 检查是否有对应的英文名称
eng_name = self._reverse_mapping.get(node_name)
if eng_name:
# 如果有对应的英文名称,使用英文名称作为属性名
attr_name = eng_name
else:
# 如果没有对应的英文名称,使用原始名称,但替换空格和特殊字符
attr_name = node_name.replace(' ', '_').replace('-', '_')
def _connect(self) -> None:
"""连接到OPC UA服务器"""
logger.info('尝试连接到 OPC UA 服务器...')
if self.client:
try:
self.client.connect()
logger.info('✓ 客户端已连接!')
# 创建获取节点值的属性方法,使用中文名称获取节点值
def create_property_getter(node_key):
def getter(self):
# 优先从缓存获取值
if node_key in self._node_values:
return self._node_values[node_key]
# 缓存中没有则直接读取
value, _ = self.use_node(node_key).read()
return value
return getter
# 连接后开始查找节点
if self._variables_to_find:
self._find_nodes()
# 使用property装饰器将方法注册为类属性
setattr(OpcUaClient, attr_name, property(create_property_getter(node_name)))
logger.debug(f"已注册节点 '{node_name}' 为属性 '{attr_name}', NodeId: {node.node_id}")
# 如果启用订阅模式,设置订阅
if self._use_subscription:
self._setup_subscriptions()
else:
logger.info("订阅模式已禁用,将使用按需读取模式")
def refresh_node_values(self):
"""刷新所有节点的值到缓存"""
if not self.client:
logger.warning("客户端未初始化,无法刷新节点值")
except Exception as e:
logger.error(f'客户端连接失败: {e}')
raise
else:
raise ValueError('客户端未初始化')
class SubscriptionHandler:
"""freeopcua订阅处理器必须实现 datachange_notification 方法"""
def __init__(self, outer):
self.outer = outer
def datachange_notification(self, node, val, data):
# 委托给外层类的处理函数
try:
self.outer._on_subscription_datachange(node, val, data)
except Exception as e:
logger.error(f"订阅数据回调处理失败: {e}")
# 可选:事件通知占位,避免库调用时报缺失
def event_notification(self, event):
pass
def _setup_subscriptions(self):
"""设置 OPC UA 订阅"""
if not self.client or not self._use_subscription:
return
# 使用锁保护客户端访问
with self._client_lock:
try:
# 简单检查连接状态,如果不连接会抛出异常
self.client.get_namespace_array()
logger.info(f"开始设置订阅 (发布间隔: {self._subscription_interval}ms)...")
# 创建订阅
handler = OpcUaClient.SubscriptionHandler(self)
self._subscription = self.client.create_subscription(
self._subscription_interval,
handler
)
# 为所有变量节点创建监控项
subscribed_count = 0
skipped_count = 0
for node_name, node in self._node_registry.items():
# 只为变量节点创建订阅
if node.type == NodeType.VARIABLE and node.node_id:
try:
# 优先使用在查找阶段缓存的真实 ua.Node 对象
ua_node = self._found_node_objects.get(node_name)
if ua_node is None:
ua_node = self.client.get_node(node.node_id)
handle = self._subscription.subscribe_data_change(ua_node)
self._subscription_handles[node_name] = handle
subscribed_count += 1
logger.debug(f"✓ 已订阅节点: {node_name}")
except Exception as e:
skipped_count += 1
logger.warning(f"✗ 订阅节点 {node_name} 失败: {e}")
else:
skipped_count += 1
logger.info(f"订阅设置完成: 成功 {subscribed_count} 个, 跳过 {skipped_count}")
except Exception as e:
logger.warning(f"客户端连接异常,无法刷新节点值: {e}")
return
logger.error(f"设置订阅失败: {e}")
traceback.print_exc()
# 订阅失败时回退到按需读取模式
self._use_subscription = False
logger.warning("订阅模式设置失败,已自动切换到按需读取模式")
for node_name, node in self._node_registry.items():
try:
# 跳过node_id为空的节点
if not node.node_id or node.node_id == "":
continue
def _on_subscription_datachange(self, node, val, data):
"""订阅数据变化处理器(供内部 SubscriptionHandler 调用)"""
try:
node_id = str(node.nodeid)
current_time = time.time()
# 查找对应的节点名称
for node_name, node_obj in self._node_registry.items():
if node_obj.node_id == node_id:
self._node_values[node_name] = {
'value': val,
'timestamp': current_time,
'source': 'subscription'
}
logger.debug(f"订阅更新: {node_name} = {val}")
break
except Exception as e:
logger.error(f"处理订阅数据失败: {e}")
if hasattr(node, 'read'):
value, error = node.read()
if not error:
self._node_values[node_name] = value
#logger.debug(f"已刷新节点 '{node_name}' 的值: {value}")
except Exception as e:
# 降低日志级别,避免刷新线程产生大量错误日志
logger.debug(f"刷新节点 '{node_name}' 失败: {e}")
def get_node_value(self, name, use_cache=True, force_read=False):
"""
获取节点值(智能缓存版本)
def get_node_value(self, name):
"""获取节点值,支持中文名英文名"""
# 如果提供的是英文名,转换为中文名
参数:
name: 节点名称(支持中文名英文名
use_cache: 是否使用缓存
force_read: 是否强制从服务器读取(忽略缓存)
"""
# 处理名称映射
if name in self._name_mapping:
chinese_name = self._name_mapping[name]
# 优先从缓存获取值
@@ -1290,15 +1393,63 @@ class OpcUaClient(BaseClient):
else:
raise ValueError(f"未找到名称为 '{name}' 的节点")
elif name in self._node_registry:
chinese_name = name
else:
raise ValueError(f"未找到名称为 '{name}' 的节点")
# 如果强制读取,直接从服务器读取
if force_read:
with self._client_lock:
value, _ = self.use_node(chinese_name).read()
# 更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'forced_read'
}
return value
# 检查缓存
if use_cache and chinese_name in self._node_values:
cache_entry = self._node_values[chinese_name]
cache_age = time.time() - cache_entry['timestamp']
# 如果是订阅模式,缓存永久有效(由订阅更新)
# 如果是按需读取模式,检查缓存超时
if cache_entry.get('source') == 'subscription' or cache_age < self._cache_timeout:
logger.debug(f"从缓存读取: {chinese_name} = {cache_entry['value']} (age: {cache_age:.2f}s, source: {cache_entry.get('source', 'unknown')})")
return cache_entry['value']
# 缓存过期或不存在,从服务器读取
with self._client_lock:
try:
value, error = self.use_node(chinese_name).read()
if not error:
# 更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'on_demand_read'
}
return value
else:
logger.warning(f"读取节点 {chinese_name} 失败")
return None
except Exception as e:
logger.error(f"读取节点 {chinese_name} 出错: {e}")
return None
def set_node_value(self, name, value):
"""设置节点值,支持中文名和英文名"""
# 如果提供的是英文名,转换为中文名
"""
设置节点值
写入成功后会立即更新本地缓存
"""
# 处理名称映射
if name in self._name_mapping:
chinese_name = self._name_mapping[name]
node = self.use_node(chinese_name)
# 如果提供的是中文名,直接使用
elif name in self._node_registry:
node = self.use_node(name)
chinese_name = name
else:
raise ValueError(f"未找到名称为 '{name}' 的节点")
@@ -1317,6 +1468,50 @@ class OpcUaClient(BaseClient):
logger.info(f"节点值刷新线程已启动,刷新间隔: {self._refresh_interval}")
while self._refresh_running:
with self._client_lock:
try:
node = self.use_node(chinese_name)
error = node.write(value)
if not error:
# 写入成功,立即更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'write'
}
logger.debug(f"写入成功: {chinese_name} = {value}")
return True
else:
logger.warning(f"写入节点 {chinese_name} 失败")
return False
except Exception as e:
logger.error(f"写入节点 {chinese_name} 出错: {e}")
return False
def _check_connection(self) -> bool:
"""检查连接状态"""
try:
with self._client_lock:
if self.client:
# 尝试获取命名空间数组来验证连接
self.client.get_namespace_array()
return True
except Exception as e:
logger.warning(f"连接检查失败: {e}")
return False
return False
def _connection_monitor_worker(self):
"""连接监控线程工作函数"""
self._connection_monitor_running = True
logger.info(f"连接监控线程已启动 (检查间隔: {self._connection_check_interval}秒)")
reconnect_attempts = 0
max_reconnect_attempts = 5
while self._connection_monitor_running:
try:
self.refresh_node_values()
except Exception as e:
@@ -1329,6 +1524,49 @@ class OpcUaClient(BaseClient):
"""启动节点值刷新线程"""
if self._refresh_thread is not None and self._refresh_thread.is_alive():
logger.warning("节点值刷新线程已在运行")
# 检查连接状态
if not self._check_connection():
logger.warning("检测到连接断开,尝试重新连接...")
reconnect_attempts += 1
if reconnect_attempts <= max_reconnect_attempts:
try:
# 尝试重新连接
with self._client_lock:
if self.client:
try:
self.client.disconnect()
except:
pass
self.client.connect()
logger.info("✓ 重新连接成功")
# 重新设置订阅
if self._use_subscription:
self._setup_subscriptions()
reconnect_attempts = 0
except Exception as e:
logger.error(f"重新连接失败 (尝试 {reconnect_attempts}/{max_reconnect_attempts}): {e}")
time.sleep(5) # 重连失败后等待5秒
else:
logger.error(f"达到最大重连次数 ({max_reconnect_attempts}),停止重连")
self._connection_monitor_running = False
else:
# 连接正常,重置重连计数
reconnect_attempts = 0
except Exception as e:
logger.error(f"连接监控出错: {e}")
# 等待下次检查
time.sleep(self._connection_check_interval)
def _start_connection_monitor(self):
"""启动连接监控线程"""
if self._connection_monitor_thread is not None and self._connection_monitor_thread.is_alive():
logger.warning("连接监控线程已在运行")
return
import threading
@@ -1342,6 +1580,94 @@ class OpcUaClient(BaseClient):
self._refresh_thread.join(timeout=2.0)
logger.info("节点值刷新线程已停止")
self._connection_monitor_thread = threading.Thread(
target=self._connection_monitor_worker,
daemon=True,
name="OpcUaConnectionMonitor"
)
self._connection_monitor_thread.start()
def _stop_connection_monitor(self):
"""停止连接监控线程"""
self._connection_monitor_running = False
if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
self._connection_monitor_thread.join(timeout=2.0)
logger.info("连接监控线程已停止")
def read_node(self, node_name: str) -> str:
"""
读取节点值的便捷方法(使用缓存)
返回JSON格式字符串
"""
try:
# 使用get_node_value方法自动处理缓存
value = self.get_node_value(node_name, use_cache=True)
# 获取缓存信息
chinese_name = self._name_mapping.get(node_name, node_name)
cache_info = self._node_values.get(chinese_name, {})
result = {
"value": value,
"error": False,
"node_name": node_name,
"timestamp": time.time(),
"cache_age": time.time() - cache_info.get('timestamp', time.time()),
"source": cache_info.get('source', 'unknown')
}
return json.dumps(result)
except Exception as e:
logger.error(f"读取节点 {node_name} 失败: {e}")
result = {
"value": None,
"error": True,
"node_name": node_name,
"error_message": str(e),
"timestamp": time.time()
}
return json.dumps(result)
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
current_time = time.time()
stats = {
'total_cached_nodes': len(self._node_values),
'subscription_nodes': 0,
'on_demand_nodes': 0,
'expired_nodes': 0,
'cache_timeout': self._cache_timeout,
'using_subscription': self._use_subscription
}
for node_name, cache_entry in self._node_values.items():
source = cache_entry.get('source', 'unknown')
cache_age = current_time - cache_entry['timestamp']
if source == 'subscription':
stats['subscription_nodes'] += 1
elif source in ['on_demand_read', 'forced_read', 'write']:
stats['on_demand_nodes'] += 1
if cache_age > self._cache_timeout:
stats['expired_nodes'] += 1
return stats
def print_cache_stats(self):
"""打印缓存统计信息"""
stats = self.get_cache_stats()
print("\n" + "="*80)
print("缓存统计信息")
print("="*80)
print(f"总缓存节点数: {stats['total_cached_nodes']}")
print(f"订阅模式: {'启用' if stats['using_subscription'] else '禁用'}")
print(f" - 订阅更新节点: {stats['subscription_nodes']}")
print(f" - 按需读取节点: {stats['on_demand_nodes']}")
print(f" - 已过期节点: {stats['expired_nodes']}")
print(f"缓存超时时间: {stats['cache_timeout']}")
print("="*80 + "\n")
def load_config(self, config_path: str) -> None:
"""从JSON配置文件加载并注册工作流"""
try:
@@ -1350,46 +1676,43 @@ class OpcUaClient(BaseClient):
# 处理节点注册
if "register_node_list_from_csv_path" in config_data:
# 获取配置文件所在目录
config_dir = os.path.dirname(os.path.abspath(config_path))
# 处理CSV路径如果是相对路径则相对于配置文件所在目录
if "path" in config_data["register_node_list_from_csv_path"]:
csv_path = config_data["register_node_list_from_csv_path"]["path"]
if not os.path.isabs(csv_path):
# 转换为绝对路径
csv_path = os.path.join(config_dir, csv_path)
config_data["register_node_list_from_csv_path"]["path"] = csv_path
# 直接使用字典
self.register_node_list_from_csv_path(**config_data["register_node_list_from_csv_path"])
# 立即执行节点查找(如果客户端已连接)
if self.client and self._variables_to_find:
logger.info("CSV加载完成开始查找服务器节点...")
self._find_nodes()
# 处理工作流创建
if "create_flow" in config_data:
# 直接传递字典列表
self.create_workflow_from_json(config_data["create_flow"])
# 将工作流注册为实例方法
self.register_workflows_as_methods()
# 将所有节点注册为属性(只注册已找到的节点)
self._register_nodes_as_attributes()
# 打印节点注册统计
# 打印统计信息
found_count = len(self._node_registry)
total_count = len(self._variables_to_find)
if found_count < total_count:
logger.warning(f"节点查找完成:找到 {found_count}/{total_count} 个节点")
logger.warning(f"{total_count - found_count} 个节点未找到,这些节点的操作将会失败")
else:
logger.info(f"✓ 节点查找完成:所有 {found_count} 个节点均已找到")
# 现在启动节点值刷新线程
# self.start_node_refresh()
# 如果使用订阅模式,重新设置订阅(确保新节点被订阅)
if self._use_subscription and found_count > 0:
self._setup_subscriptions()
logger.info(f"成功从 {config_path} 加载配置")
except Exception as e:
@@ -1488,9 +1811,71 @@ class OpcUaClient(BaseClient):
# 停止刷新线程
self.stop_node_refresh()
"""断开连接并清理资源"""
logger.info("正在断开连接...")
# 停止连接监控
self._stop_connection_monitor()
# 删除订阅
if self._subscription:
try:
with self._client_lock:
self._subscription.delete()
logger.info("订阅已删除")
except Exception as e:
logger.warning(f"删除订阅失败: {e}")
# 断开客户端连接
if self.client:
self.client.disconnect()
logger.info("OPC UA client disconnected")
try:
with self._client_lock:
self.client.disconnect()
logger.info("✓ OPC UA 客户端已断开连接")
except Exception as e:
logger.error(f"断开连接失败: {e}")
def _register_nodes_as_attributes(self):
"""将所有节点注册为实例属性"""
for node_name, node in self._node_registry.items():
if not node.node_id or node.node_id == "":
logger.warning(f"⚠ 节点 '{node_name}' 的 node_id 为空,跳过注册为属性")
continue
eng_name = self._reverse_mapping.get(node_name)
attr_name = eng_name if eng_name else node_name.replace(' ', '_').replace('-', '_')
def create_property_getter(node_key):
def getter(self):
return self.get_node_value(node_key, use_cache=True)
return getter
setattr(OpcUaClient, attr_name, property(create_property_getter(node_name)))
logger.debug(f"已注册节点 '{node_name}' 为属性 '{attr_name}'")
def post_init(self, ros_node):
"""ROS2 节点就绪后的初始化"""
if not (hasattr(self, 'deck') and self.deck):
return
if not (hasattr(ros_node, 'resource_tracker') and ros_node.resource_tracker):
logger.warning("resource_tracker 不存在,无法注册 deck")
return
# 1. 本地注册(必需)
ros_node.resource_tracker.add_resource(self.deck)
# 2. 上传云端
try:
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
ROS2DeviceNode.run_async_func(
ros_node.update_resource,
True,
resources=[self.deck]
)
logger.info("Deck 已上传到云端")
except Exception as e:
logger.error(f"上传失败: {e}")
if __name__ == '__main__':
@@ -1523,6 +1908,7 @@ if __name__ == '__main__':
grab_complete = client.get_node_value("grab_complete")
reaction_tank = client.get_node_value("reaction_tank_number")
raw_tank = client.get_node_value("raw_tank_number")
print(f"\n执行后状态检查 (使用英文节点名):")
print(f" - 抓取完成状态: {grab_complete}")
print(f" - 当前反应罐号码: {reaction_tank}")