Files
Uni-Lab-OS/unilabos/device_comms/opcua_client/node/uniopcua.py
Xuwznln 4c8022ee95 Workstation yb merge dev ready 260113 (#216)
* feat(bioyond): 添加计算实验设计功能,支持化合物配比和滴定比例参数

* feat(bioyond): 添加测量小瓶功能,支持基本参数配置

* feat(bioyond): 添加测量小瓶配置,支持新设备参数

* feat(bioyond): 更新仓库布局和尺寸,支持竖向排列的测量小瓶和试剂存放堆栈

* feat(bioyond): 优化任务创建流程,确保无论成功与否都清理任务队列以避免重复累积

* feat(bioyond): 添加设置反应器温度功能,支持温度范围和异常处理

* feat(bioyond): 调整反应器位置配置,统一坐标格式

* feat(bioyond): 添加调度器启动功能,支持任务队列执行并处理异常

* feat(bioyond): 优化调度器启动功能,添加异常处理并更新相关配置

* feat(opcua): 增强节点ID解析兼容性和数据类型处理

改进节点ID解析逻辑以支持多种格式,包括字符串和数字标识符
添加数据类型转换处理,确保写入值时类型匹配
优化错误提示信息,便于调试节点连接问题

* feat(registry): 新增后处理站的设备配置文件

添加后处理站的YAML配置文件,包含动作映射、状态类型和设备描述

* 添加调度器启动功能,合并物料参数配置,优化物料参数处理逻辑

* 添加从 Bioyond 系统自动同步工作流序列的功能,并更新相关配置

* fix:兼容 BioyondReactionStation 中 workflow_sequence 被重写为 property

* fix:同步工作流序列

* feat: remove commented workflow synchronization from `reaction_station.py`.

* 添加时间约束功能及相关配置

* fix:自动更新物料缓存功能,添加物料时更新缓存并在删除时移除缓存项

* fix:在添加物料时处理字符串和字典返回值,确保正确更新缓存

* fix:更新奔曜错误处理报送为物料变更报送,调整日志记录和响应消息

* feat:添加实验报告简化功能,去除冗余信息并保留关键信息

* feat: 添加任务状态事件发布功能,监控并报告任务运行、超时、完成和错误状态

* fix: 修复添加物料时数据格式错误

* Refactor bioyond_dispensing_station and reaction_station_bioyond YAML configurations

- Removed redundant action value mappings from bioyond_dispensing_station.
- Updated goal properties in bioyond_dispensing_station to use enums for target_stack and other parameters.
- Changed data types for end_point and start_point in reaction_station_bioyond to use string enums (Start, End).
- Simplified descriptions and updated measurement units from μL to mL where applicable.
- Removed unused commands from reaction_station_bioyond to streamline the configuration.

* fix:Change the material unit from μL to mL

* fix:refresh_material_cache

* feat: 动态获取工作流步骤ID,优化工作流配置

* feat: 添加清空服务端所有非核心工作流功能

* fix:修复Bottle类的序列化和反序列化方法

* feat:增强材料缓存更新逻辑,支持处理返回数据中的详细信息

* Add debug log

* feat(workstation): update bioyond config migration and coin cell material search logic

- Migrate bioyond_cell config to JSON structure and remove global variable dependencies
- Implement material search confirmation dialog auto-handling
- Add documentation: 20260113_物料搜寻确认弹窗自动处理功能.md and 20260113_配置迁移修改总结.md

* Refactor module paths for Bioyond devices in YAML configuration files

- Updated the module path for BioyondDispensingStation in bioyond_dispensing_station.yaml to reflect the new directory structure.
- Updated the module path for BioyondReactionStation and BioyondReactor in reaction_station_bioyond.yaml to align with the revised organization of the codebase.

* fix: WareHouse 的不可哈希类型错误,优化父节点去重逻辑

* refactor: Move config from module to instance initialization

* fix: 修正 reaction_station 目录名拼写错误

* feat: Integrate material search logic and cleanup deprecated files

- Update coin_cell_assembly.py with material search dialog handling
- Update YB_warehouses.py with latest warehouse configurations
- Remove outdated documentation and test data files

* Refactor: Use instance attributes for action names and workflow step IDs

* refactor: Split tipbox storage into left and right warehouses

* refactor: Merge tipbox storage left and right into single warehouse

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Andy6M <xieqiming1132@qq.com>
2026-01-17 15:44:18 +08:00

305 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
from enum import Enum
from abc import ABC, abstractmethod
from typing import Tuple, Union, Optional, Any, List
from opcua import Client, Node, ua
from opcua.ua import NodeId, NodeClass, VariantType
class DataType(Enum):
BOOLEAN = VariantType.Boolean
SBYTE = VariantType.SByte
BYTE = VariantType.Byte
INT16 = VariantType.Int16
UINT16 = VariantType.UInt16
INT32 = VariantType.Int32
UINT32 = VariantType.UInt32
INT64 = VariantType.Int64
UINT64 = VariantType.UInt64
FLOAT = VariantType.Float
DOUBLE = VariantType.Double
STRING = VariantType.String
DATETIME = VariantType.DateTime
BYTESTRING = VariantType.ByteString
class NodeType(Enum):
VARIABLE = NodeClass.Variable
OBJECT = NodeClass.Object
METHOD = NodeClass.Method
OBJECTTYPE = NodeClass.ObjectType
VARIABLETYPE = NodeClass.VariableType
REFERENCETYPE = NodeClass.ReferenceType
DATATYPE = NodeClass.DataType
VIEW = NodeClass.View
class Base(ABC):
def __init__(self, client: Client, name: str, node_id: str, typ: NodeType, data_type: DataType):
self._node_id: str = node_id
self._client = client
self._name = name
self._type = typ
self._data_type = data_type
self._node: Optional[Node] = None
def _get_node(self) -> Node:
if self._node is None:
try:
# 尝试多种 NodeId 字符串格式解析,兼容不同服务器/库的输出
# 可能的格式示例: 'ns=2;i=1234', 'ns=2;s=SomeString',
# 'StringNodeId(ns=4;s=OPC|变量名)', 'NumericNodeId(ns=2;i=1234)' 等
import re
nid = self._node_id
# 如果已经是 NodeId/Node 对象(库用户可能传入),直接使用
try:
from opcua.ua import NodeId as UaNodeId
if isinstance(nid, UaNodeId):
self._node = self._client.get_node(nid)
return self._node
except Exception:
# 若导入或类型判断失败,则继续下一步
pass
# 直接以字符串形式处理
if isinstance(nid, str):
nid = nid.strip()
# 处理包含类名的格式,如 'StringNodeId(ns=4;s=...)' 或 'NumericNodeId(ns=2;i=...)'
# 提取括号内的内容
match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid)
if match_wrapped:
# 提取括号内的实际 node_id 字符串
nid = match_wrapped.group(2).strip()
# 常见短格式 'ns=2;i=1234' 或 'ns=2;s=SomeString'
if re.match(r'^ns=\d+;[is]=', nid):
self._node = self._client.get_node(nid)
else:
# 尝试提取 ns 和 i 或 s
# 对于字符串标识符,可能包含特殊字符,使用非贪婪匹配
m_num = re.search(r'ns=(\d+);i=(\d+)', nid)
m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid)
if m_num:
ns = int(m_num.group(1))
identifier = int(m_num.group(2))
node_id = NodeId(identifier, ns)
self._node = self._client.get_node(node_id)
elif m_str:
ns = int(m_str.group(1))
identifier = m_str.group(2).strip()
# 对于字符串标识符,直接使用字符串格式
node_id_str = f"ns={ns};s={identifier}"
self._node = self._client.get_node(node_id_str)
else:
# 回退:尝试直接传入字符串(有些实现接受其它格式)
try:
self._node = self._client.get_node(self._node_id)
except Exception as e:
# 输出更详细的错误信息供调试
print(f"获取节点失败(尝试直接字符串): {self._node_id}, 错误: {e}")
raise
else:
# 非字符串,尝试直接使用
self._node = self._client.get_node(self._node_id)
except Exception as e:
print(f"获取节点失败: {self._node_id}, 错误: {e}")
# 添加额外提示,帮助定位 BadNodeIdUnknown 问题
print("提示: 请确认该 node_id 是否来自当前连接的服务器地址空间," \
"以及 CSV/配置中名称与服务器 BrowseName 是否匹配。")
raise
return self._node
@abstractmethod
def read(self) -> Tuple[Any, bool]:
"""读取节点值,返回(值, 是否出错)"""
pass
@abstractmethod
def write(self, value: Any) -> bool:
"""写入节点值,返回是否出错"""
pass
@property
def type(self) -> NodeType:
return self._type
@property
def node_id(self) -> str:
return self._node_id
@property
def name(self) -> str:
return self._name
class Variable(Base):
def __init__(self, client: Client, name: str, node_id: str, data_type: DataType):
super().__init__(client, name, node_id, NodeType.VARIABLE, data_type)
def read(self) -> Tuple[Any, bool]:
try:
value = self._get_node().get_value()
return value, False
except Exception as e:
print(f"读取变量 {self._name} 失败: {e}")
return None, True
def write(self, value: Any) -> bool:
try:
# 如果声明了数据类型,则尝试转换并使用对应的 Variant 写入
coerced = value
try:
if self._data_type is not None:
# 基于声明的数据类型做简单类型转换
dt = self._data_type
if dt in (DataType.SBYTE, DataType.BYTE, DataType.INT16, DataType.UINT16,
DataType.INT32, DataType.UINT32, DataType.INT64, DataType.UINT64):
# 数值类型 -> int
if isinstance(value, str):
coerced = int(value)
else:
coerced = int(value)
elif dt in (DataType.FLOAT, DataType.DOUBLE):
if isinstance(value, str):
coerced = float(value)
else:
coerced = float(value)
elif dt == DataType.BOOLEAN:
if isinstance(value, str):
v = value.strip().lower()
if v in ("true", "1", "yes", "on"):
coerced = True
elif v in ("false", "0", "no", "off"):
coerced = False
else:
coerced = bool(value)
else:
coerced = bool(value)
elif dt == DataType.STRING or dt == DataType.BYTESTRING or dt == DataType.DATETIME:
coerced = str(value)
# 使用 ua.Variant 明确指定 VariantType
try:
variant = ua.Variant(coerced, dt.value)
self._get_node().set_value(variant)
except Exception:
# 回退:有些 set_value 实现接受 (value, variant_type)
try:
self._get_node().set_value(coerced, dt.value)
except Exception:
# 最后回退到直接写入(保持兼容性)
self._get_node().set_value(coerced)
else:
# 未声明数据类型,直接写入
self._get_node().set_value(value)
except Exception:
# 若在转换或按数据类型写入失败,尝试直接写入原始值并让上层捕获错误
self._get_node().set_value(value)
return False
except Exception as e:
print(f"写入变量 {self._name} 失败: {e}")
return True
class Method(Base):
def __init__(self, client: Client, name: str, node_id: str, parent_node_id: str, data_type: DataType):
super().__init__(client, name, node_id, NodeType.METHOD, data_type)
self._parent_node_id = parent_node_id
self._parent_node = None
def _get_parent_node(self) -> Node:
if self._parent_node is None:
try:
# 处理父节点ID使用与_get_node相同的解析逻辑
import re
nid = self._parent_node_id
# 如果已经是 NodeId 对象,直接使用
try:
from opcua.ua import NodeId as UaNodeId
if isinstance(nid, UaNodeId):
self._parent_node = self._client.get_node(nid)
return self._parent_node
except Exception:
pass
# 字符串处理
if isinstance(nid, str):
nid = nid.strip()
# 处理包含类名的格式
match_wrapped = re.match(r'(String|Numeric|Byte|Guid|TwoByteNode|FourByteNode)NodeId\((.*)\)', nid)
if match_wrapped:
nid = match_wrapped.group(2).strip()
# 常见短格式
if re.match(r'^ns=\d+;[is]=', nid):
self._parent_node = self._client.get_node(nid)
else:
# 提取 ns 和 i 或 s
m_num = re.search(r'ns=(\d+);i=(\d+)', nid)
m_str = re.search(r'ns=(\d+);s=(.+?)(?:\)|$)', nid)
if m_num:
ns = int(m_num.group(1))
identifier = int(m_num.group(2))
node_id = NodeId(identifier, ns)
self._parent_node = self._client.get_node(node_id)
elif m_str:
ns = int(m_str.group(1))
identifier = m_str.group(2).strip()
node_id_str = f"ns={ns};s={identifier}"
self._parent_node = self._client.get_node(node_id_str)
else:
# 回退
self._parent_node = self._client.get_node(self._parent_node_id)
else:
self._parent_node = self._client.get_node(self._parent_node_id)
except Exception as e:
print(f"获取父节点失败: {self._parent_node_id}, 错误: {e}")
raise
return self._parent_node
def read(self) -> Tuple[Any, bool]:
"""方法节点不支持读取操作"""
return None, True
def write(self, value: Any) -> bool:
"""方法节点不支持写入操作"""
return True
def call(self, *args) -> Tuple[Any, bool]:
"""调用方法,返回(返回值, 是否出错)"""
try:
result = self._get_parent_node().call_method(self._get_node(), *args)
return result, False
except Exception as e:
print(f"调用方法 {self._name} 失败: {e}")
return None, True
class Object(Base):
def __init__(self, client: Client, name: str, node_id: str):
super().__init__(client, name, node_id, NodeType.OBJECT, None)
def read(self) -> Tuple[Any, bool]:
"""对象节点不支持直接读取操作"""
return None, True
def write(self, value: Any) -> bool:
"""对象节点不支持直接写入操作"""
return True
def get_children(self) -> Tuple[List[Node], bool]:
"""获取子节点列表,返回(子节点列表, 是否出错)"""
try:
children = self._get_node().get_children()
return children, False
except Exception as e:
print(f"获取对象 {self._name} 的子节点失败: {e}")
return [], True