From fb6ee795775d1cd790b6d8976417f78ac147a94e Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Wed, 26 Nov 2025 19:57:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(opcua):=20=E5=A2=9E=E5=BC=BA=E8=8A=82?= =?UTF-8?q?=E7=82=B9ID=E8=A7=A3=E6=9E=90=E5=85=BC=E5=AE=B9=E6=80=A7?= =?UTF-8?q?=E5=92=8C=E6=95=B0=E6=8D=AE=E7=B1=BB=E5=9E=8B=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 改进节点ID解析逻辑以支持多种格式,包括字符串和数字标识符 添加数据类型转换处理,确保写入值时类型匹配 优化错误提示信息,便于调试节点连接问题 --- unilabos/device_comms/opcua_client/client.py | 721 +++++++++++------- .../opcua_client/node/uniopcua.py | 194 ++++- 2 files changed, 603 insertions(+), 312 deletions(-) diff --git a/unilabos/device_comms/opcua_client/client.py b/unilabos/device_comms/opcua_client/client.py index 011ce07..0df37fb 100644 --- a/unilabos/device_comms/opcua_client/client.py +++ b/unilabos/device_comms/opcua_client/client.py @@ -125,6 +125,9 @@ class BaseClient(UniversalDriver): # 初始化名称映射字典 self._name_mapping = {} self._reverse_mapping = {} + # 初始化线程锁(在子类中会被重新创建,这里提供默认实现) + import threading + self._client_lock = threading.RLock() def _set_client(self, client: Optional[Client]) -> None: if client is None: @@ -137,7 +140,7 @@ class BaseClient(UniversalDriver): try: self.client.connect() logger.info('client connected!') - + # 连接后开始查找节点 if self._variables_to_find: self._find_nodes() @@ -146,32 +149,45 @@ class BaseClient(UniversalDriver): raise else: raise ValueError('client is not initialized') - + def _find_nodes(self) -> None: """查找服务器中的节点""" if not self.client: raise ValueError('client is not connected') - - logger.info('开始查找节点...') + + logger.info(f'开始查找 {len(self._variables_to_find)} 个节点...') try: # 获取根节点 root = self.client.get_root_node() objects = root.get_child(["0:Objects"]) - + + # 记录查找前的状态 + before_count = len(self._node_registry) + # 查找节点 self._find_nodes_recursive(objects) - + + # 记录查找后的状态 + after_count = len(self._node_registry) + newly_found = after_count - before_count + + logger.info(f"本次查找新增 {newly_found} 个节点,当前共 {after_count} 个") + # 检查是否所有节点都已找到 not_found = [] for var_name, var_info in self._variables_to_find.items(): if var_name not in self._node_registry: not_found.append(var_name) - + if not_found: - logger.warning(f"以下节点未找到: {', '.join(not_found)}") + logger.warning(f"⚠ 以下 {len(not_found)} 个节点未找到: {', '.join(not_found[:10])}{'...' if len(not_found) > 10 else ''}") + logger.warning(f"提示:请检查这些节点名称是否与服务器的 BrowseName 完全匹配(包括大小写、空格等)") + # 提供一个示例来帮助调试 + if not_found: + logger.info(f"尝试在服务器中查找第一个未找到的节点 '{not_found[0]}' 的相似节点...") else: - logger.info("所有节点均已找到") - + logger.info(f"✓ 所有 {len(self._variables_to_find)} 个节点均已找到并注册") + except Exception as e: logger.error(f"查找节点失败: {e}") traceback.print_exc() @@ -182,28 +198,29 @@ class BaseClient(UniversalDriver): # 获取当前节点的浏览名称 browse_name = node.get_browse_name() node_name = browse_name.Name - + # 检查是否是我们要找的变量 if node_name in self._variables_to_find and node_name not in self._node_registry: var_info = self._variables_to_find[node_name] node_type = var_info.get("node_type") data_type = var_info.get("data_type") - + node_id_str = str(node.nodeid) + # 根据节点类型创建相应的对象 if node_type == NodeType.VARIABLE: - self._node_registry[node_name] = Variable(self.client, node_name, str(node.nodeid), data_type) - logger.info(f"找到变量节点: {node_name}") + self._node_registry[node_name] = Variable(self.client, node_name, node_id_str, data_type) + logger.info(f"✓ 找到变量节点: '{node_name}', NodeId: {node_id_str}, DataType: {data_type}") elif node_type == NodeType.METHOD: # 对于方法节点,需要获取父节点ID parent_node = node.get_parent() parent_node_id = str(parent_node.nodeid) - self._node_registry[node_name] = Method(self.client, node_name, str(node.nodeid), parent_node_id, data_type) - logger.info(f"找到方法节点: {node_name}") - + self._node_registry[node_name] = Method(self.client, node_name, node_id_str, parent_node_id, data_type) + logger.info(f"✓ 找到方法节点: '{node_name}', NodeId: {node_id_str}, ParentId: {parent_node_id}") + # 递归处理子节点 for child in node.get_children(): self._find_nodes_recursive(child) - + except Exception as e: # 忽略处理单个节点时的错误,继续处理其他节点 pass @@ -218,50 +235,50 @@ class BaseClient(UniversalDriver): df = pd.read_csv(file_path) df = df.drop_duplicates(subset='Name', keep='first') # 重复的数据应该报错 nodes = [] - + # 检查是否包含英文名称列和节点语言列 has_english_name = 'EnglishName' in df.columns has_node_language = 'NodeLanguage' in df.columns - + # 如果存在英文名称列,创建名称映射字典 name_mapping = {} reverse_mapping = {} - + for _, row in df.iterrows(): name = row.get('Name') node_type_str = row.get('NodeType') data_type_str = row.get('DataType') - + # 获取英文名称和节点语言(如果有) english_name = row.get('EnglishName') if has_english_name else None node_language = row.get('NodeLanguage') if has_node_language else 'English' # 默认为英文 - + # 如果有英文名称,添加到映射字典 if english_name and not pd.isna(english_name) and node_language == 'Chinese': name_mapping[english_name] = name reverse_mapping[name] = english_name - + if not name or not node_type_str: logger.warning(f"跳过无效行: 名称或节点类型缺失") continue - + # 只支持VARIABLE和METHOD两种类型 if node_type_str not in ['VARIABLE', 'METHOD']: logger.warning(f"不支持的节点类型: {node_type_str},仅支持VARIABLE和METHOD") continue - + try: node_type = NodeType[node_type_str] except KeyError: logger.warning(f"无效的节点类型: {node_type_str}") continue - + # 对于VARIABLE节点,必须指定数据类型 if node_type == NodeType.VARIABLE: if not data_type_str or pd.isna(data_type_str): logger.warning(f"变量节点 {name} 必须指定数据类型") continue - + try: data_type = DataType[data_type_str] except KeyError: @@ -275,14 +292,14 @@ class BaseClient(UniversalDriver): data_type = DataType[data_type_str] except KeyError: logger.warning(f"无效的数据类型: {data_type_str},将使用默认值") - + # 创建节点对象,节点ID留空,将通过自动查找功能获取 nodes.append(OpcUaNode( name=name, node_type=node_type, data_type=data_type )) - + # 返回节点列表和名称映射字典 return nodes, name_mapping, reverse_mapping @@ -296,15 +313,19 @@ class BaseClient(UniversalDriver): if name in self._name_mapping: chinese_name = self._name_mapping[name] if chinese_name in self._node_registry: - return self._node_registry[chinese_name] + node = self._node_registry[chinese_name] + logger.debug(f"使用节点: '{name}' -> '{chinese_name}', NodeId: {node.node_id}") + return node elif chinese_name in self._variables_to_find: logger.warning(f"节点 {chinese_name} (英文名: {name}) 尚未找到,尝试重新查找") if self.client: self._find_nodes() if chinese_name in self._node_registry: - return self._node_registry[chinese_name] + node = self._node_registry[chinese_name] + logger.info(f"重新查找成功: '{chinese_name}', NodeId: {node.node_id}") + return node raise ValueError(f'节点 {chinese_name} (英文名: {name}) 未注册或未找到') - + # 直接使用原始名称查找 if name not in self._node_registry: if name in self._variables_to_find: @@ -312,9 +333,14 @@ class BaseClient(UniversalDriver): if self.client: self._find_nodes() if name in self._node_registry: - return self._node_registry[name] + node = self._node_registry[name] + logger.info(f"重新查找成功: '{name}', NodeId: {node.node_id}") + return node + logger.error(f"❌ 节点 '{name}' 未注册或未找到。已注册节点: {list(self._node_registry.keys())[:5]}...") raise ValueError(f'节点 {name} 未注册或未找到') - return self._node_registry[name] + node = self._node_registry[name] + logger.debug(f"使用节点: '{name}', NodeId: {node.node_id}") + return node def get_node_registry(self) -> Dict[str, OpcUaNodeBase]: return self._node_registry @@ -335,30 +361,32 @@ class BaseClient(UniversalDriver): return self logger.info(f'开始注册 {len(node_list)} 个节点...') + new_nodes_count = 0 for node in node_list: if node is None: continue - + if node.name in self._node_registry: - logger.info(f'节点 {node.name} 已存在') + logger.debug(f'节点 "{node.name}" 已存在于注册表') exist = self._node_registry[node.name] if exist.type != node.node_type: raise ValueError(f'节点 {node.name} 类型 {node.node_type} 与已存在的类型 {exist.type} 不一致') continue - + # 将节点添加到待查找列表 self._variables_to_find[node.name] = { "node_type": node.node_type, "data_type": node.data_type } - logger.info(f'添加节点 {node.name} 到待查找列表') + new_nodes_count += 1 + logger.debug(f'添加节点 "{node.name}" ({node.node_type}) 到待查找列表') + + logger.info(f'节点注册完成:新增 {new_nodes_count} 个待查找节点,总计 {len(self._variables_to_find)} 个') - logger.info('节点注册完成') - # 如果客户端已连接,立即开始查找 if self.client: self._find_nodes() - + return self def run_opcua_workflow(self, workflow: OpcUaWorkflow) -> None: @@ -449,7 +477,7 @@ class BaseClient(UniversalDriver): def create_node_function(self, func_name: str = None, node_name: str = None, mode: str = None, value: Any = None, **kwargs) -> Callable[[Callable[[str], OpcUaNodeBase]], bool]: def execute_node_function(use_node: Callable[[str], OpcUaNodeBase]) -> Union[bool, Tuple[Any, bool]]: target_node = use_node(node_name) - + # 检查是否有对应的参数值可用 current_value = value if hasattr(self, '_workflow_params') and func_name in self._workflow_params: @@ -457,19 +485,19 @@ class BaseClient(UniversalDriver): print(f"使用参数值 {func_name} = {current_value}") else: print(f"执行 {node_name}, {type(target_node).__name__}, {target_node.node_id}, {mode}, {current_value}") - + if mode == 'read': result_str = self.read_node(node_name) - + try: # 将字符串转换为字典 result_str = result_str.replace("'", '"') # 替换单引号为双引号以便JSON解析 result_dict = json.loads(result_str) - + # 从字典获取值和错误标志 val = result_dict.get("value") err = result_dict.get("error") - + print(f"读取 {node_name} 返回值 = {val} (类型: {type(val).__name__}), 错误 = {err}") return val, err except Exception as e: @@ -479,7 +507,7 @@ class BaseClient(UniversalDriver): # 构造完整的JSON输入,包含node_name和value input_json = json.dumps({"node_name": node_name, "value": current_value}) result_str = self.write_node(input_json) - + try: # 解析返回的字符串为字典 result_str = result_str.replace("'", '"') # 替换单引号为双引号以便JSON解析 @@ -496,19 +524,19 @@ class BaseClient(UniversalDriver): print(f"调用方法 {node_name} 参数 = {args}, 返回值 = {result}") return result return False - + if func_name is None: func_name = f"{node_name}_{mode}_{str(value)}" - + print(f"创建 node function: {mode}, {func_name}") self.function_name[func_name] = execute_node_function - + return execute_node_function - + def create_init_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None): """ 创建初始化函数 - + 参数: func_name: 函数名称 write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2} @@ -516,17 +544,27 @@ class BaseClient(UniversalDriver): """ if write_nodes is None: raise ValueError("必须提供write_nodes参数") - + def execute_init_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool: + """根据 _workflow_params 为各节点写入真实数值。 + + 约定: + - write_nodes 为 list 时: 节点名 == 参数名,从 _workflow_params[node_name] 取值; + - write_nodes 为 dict 时: + * value 为字符串且在 _workflow_params 中: 当作参数名去取值; + * 否则 value 视为常量直接写入。 + """ + + params = getattr(self, "_workflow_params", {}) or {} + if isinstance(write_nodes, list): - # 处理节点列表 + # 节点列表形式: 节点名与参数名一致 for node_name in write_nodes: - # 尝试从参数中获取同名参数的值 - current_value = True # 默认值 - if hasattr(self, '_workflow_params') and node_name in self._workflow_params: - current_value = self._workflow_params[node_name] - print(f"初始化函数: 从参数获取值 {node_name} = {current_value}") - + if node_name not in params: + print(f"初始化函数: 参数中未找到 {node_name}, 跳过写入") + continue + + current_value = params[node_name] print(f"初始化函数: 写入节点 {node_name} = {current_value}") input_json = json.dumps({"node_name": node_name, "value": current_value}) result_str = self.write_node(input_json) @@ -538,14 +576,15 @@ class BaseClient(UniversalDriver): except Exception as e: print(f"初始化函数: 解析写入结果失败: {e}, 原始结果: {result_str}") elif isinstance(write_nodes, dict): - # 处理节点字典,使用指定的值 + # 映射形式: 节点名 -> 参数名或常量 for node_name, node_value in write_nodes.items(): - # 检查值是否是字符串类型的参数名 - current_value = node_value - if isinstance(node_value, str) and hasattr(self, '_workflow_params') and node_value in self._workflow_params: - current_value = self._workflow_params[node_value] + if isinstance(node_value, str) and node_value in params: + current_value = params[node_value] print(f"初始化函数: 从参数获取值 {node_value} = {current_value}") - + else: + current_value = node_value + print(f"初始化函数: 使用常量值 写入 {node_name} = {current_value}") + print(f"初始化函数: 写入节点 {node_name} = {current_value}") input_json = json.dumps({"node_name": node_name, "value": current_value}) result_str = self.write_node(input_json) @@ -557,25 +596,25 @@ class BaseClient(UniversalDriver): except Exception as e: print(f"初始化函数: 解析写入结果失败: {e}, 原始结果: {result_str}") return True - + if func_name is None: func_name = f"init_function_{str(time.time())}" - + print(f"创建初始化函数: {func_name}") self.function_name[func_name] = execute_init_function return execute_init_function - + def create_stop_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None): """ 创建停止函数 - + 参数: func_name: 函数名称 write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2} """ if write_nodes is None: raise ValueError("必须提供write_nodes参数") - + def execute_stop_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool: if isinstance(write_nodes, list): # 处理节点列表,默认值都是False @@ -605,25 +644,25 @@ class BaseClient(UniversalDriver): except Exception as e: print(f"停止函数: 解析写入结果失败: {e}, 原始结果: {result_str}") return True - + if func_name is None: func_name = f"stop_function_{str(time.time())}" - + print(f"创建停止函数: {func_name}") self.function_name[func_name] = execute_stop_function return execute_stop_function - + def create_cleanup_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None): """ 创建清理函数 - + 参数: func_name: 函数名称 write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2} """ if write_nodes is None: raise ValueError("必须提供write_nodes参数") - + def execute_cleanup_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool: if isinstance(write_nodes, list): # 处理节点列表,默认值都是False @@ -653,10 +692,10 @@ class BaseClient(UniversalDriver): except Exception as e: print(f"清理函数: 解析写入结果失败: {e}, 原始结果: {result_str}") return True - + if func_name is None: func_name = f"cleanup_function_{str(time.time())}" - + print(f"创建清理函数: {func_name}") self.function_name[func_name] = execute_cleanup_function return execute_cleanup_function @@ -664,7 +703,7 @@ class BaseClient(UniversalDriver): def create_start_function(self, func_name: str, stop_condition_expression: str = "True", write_nodes: Union[Dict[str, Any], List[str]] = None, condition_nodes: Union[Dict[str, str], List[str]] = None): """ 创建开始函数 - + 参数: func_name: 函数名称 stop_condition_expression: 停止条件表达式,可直接引用节点名称 @@ -672,20 +711,20 @@ class BaseClient(UniversalDriver): condition_nodes: 条件节点列表 [节点名1, 节点名2] """ def execute_start_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool: - # 直接处理写入节点 + """开始函数: 写入触发节点, 然后轮询条件节点直到满足停止条件。""" + + params = getattr(self, "_workflow_params", {}) or {} + + # 先处理写入节点(触发位等) if write_nodes: if isinstance(write_nodes, list): - # 处理节点列表,默认值都是True - for i, node_name in enumerate(write_nodes): - # 尝试获取与节点对应的参数值 - param_name = f"write_{i}" - - # 获取参数值(如果有) - current_value = True # 默认值 - if hasattr(self, '_workflow_params') and param_name in self._workflow_params: - current_value = self._workflow_params[param_name] - - # 直接写入节点 + # 列表形式: 节点名与参数名一致, 若无参数则直接写 True + for node_name in write_nodes: + if node_name in params: + current_value = params[node_name] + else: + current_value = True + print(f"直接写入节点 {node_name} = {current_value}") input_json = json.dumps({"node_name": node_name, "value": current_value}) result_str = self.write_node(input_json) @@ -697,14 +736,13 @@ class BaseClient(UniversalDriver): except Exception as e: print(f"解析直接写入结果失败: {e}, 原始结果: {result_str}") elif isinstance(write_nodes, dict): - # 处理节点字典,值是指定的 + # 字典形式: 节点名 -> 常量值(如 True/False) for node_name, node_value in write_nodes.items(): - # 尝试获取参数值(如果节点名与参数名匹配) - current_value = node_value # 使用指定的默认值 - if hasattr(self, '_workflow_params') and node_name in self._workflow_params: - current_value = self._workflow_params[node_name] - - # 直接写入节点 + if node_name in params: + current_value = params[node_name] + else: + current_value = node_value + print(f"直接写入节点 {node_name} = {current_value}") input_json = json.dumps({"node_name": node_name, "value": current_value}) result_str = self.write_node(input_json) @@ -715,16 +753,16 @@ class BaseClient(UniversalDriver): print(f"直接写入 {node_name} = {current_value}, 结果: {success}") except Exception as e: print(f"解析直接写入结果失败: {e}, 原始结果: {result_str}") - + # 如果没有条件节点,立即返回 if not condition_nodes: return True - + # 处理条件检查和等待 while True: next_loop = False condition_source = {} - + # 直接读取条件节点 if isinstance(condition_nodes, list): # 处理节点列表 @@ -732,16 +770,17 @@ class BaseClient(UniversalDriver): # 直接读取节点 result_str = self.read_node(node_name) try: + time.sleep(1) result_str = result_str.replace("'", '"') result_dict = json.loads(result_str) read_res = result_dict.get("value") read_err = result_dict.get("error", False) print(f"直接读取 {node_name} 返回值 = {read_res}, 错误 = {read_err}") - + if read_err: next_loop = True break - + # 将节点值存入条件源字典,使用节点名称作为键 condition_source[node_name] = read_res # 为了向后兼容,也保留read_i格式 @@ -762,11 +801,11 @@ class BaseClient(UniversalDriver): read_res = result_dict.get("value") read_err = result_dict.get("error", False) print(f"直接读取 {node_name} 返回值 = {read_res}, 错误 = {read_err}") - + if read_err: next_loop = True break - + # 将节点值存入条件源字典 condition_source[node_name] = read_res # 也保存使用函数名作为键 @@ -775,13 +814,13 @@ class BaseClient(UniversalDriver): print(f"解析直接读取结果失败: {e}, 原始结果: {result_str}") next_loop = True break - + if not next_loop: if stop_condition_expression: # 添加调试信息 print(f"条件源数据: {condition_source}") condition_source["__RESULT"] = None - + # 确保安全地执行条件表达式 try: # 先尝试使用eval更安全的方式计算表达式 @@ -795,10 +834,10 @@ class BaseClient(UniversalDriver): except Exception as e2: print(f"使用exec执行表达式也失败: {e2}") condition_source["__RESULT"] = False - + res = condition_source["__RESULT"] print(f"取得计算结果: {res}, 条件表达式: {stop_condition_expression}") - + if res: print("满足停止条件,结束工作流") break @@ -807,21 +846,21 @@ class BaseClient(UniversalDriver): break else: time.sleep(0.3) - + return True - + self.function_name[func_name] = execute_start_function return execute_start_function create_action_from_json = None - + def create_action_from_json(self, data: Union[Dict, Any]) -> WorkflowAction: """ 从JSON配置创建工作流动作 - + 参数: data: 动作JSON数据 - + 返回: WorkflowAction对象 """ @@ -832,7 +871,7 @@ class BaseClient(UniversalDriver): stop_function = None init_function = None cleanup_function = None - + # 提取start_function相关信息 if hasattr(data, "start_function") and data.start_function: start_function = data.start_function @@ -846,31 +885,31 @@ class BaseClient(UniversalDriver): write_nodes = start_function["write_nodes"] if "condition_nodes" in start_function: condition_nodes = start_function["condition_nodes"] - + # 提取stop_function信息 if hasattr(data, "stop_function") and data.stop_function: stop_function = data.stop_function elif isinstance(data, dict) and data.get("stop_function"): stop_function = data.get("stop_function") - + # 提取init_function信息 if hasattr(data, "init_function") and data.init_function: init_function = data.init_function elif isinstance(data, dict) and data.get("init_function"): init_function = data.get("init_function") - + # 提取cleanup_function信息 if hasattr(data, "cleanup_function") and data.cleanup_function: cleanup_function = data.cleanup_function elif isinstance(data, dict) and data.get("cleanup_function"): cleanup_function = data.get("cleanup_function") - + # 创建工作流动作组件 init = None start = None stop = None cleanup = None - + # 处理init function if init_function: init_params = {"func_name": init_function.get("func_name")} @@ -879,9 +918,9 @@ class BaseClient(UniversalDriver): else: # 如果没有write_nodes,创建一个空字典 init_params["write_nodes"] = {} - + init = self.create_init_function(**init_params) - + # 处理start function if start_function: start_params = { @@ -891,7 +930,7 @@ class BaseClient(UniversalDriver): "condition_nodes": condition_nodes } start = self.create_start_function(**start_params) - + # 处理stop function if stop_function: stop_params = { @@ -899,7 +938,7 @@ class BaseClient(UniversalDriver): "write_nodes": stop_function.get("write_nodes", {}) } stop = self.create_stop_function(**stop_params) - + # 处理cleanup function if cleanup_function: cleanup_params = { @@ -907,22 +946,22 @@ class BaseClient(UniversalDriver): "write_nodes": cleanup_function.get("write_nodes", {}) } cleanup = self.create_cleanup_function(**cleanup_params) - + return WorkflowAction(init=init, start=start, stop=stop, cleanup=cleanup) - + workflow_name: Dict[str, OpcUaWorkflowModel] = {} def create_workflow_from_json(self, data: List[Dict]) -> None: """ 从JSON配置创建工作流程序 - + 参数: data: 工作流配置列表 """ for ind, flow_dict in enumerate(data): print(f"正在创建 workflow {ind}, {flow_dict['name']}") actions = [] - + for i in flow_dict["action"]: if isinstance(i, str): print(f"沿用已有 workflow 作为 action: {i}") @@ -931,14 +970,14 @@ class BaseClient(UniversalDriver): print("创建 action") # 直接将字典转换为SimplifiedActionJson对象或直接使用字典 action = self.create_action_from_json(i) - + actions.append(action) - + # 获取参数 parameters = flow_dict.get("parameters", []) - + flow_instance = OpcUaWorkflowModel( - name=flow_dict["name"], + name=flow_dict["name"], actions=actions, parameters=parameters, description=flow_dict.get("description", "") @@ -963,19 +1002,19 @@ class BaseClient(UniversalDriver): register_params = data.register_node_list_from_csv_path create_flow = data.create_flow execute_flow = data.execute_flow if hasattr(data, "execute_flow") else [] - + # 注册节点 if register_params: print(f"注册节点 csv: {register_params}") self.register_node_list_from_csv_path(**register_params) - + # 创建工作流 print("创建工作流") self.create_workflow_from_json(create_flow) - + # 注册工作流为实例方法 self.register_workflows_as_methods() - + # 如果存在execute_flow字段,则执行指定的工作流(向后兼容) if execute_flow: print("执行工作流") @@ -987,12 +1026,12 @@ class BaseClient(UniversalDriver): # 获取工作流的参数信息(如果存在) workflow_params = getattr(workflow, 'parameters', []) or [] workflow_desc = getattr(workflow, 'description', None) or f"执行工作流: {workflow_name}" - + # 创建执行工作流的方法 def create_workflow_method(wf_name=workflow_name, wf=workflow, params=workflow_params): def workflow_method(*args, **kwargs): logger.info(f"执行工作流: {wf_name}, 参数: {args}, {kwargs}") - + # 处理传入的参数 if params and (args or kwargs): # 将位置参数转换为关键字参数 @@ -1000,31 +1039,31 @@ class BaseClient(UniversalDriver): for i, param_name in enumerate(params): if i < len(args): params_dict[param_name] = args[i] - + # 合并关键字参数 params_dict.update(kwargs) - + # 保存参数,供节点函数使用 self._workflow_params = params_dict else: self._workflow_params = {} - + # 执行工作流 result = self.run_opcua_workflow_model(wf) - + # 清理参数 self._workflow_params = {} - + return result - + # 设置方法的文档字符串 workflow_method.__doc__ = workflow_desc if params: param_doc = ", ".join(params) workflow_method.__doc__ += f"\n参数: {param_doc}" - + return workflow_method - + # 注册为实例方法 method = create_workflow_method() setattr(self, workflow_name, method) @@ -1035,32 +1074,34 @@ class BaseClient(UniversalDriver): 读取节点值的便捷方法 返回包含result字段的字典 """ - try: - node = self.use_node(node_name) - value, error = node.read() - - # 创建结果字典 - result = { - "value": value, - "error": error, - "node_name": node_name, - "timestamp": time.time() - } - - # 返回JSON字符串 - return json.dumps(result) - except Exception as e: - logger.error(f"读取节点 {node_name} 失败: {e}") - # 创建错误结果字典 - result = { - "value": None, - "error": True, - "node_name": node_name, - "error_message": str(e), - "timestamp": time.time() - } - return json.dumps(result) - + # 使用锁保护客户端访问 + with self._client_lock: + try: + node = self.use_node(node_name) + value, error = node.read() + + # 创建结果字典 + result = { + "value": value, + "error": error, + "node_name": node_name, + "timestamp": time.time() + } + + # 返回JSON字符串 + return json.dumps(result) + except Exception as e: + logger.error(f"读取节点 {node_name} 失败: {e}") + # 创建错误结果字典 + result = { + "value": None, + "error": True, + "node_name": node_name, + "error_message": str(e), + "timestamp": time.time() + } + return json.dumps(result) + def write_node(self, json_input: str) -> str: """ 写入节点值的便捷方法 @@ -1068,48 +1109,50 @@ class BaseClient(UniversalDriver): eg:'{\"node_name\":\"反应罐号码\",\"value\":\"2\"}' 返回JSON格式的字符串,包含操作结果 """ - try: - # 解析JSON格式的输入 - if not isinstance(json_input, str): - json_input = str(json_input) - + # 使用锁保护客户端访问 + with self._client_lock: try: - input_data = json.loads(json_input) - if not isinstance(input_data, dict): - return json.dumps({"error": True, "error_message": "输入必须是包含node_name和value的JSON对象", "success": False}) - - # 从JSON中提取节点名称和值 - node_name = input_data.get("node_name") - value = input_data.get("value") - - if node_name is None: - return json.dumps({"error": True, "error_message": "JSON中缺少node_name字段", "success": False}) - except json.JSONDecodeError as e: - return json.dumps({"error": True, "error_message": f"JSON解析错误: {str(e)}", "success": False}) - - node = self.use_node(node_name) - error = node.write(value) - - # 创建结果字典 - result = { - "value": value, - "error": error, - "node_name": node_name, - "timestamp": time.time(), - "success": not error - } - - return json.dumps(result) - except Exception as e: - logger.error(f"写入节点失败: {e}") - result = { - "error": True, - "error_message": str(e), - "timestamp": time.time(), - "success": False - } - return json.dumps(result) - + # 解析JSON格式的输入 + if not isinstance(json_input, str): + json_input = str(json_input) + + try: + input_data = json.loads(json_input) + if not isinstance(input_data, dict): + return json.dumps({"error": True, "error_message": "输入必须是包含node_name和value的JSON对象", "success": False}) + + # 从JSON中提取节点名称和值 + node_name = input_data.get("node_name") + value = input_data.get("value") + + if node_name is None: + return json.dumps({"error": True, "error_message": "JSON中缺少node_name字段", "success": False}) + except json.JSONDecodeError as e: + return json.dumps({"error": True, "error_message": f"JSON解析错误: {str(e)}", "success": False}) + + node = self.use_node(node_name) + error = node.write(value) + + # 创建结果字典 + result = { + "value": value, + "error": error, + "node_name": node_name, + "timestamp": time.time(), + "success": not error + } + + return json.dumps(result) + except Exception as e: + logger.error(f"写入节点失败: {e}") + result = { + "error": True, + "error_message": str(e), + "timestamp": time.time(), + "success": False + } + return json.dumps(result) + def call_method(self, node_name: str, *args) -> Tuple[Any, bool]: """ 调用方法节点的便捷方法 @@ -1132,34 +1175,45 @@ class OpcUaClient(BaseClient): # 降低OPCUA库的日志级别 import logging logging.getLogger("opcua").setLevel(logging.WARNING) - + super().__init__() - + client = Client(url) - + if username and password: client.set_user(username) client.set_password(password) - + self._set_client(client) self._connect() - + # 节点值缓存和刷新相关属性 self._node_values = {} # 缓存节点值 self._refresh_interval = refresh_interval # 刷新间隔(秒) self._refresh_running = False self._refresh_thread = None - + + # 添加线程锁,保护OPC UA客户端的并发访问 + import threading + self._client_lock = threading.RLock() + # 如果提供了配置文件路径,则加载配置并注册工作流 if config_path: self.load_config(config_path) - - # 启动节点值刷新线程 - self.start_node_refresh() - + + # 延迟启动节点值刷新线程,确保节点查找完成 + # 注意:刷新线程会在所有节点注册后由load_config调用 + # 暂时不在这里启动,避免在节点未找到时就开始刷新 + # self.start_node_refresh() + def _register_nodes_as_attributes(self): """将所有节点注册为实例属性,可以通过self.node_name访问""" for node_name, node in self._node_registry.items(): + # 检查node_id是否有效 + if not node.node_id or node.node_id == "": + logger.warning(f"⚠ 节点 '{node_name}' 的 node_id 为空,跳过注册为属性") + continue + # 检查是否有对应的英文名称 eng_name = self._reverse_mapping.get(node_name) if eng_name: @@ -1168,7 +1222,7 @@ class OpcUaClient(BaseClient): else: # 如果没有对应的英文名称,使用原始名称,但替换空格和特殊字符 attr_name = node_name.replace(' ', '_').replace('-', '_') - + # 创建获取节点值的属性方法,使用中文名称获取节点值 def create_property_getter(node_key): def getter(self): @@ -1179,34 +1233,41 @@ class OpcUaClient(BaseClient): value, _ = self.use_node(node_key).read() return value return getter - + # 使用property装饰器将方法注册为类属性 setattr(OpcUaClient, attr_name, property(create_property_getter(node_name))) - logger.info(f"已注册节点 '{node_name}' 为属性 '{attr_name}'") - + logger.debug(f"已注册节点 '{node_name}' 为属性 '{attr_name}', NodeId: {node.node_id}") + def refresh_node_values(self): """刷新所有节点的值到缓存""" if not self.client: logger.warning("客户端未初始化,无法刷新节点值") return - - try: - # 简单检查连接状态,如果不连接会抛出异常 - self.client.get_namespace_array() - except Exception as e: - logger.warning(f"客户端连接异常,无法刷新节点值: {e}") - return - - for node_name, node in self._node_registry.items(): + + # 使用锁保护客户端访问 + with self._client_lock: try: - if hasattr(node, 'read'): - value, error = node.read() - if not error: - self._node_values[node_name] = value - #logger.debug(f"已刷新节点 '{node_name}' 的值: {value}") + # 简单检查连接状态,如果不连接会抛出异常 + self.client.get_namespace_array() except Exception as e: - logger.error(f"刷新节点 '{node_name}' 失败: {e}") - + logger.warning(f"客户端连接异常,无法刷新节点值: {e}") + return + + for node_name, node in self._node_registry.items(): + try: + # 跳过node_id为空的节点 + if not node.node_id or node.node_id == "": + continue + + if hasattr(node, 'read'): + value, error = node.read() + if not error: + self._node_values[node_name] = value + #logger.debug(f"已刷新节点 '{node_name}' 的值: {value}") + except Exception as e: + # 降低日志级别,避免刷新线程产生大量错误日志 + logger.debug(f"刷新节点 '{node_name}' 失败: {e}") + def get_node_value(self, name): """获取节点值,支持中文名和英文名""" # 如果提供的是英文名,转换为中文名 @@ -1228,7 +1289,7 @@ class OpcUaClient(BaseClient): return value else: raise ValueError(f"未找到名称为 '{name}' 的节点") - + def set_node_value(self, name, value): """设置节点值,支持中文名和英文名""" # 如果提供的是英文名,转换为中文名 @@ -1240,7 +1301,7 @@ class OpcUaClient(BaseClient): node = self.use_node(name) else: raise ValueError(f"未找到名称为 '{name}' 的节点") - + # 写入值 error = node.write(value) if not error: @@ -1249,49 +1310,49 @@ class OpcUaClient(BaseClient): self._node_values[node.name] = value return True return False - + def _refresh_worker(self): """节点值刷新线程的工作函数""" self._refresh_running = True logger.info(f"节点值刷新线程已启动,刷新间隔: {self._refresh_interval}秒") - + while self._refresh_running: try: self.refresh_node_values() except Exception as e: logger.error(f"节点值刷新过程出错: {e}") - + # 等待下一次刷新 time.sleep(self._refresh_interval) - + def start_node_refresh(self): """启动节点值刷新线程""" if self._refresh_thread is not None and self._refresh_thread.is_alive(): logger.warning("节点值刷新线程已在运行") return - + import threading self._refresh_thread = threading.Thread(target=self._refresh_worker, daemon=True) self._refresh_thread.start() - + def stop_node_refresh(self): """停止节点值刷新线程""" self._refresh_running = False if self._refresh_thread and self._refresh_thread.is_alive(): self._refresh_thread.join(timeout=2.0) logger.info("节点值刷新线程已停止") - + def load_config(self, config_path: str) -> None: """从JSON配置文件加载并注册工作流""" try: with open(config_path, 'r', encoding='utf-8') as f: config_data = json.load(f) - + # 处理节点注册 if "register_node_list_from_csv_path" in config_data: # 获取配置文件所在目录 config_dir = os.path.dirname(os.path.abspath(config_path)) - + # 处理CSV路径,如果是相对路径,则相对于配置文件所在目录 if "path" in config_data["register_node_list_from_csv_path"]: csv_path = config_data["register_node_list_from_csv_path"]["path"] @@ -1299,29 +1360,134 @@ class OpcUaClient(BaseClient): # 转换为绝对路径 csv_path = os.path.join(config_dir, csv_path) config_data["register_node_list_from_csv_path"]["path"] = csv_path - + # 直接使用字典 self.register_node_list_from_csv_path(**config_data["register_node_list_from_csv_path"]) - + + # 立即执行节点查找(如果客户端已连接) + if self.client and self._variables_to_find: + logger.info("CSV加载完成,开始查找服务器节点...") + self._find_nodes() + # 处理工作流创建 if "create_flow" in config_data: # 直接传递字典列表 self.create_workflow_from_json(config_data["create_flow"]) # 将工作流注册为实例方法 self.register_workflows_as_methods() - - # 将所有节点注册为属性 + + # 将所有节点注册为属性(只注册已找到的节点) self._register_nodes_as_attributes() - + + # 打印节点注册统计 + found_count = len(self._node_registry) + total_count = len(self._variables_to_find) + if found_count < total_count: + logger.warning(f"节点查找完成:找到 {found_count}/{total_count} 个节点") + logger.warning(f"有 {total_count - found_count} 个节点未找到,这些节点的操作将会失败") + else: + logger.info(f"✓ 节点查找完成:所有 {found_count} 个节点均已找到") + + # 现在启动节点值刷新线程 + # self.start_node_refresh() + logger.info(f"成功从 {config_path} 加载配置") except Exception as e: logger.error(f"加载配置文件 {config_path} 失败: {e}") traceback.print_exc() - + + def print_node_registry_status(self): + """打印节点注册状态,用于调试""" + print("\n" + "="*80) + print("节点注册状态诊断报告") + print("="*80) + print(f"\n待查找节点总数: {len(self._variables_to_find)}") + print(f"已找到节点总数: {len(self._node_registry)}") + print(f"未找到节点总数: {len(self._variables_to_find) - len(self._node_registry)}") + + # 显示已找到的节点(前10个) + if self._node_registry: + print(f"\n✓ 已找到的节点 (显示前10个):") + for i, (name, node) in enumerate(list(self._node_registry.items())[:10]): + eng_name = self._reverse_mapping.get(name, "") + eng_info = f" ({eng_name})" if eng_name else "" + print(f" {i+1}. '{name}'{eng_info}") + print(f" NodeId: {node.node_id}") + print(f" Type: {node.type}") + + # 显示未找到的节点 + not_found = [name for name in self._variables_to_find if name not in self._node_registry] + if not_found: + print(f"\n✗ 未找到的节点 (显示前20个):") + for i, name in enumerate(not_found[:20]): + eng_name = self._reverse_mapping.get(name, "") + eng_info = f" ({eng_name})" if eng_name else "" + node_info = self._variables_to_find[name] + print(f" {i+1}. '{name}'{eng_info} - {node_info['node_type']}") + + print("\n" + "="*80) + print("提示:") + print("1. 如果大量节点未找到,请检查CSV中的节点名称是否与服务器BrowseName完全匹配") + print("2. 可以使用 client.browse_server_nodes() 查看服务器的实际节点结构") + print("3. 节点名称区分大小写,且包括所有空格和特殊字符") + print("="*80 + "\n") + + def browse_server_nodes(self, max_depth=3, start_path=["0:Objects"]): + """浏览服务器节点树,用于调试和对比""" + if not self.client: + print("客户端未连接") + return + + print("\n" + "="*80) + print(f"服务器节点浏览 (最大深度: {max_depth})") + print("="*80 + "\n") + + try: + root = self.client.get_root_node() + start_node = root.get_child(start_path) + self._browse_node_recursive(start_node, depth=0, max_depth=max_depth) + except Exception as e: + print(f"浏览失败: {e}") + traceback.print_exc() + + def _browse_node_recursive(self, node, depth=0, max_depth=3): + """递归浏览节点""" + if depth > max_depth: + return + + try: + browse_name = node.get_browse_name() + node_class = node.get_node_class() + indent = " " * depth + + # 显示节点信息 + print(f"{indent}├─ {browse_name.Name}") + print(f"{indent}│ NodeId: {str(node.nodeid)}") + print(f"{indent}│ NodeClass: {node_class}") + + # 如果是变量,显示数据类型 + if node_class == NodeClass.Variable: + try: + data_type = node.get_data_type() + print(f"{indent}│ DataType: {data_type}") + except: + pass + + # 递归处理子节点(限制数量避免输出过多) + if depth < max_depth: + children = node.get_children() + for i, child in enumerate(children[:20]): # 每层最多显示20个子节点 + self._browse_node_recursive(child, depth + 1, max_depth) + if len(children) > 20: + print(f"{indent} ... ({len(children) - 20} more children)") + except Exception as e: + # 忽略单个节点的错误 + pass + def disconnect(self): # 停止刷新线程 self.stop_node_refresh() - + if self.client: self.client.disconnect() logger.info("OPC UA client disconnected") @@ -1329,52 +1495,53 @@ class OpcUaClient(BaseClient): if __name__ == '__main__': # 示例用法 - + # 使用配置文件创建客户端并自动注册工作流 import os current_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(current_dir, "opcua_huairou.json") - + # 创建OPC UA客户端并加载配置 try: client = OpcUaClient( - url="opc.tcp://localhost:4840/freeopcua/server/", # 替换为实际的OPC UA服务器地址 - config_path=config_path # 传入配置文件路径 + url="opc.tcp://192.168.1.88:4840/freeopcua/server/", # 替换为实际的OPC UA服务器地址 + config_path="D:\\Uni-Lab-OS\\unilabos\\device_comms\\opcua_client\\opcua_huairou.json" # 传入配置文件路径 ) - + # 列出所有已注册的工作流 print("\n已注册的工作流:") for workflow_name in client.workflow_name: print(f" - {workflow_name}") - + # 测试trigger_grab_action工作流 - 使用英文参数名 print("\n测试trigger_grab_action工作流 - 使用英文参数名:") - client.trigger_grab_action(reaction_tank_number=2, raw_tank_number=3) - + client.trigger_grab_action(reaction_tank_number=2, raw_tank_number=2) + # client.set_node_value("reaction_tank_number", 2) + + # 读取节点值 - 使用英文节点名 grab_complete = client.get_node_value("grab_complete") reaction_tank = client.get_node_value("reaction_tank_number") raw_tank = client.get_node_value("raw_tank_number") - print(f"\n执行后状态检查 (使用英文节点名):") print(f" - 抓取完成状态: {grab_complete}") print(f" - 当前反应罐号码: {reaction_tank}") print(f" - 当前原料罐号码: {raw_tank}") - + # 测试节点值写入 - 使用英文节点名 print("\n测试节点值写入 (使用英文节点名):") success = client.set_node_value("atomization_fast_speed", 150.5) print(f" - 写入搅拌浆雾化快速 = 150.5, 结果: {success}") - + # 读取写入的值 atomization_speed = client.get_node_value("atomization_fast_speed") print(f" - 读取搅拌浆雾化快速: {atomization_speed}") - + # 断开连接 client.disconnect() - + except Exception as e: print(f"错误: {e}") traceback.print_exc() - - + + diff --git a/unilabos/device_comms/opcua_client/node/uniopcua.py b/unilabos/device_comms/opcua_client/node/uniopcua.py index ce16cfc..a06d780 100644 --- a/unilabos/device_comms/opcua_client/node/uniopcua.py +++ b/unilabos/device_comms/opcua_client/node/uniopcua.py @@ -3,7 +3,7 @@ from enum import Enum from abc import ABC, abstractmethod from typing import Tuple, Union, Optional, Any, List -from opcua import Client, Node +from opcua import Client, Node, ua from opcua.ua import NodeId, NodeClass, VariantType @@ -43,27 +43,72 @@ class Base(ABC): self._type = typ self._data_type = data_type self._node: Optional[Node] = None - + def _get_node(self) -> Node: if self._node is None: try: - # 检查是否是NumericNodeId(ns=X;i=Y)格式 - if "NumericNodeId" in self._node_id: - # 从字符串中提取命名空间和标识符 - import re - match = re.search(r'ns=(\d+);i=(\d+)', self._node_id) - if match: - ns = int(match.group(1)) - identifier = int(match.group(2)) - node_id = NodeId(identifier, ns) - self._node = self._client.get_node(node_id) + # 尝试多种 NodeId 字符串格式解析,兼容不同服务器/库的输出 + # 可能的格式示例: 'ns=2;i=1234', 'ns=2;s=SomeString', + # 'StringNodeId(ns=4;s=OPC|变量名)', 'NumericNodeId(ns=2;i=1234)' 等 + import re + + nid = self._node_id + # 如果已经是 NodeId/Node 对象(库用户可能传入),直接使用 + try: + from opcua.ua import NodeId as UaNodeId + if isinstance(nid, UaNodeId): + self._node = self._client.get_node(nid) + return self._node + except Exception: + # 若导入或类型判断失败,则继续下一步 + pass + + # 直接以字符串形式处理 + if isinstance(nid, str): + nid = nid.strip() + + # 处理包含类名的格式,如 'StringNodeId(ns=4;s=...)' 或 'NumericNodeId(ns=2;i=...)' + # 提取括号内的内容 + match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid) + if match_wrapped: + # 提取括号内的实际 node_id 字符串 + nid = match_wrapped.group(2).strip() + + # 常见短格式 'ns=2;i=1234' 或 'ns=2;s=SomeString' + if re.match(r'^ns=\d+;[is]=', nid): + self._node = self._client.get_node(nid) else: - raise ValueError(f"无法解析节点ID: {self._node_id}") + # 尝试提取 ns 和 i 或 s + # 对于字符串标识符,可能包含特殊字符,使用非贪婪匹配 + m_num = re.search(r'ns=(\d+);i=(\d+)', nid) + m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid) + if m_num: + ns = int(m_num.group(1)) + identifier = int(m_num.group(2)) + node_id = NodeId(identifier, ns) + self._node = self._client.get_node(node_id) + elif m_str: + ns = int(m_str.group(1)) + identifier = m_str.group(2).strip() + # 对于字符串标识符,直接使用字符串格式 + node_id_str = f"ns={ns};s={identifier}" + self._node = self._client.get_node(node_id_str) + else: + # 回退:尝试直接传入字符串(有些实现接受其它格式) + try: + self._node = self._client.get_node(self._node_id) + except Exception as e: + # 输出更详细的错误信息供调试 + print(f"获取节点失败(尝试直接字符串): {self._node_id}, 错误: {e}") + raise else: - # 直接使用节点ID字符串 + # 非字符串,尝试直接使用 self._node = self._client.get_node(self._node_id) except Exception as e: print(f"获取节点失败: {self._node_id}, 错误: {e}") + # 添加额外提示,帮助定位 BadNodeIdUnknown 问题 + print("提示: 请确认该 node_id 是否来自当前连接的服务器地址空间," \ + "以及 CSV/配置中名称与服务器 BrowseName 是否匹配。") raise return self._node @@ -71,16 +116,16 @@ class Base(ABC): def read(self) -> Tuple[Any, bool]: """读取节点值,返回(值, 是否出错)""" pass - + @abstractmethod def write(self, value: Any) -> bool: """写入节点值,返回是否出错""" pass - + @property def type(self) -> NodeType: return self._type - + @property def node_id(self) -> str: return self._node_id @@ -104,7 +149,56 @@ class Variable(Base): def write(self, value: Any) -> bool: try: - self._get_node().set_value(value) + # 如果声明了数据类型,则尝试转换并使用对应的 Variant 写入 + coerced = value + try: + if self._data_type is not None: + # 基于声明的数据类型做简单类型转换 + dt = self._data_type + if dt in (DataType.SBYTE, DataType.BYTE, DataType.INT16, DataType.UINT16, + DataType.INT32, DataType.UINT32, DataType.INT64, DataType.UINT64): + # 数值类型 -> int + if isinstance(value, str): + coerced = int(value) + else: + coerced = int(value) + elif dt in (DataType.FLOAT, DataType.DOUBLE): + if isinstance(value, str): + coerced = float(value) + else: + coerced = float(value) + elif dt == DataType.BOOLEAN: + if isinstance(value, str): + v = value.strip().lower() + if v in ("true", "1", "yes", "on"): + coerced = True + elif v in ("false", "0", "no", "off"): + coerced = False + else: + coerced = bool(value) + else: + coerced = bool(value) + elif dt == DataType.STRING or dt == DataType.BYTESTRING or dt == DataType.DATETIME: + coerced = str(value) + + # 使用 ua.Variant 明确指定 VariantType + try: + variant = ua.Variant(coerced, dt.value) + self._get_node().set_value(variant) + except Exception: + # 回退:有些 set_value 实现接受 (value, variant_type) + try: + self._get_node().set_value(coerced, dt.value) + except Exception: + # 最后回退到直接写入(保持兼容性) + self._get_node().set_value(coerced) + else: + # 未声明数据类型,直接写入 + self._get_node().set_value(value) + except Exception: + # 若在转换或按数据类型写入失败,尝试直接写入原始值并让上层捕获错误 + self._get_node().set_value(value) + return False except Exception as e: print(f"写入变量 {self._name} 失败: {e}") @@ -116,24 +210,54 @@ class Method(Base): super().__init__(client, name, node_id, NodeType.METHOD, data_type) self._parent_node_id = parent_node_id self._parent_node = None - + def _get_parent_node(self) -> Node: if self._parent_node is None: try: - # 检查是否是NumericNodeId(ns=X;i=Y)格式 - if "NumericNodeId" in self._parent_node_id: - # 从字符串中提取命名空间和标识符 - import re - match = re.search(r'ns=(\d+);i=(\d+)', self._parent_node_id) - if match: - ns = int(match.group(1)) - identifier = int(match.group(2)) - node_id = NodeId(identifier, ns) - self._parent_node = self._client.get_node(node_id) + # 处理父节点ID,使用与_get_node相同的解析逻辑 + import re + + nid = self._parent_node_id + + # 如果已经是 NodeId 对象,直接使用 + try: + from opcua.ua import NodeId as UaNodeId + if isinstance(nid, UaNodeId): + self._parent_node = self._client.get_node(nid) + return self._parent_node + except Exception: + pass + + # 字符串处理 + if isinstance(nid, str): + nid = nid.strip() + + # 处理包含类名的格式 + match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid) + if match_wrapped: + nid = match_wrapped.group(2).strip() + + # 常见短格式 + if re.match(r'^ns=\d+;[is]=', nid): + self._parent_node = self._client.get_node(nid) else: - raise ValueError(f"无法解析父节点ID: {self._parent_node_id}") + # 提取 ns 和 i 或 s + m_num = re.search(r'ns=(\d+);i=(\d+)', nid) + m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid) + if m_num: + ns = int(m_num.group(1)) + identifier = int(m_num.group(2)) + node_id = NodeId(identifier, ns) + self._parent_node = self._client.get_node(node_id) + elif m_str: + ns = int(m_str.group(1)) + identifier = m_str.group(2).strip() + node_id_str = f"ns={ns};s={identifier}" + self._parent_node = self._client.get_node(node_id_str) + else: + # 回退 + self._parent_node = self._client.get_node(self._parent_node_id) else: - # 直接使用节点ID字符串 self._parent_node = self._client.get_node(self._parent_node_id) except Exception as e: print(f"获取父节点失败: {self._parent_node_id}, 错误: {e}") @@ -147,7 +271,7 @@ class Method(Base): def write(self, value: Any) -> bool: """方法节点不支持写入操作""" return True - + def call(self, *args) -> Tuple[Any, bool]: """调用方法,返回(返回值, 是否出错)""" try: @@ -161,7 +285,7 @@ class Method(Base): class Object(Base): def __init__(self, client: Client, name: str, node_id: str): super().__init__(client, name, node_id, NodeType.OBJECT, None) - + def read(self) -> Tuple[Any, bool]: """对象节点不支持直接读取操作""" return None, True @@ -169,7 +293,7 @@ class Object(Base): def write(self, value: Any) -> bool: """对象节点不支持直接写入操作""" return True - + def get_children(self) -> Tuple[List[Node], bool]: """获取子节点列表,返回(子节点列表, 是否出错)""" try: @@ -177,4 +301,4 @@ class Object(Base): return children, False except Exception as e: print(f"获取对象 {self._name} 的子节点失败: {e}") - return [], True \ No newline at end of file + return [], True