diff --git a/opcua_config.json b/opcua_config.json
new file mode 100644
index 0000000..16771a3
--- /dev/null
+++ b/opcua_config.json
@@ -0,0 +1,98 @@
+{
+ "register_node_list_from_csv_path": {
+ "path": "simple_opcua_nodes.csv"
+ },
+ "create_flow": [
+ {
+ "name": "温度控制流程",
+ "action": [
+ {
+ "name": "温度控制动作",
+ "node_function_to_create": [
+ {
+ "func_name": "read_temperature",
+ "node_name": "Temperature",
+ "mode": "read"
+ },
+ {
+ "func_name": "read_heating_status",
+ "node_name": "HeatingStatus",
+ "mode": "read"
+ },
+ {
+ "func_name": "set_heating",
+ "node_name": "HeatingEnabled",
+ "mode": "write",
+ "value": true
+ }
+ ],
+ "create_init_function": {
+ "func_name": "init_setpoint",
+ "node_name": "Setpoint",
+ "mode": "write",
+ "value": 25.0
+ },
+ "create_start_function": {
+ "func_name": "start_heating_control",
+ "node_name": "HeatingEnabled",
+ "mode": "write",
+ "write_functions": [
+ "set_heating"
+ ],
+ "condition_functions": [
+ "read_temperature",
+ "read_heating_status"
+ ],
+ "stop_condition_expression": "read_temperature >= 25.0 and read_heating_status"
+ },
+ "create_stop_function": {
+ "func_name": "stop_heating",
+ "node_name": "HeatingEnabled",
+ "mode": "write",
+ "value": false
+ },
+ "create_cleanup_function": null
+ }
+ ]
+ },
+ {
+ "name": "报警重置流程",
+ "action": [
+ {
+ "name": "报警重置动作",
+ "node_function_to_create": [
+ {
+ "func_name": "reset_alarm",
+ "node_name": "ResetAlarm",
+ "mode": "call",
+ "value": []
+ }
+ ],
+ "create_init_function": null,
+ "create_start_function": {
+ "func_name": "start_reset_alarm",
+ "node_name": "ResetAlarm",
+ "mode": "call",
+ "write_functions": [],
+ "condition_functions": [
+ "reset_alarm"
+ ],
+ "stop_condition_expression": "True"
+ },
+ "create_stop_function": null,
+ "create_cleanup_function": null
+ }
+ ]
+ },
+ {
+ "name": "完整控制流程",
+ "action": [
+ "温度控制流程",
+ "报警重置流程"
+ ]
+ }
+ ],
+ "execute_flow": [
+ "完整控制流程"
+ ]
+}
\ No newline at end of file
diff --git a/python b/python
new file mode 100644
index 0000000..e69de29
diff --git a/recipes/ros-humble-unilabos-msgs/bld_ament_cmake.bat b/recipes/ros-humble-unilabos-msgs/bld_ament_cmake.bat
new file mode 100644
index 0000000..9bf0155
--- /dev/null
+++ b/recipes/ros-humble-unilabos-msgs/bld_ament_cmake.bat
@@ -0,0 +1,41 @@
+:: Generated by vinca http://github.com/RoboStack/vinca.
+:: DO NOT EDIT!
+setlocal EnableDelayedExpansion
+
+set "PYTHONPATH=%LIBRARY_PREFIX%\lib\site-packages;%SP_DIR%"
+
+:: MSVC is preferred.
+set CC=cl.exe
+set CXX=cl.exe
+
+rd /s /q build
+mkdir build
+pushd build
+
+:: set "CMAKE_GENERATOR=Ninja"
+
+:: try to fix long paths issues by using default generator
+set "CMAKE_GENERATOR=Visual Studio %VS_MAJOR% %VS_YEAR%"
+set "SP_DIR_FORWARDSLASHES=%SP_DIR:\=/%"
+
+set PYTHON="%PREFIX%\python.exe"
+
+cmake ^
+ -G "%CMAKE_GENERATOR%" ^
+ -DCMAKE_INSTALL_PREFIX=%LIBRARY_PREFIX% ^
+ -DCMAKE_BUILD_TYPE=Release ^
+ -DCMAKE_INSTALL_SYSTEM_RUNTIME_LIBS_SKIP=True ^
+ -DPYTHON_EXECUTABLE=%PYTHON% ^
+ -DPython_EXECUTABLE=%PYTHON% ^
+ -DPython3_EXECUTABLE=%PYTHON% ^
+ -DSETUPTOOLS_DEB_LAYOUT=OFF ^
+ -DBUILD_SHARED_LIBS=ON ^
+ -DBUILD_TESTING=OFF ^
+ -DCMAKE_OBJECT_PATH_MAX=255 ^
+ -DPYTHON_INSTALL_DIR=%SP_DIR_FORWARDSLASHES% ^
+ --compile-no-warning-as-error ^
+ %SRC_DIR%\%PKG_NAME%\src\work
+if errorlevel 1 exit 1
+
+cmake --build . --config Release --target install
+if errorlevel 1 exit 1
diff --git a/recipes/ros-humble-unilabos-msgs/build_ament_cmake.sh b/recipes/ros-humble-unilabos-msgs/build_ament_cmake.sh
new file mode 100644
index 0000000..52baa99
--- /dev/null
+++ b/recipes/ros-humble-unilabos-msgs/build_ament_cmake.sh
@@ -0,0 +1,71 @@
+# Generated by vinca http://github.com/RoboStack/vinca.
+# DO NOT EDIT!
+
+rm -rf build
+mkdir build
+cd build
+
+# necessary for correctly linking SIP files (from python_qt_bindings)
+export LINK=$CXX
+
+if [[ "$CONDA_BUILD_CROSS_COMPILATION" != "1" ]]; then
+ PYTHON_EXECUTABLE=$PREFIX/bin/python
+ PKG_CONFIG_EXECUTABLE=$PREFIX/bin/pkg-config
+ OSX_DEPLOYMENT_TARGET="10.15"
+else
+ PYTHON_EXECUTABLE=$BUILD_PREFIX/bin/python
+ PKG_CONFIG_EXECUTABLE=$BUILD_PREFIX/bin/pkg-config
+ OSX_DEPLOYMENT_TARGET="11.0"
+fi
+
+echo "USING PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}"
+echo "USING PKG_CONFIG_EXECUTABLE=${PKG_CONFIG_EXECUTABLE}"
+
+export ROS_PYTHON_VERSION=`$PYTHON_EXECUTABLE -c "import sys; print('%i.%i' % (sys.version_info[0:2]))"`
+echo "Using Python ${ROS_PYTHON_VERSION}"
+# Fix up SP_DIR which for some reason might contain a path to a wrong Python version
+FIXED_SP_DIR=$(echo $SP_DIR | sed -E "s/python[0-9]+\.[0-9]+/python$ROS_PYTHON_VERSION/")
+echo "Using site-package dir ${FIXED_SP_DIR}"
+
+# see https://github.com/conda-forge/cross-python-feedstock/issues/24
+if [[ "$CONDA_BUILD_CROSS_COMPILATION" == "1" ]]; then
+ find $PREFIX/lib/cmake -type f -exec sed -i "s~\${_IMPORT_PREFIX}/lib/python${ROS_PYTHON_VERSION}/site-packages~${BUILD_PREFIX}/lib/python${ROS_PYTHON_VERSION}/site-packages~g" {} + || true
+ find $PREFIX/share/rosidl* -type f -exec sed -i "s~$PREFIX/lib/python${ROS_PYTHON_VERSION}/site-packages~${BUILD_PREFIX}/lib/python${ROS_PYTHON_VERSION}/site-packages~g" {} + || true
+ find $PREFIX/share/rosidl* -type f -exec sed -i "s~\${_IMPORT_PREFIX}/lib/python${ROS_PYTHON_VERSION}/site-packages~${BUILD_PREFIX}/lib/python${ROS_PYTHON_VERSION}/site-packages~g" {} + || true
+ find $PREFIX/lib/cmake -type f -exec sed -i "s~message(FATAL_ERROR \"The imported target~message(WARNING \"The imported target~g" {} + || true
+fi
+
+if [[ $target_platform =~ linux.* ]]; then
+ export CFLAGS="${CFLAGS} -D__STDC_FORMAT_MACROS=1"
+ export CXXFLAGS="${CXXFLAGS} -D__STDC_FORMAT_MACROS=1"
+fi;
+
+# Needed for qt-gui-cpp ..
+if [[ $target_platform =~ linux.* ]]; then
+ ln -s $GCC ${BUILD_PREFIX}/bin/gcc
+ ln -s $GXX ${BUILD_PREFIX}/bin/g++
+fi;
+
+cmake \
+ -G "Ninja" \
+ -DCMAKE_INSTALL_PREFIX=$PREFIX \
+ -DCMAKE_PREFIX_PATH=$PREFIX \
+ -DAMENT_PREFIX_PATH=$PREFIX \
+ -DCMAKE_INSTALL_LIBDIR=lib \
+ -DCMAKE_BUILD_TYPE=Release \
+ -DPYTHON_EXECUTABLE=$PYTHON_EXECUTABLE \
+ -DPython_EXECUTABLE=$PYTHON_EXECUTABLE \
+ -DPython3_EXECUTABLE=$PYTHON_EXECUTABLE \
+ -DPython3_FIND_STRATEGY=LOCATION \
+ -DPKG_CONFIG_EXECUTABLE=$PKG_CONFIG_EXECUTABLE \
+ -DPYTHON_INSTALL_DIR=$FIXED_SP_DIR \
+ -DSETUPTOOLS_DEB_LAYOUT=OFF \
+ -DCATKIN_SKIP_TESTING=$SKIP_TESTING \
+ -DCMAKE_INSTALL_SYSTEM_RUNTIME_LIBS_SKIP=True \
+ -DBUILD_SHARED_LIBS=ON \
+ -DBUILD_TESTING=OFF \
+ -DCMAKE_OSX_DEPLOYMENT_TARGET=$OSX_DEPLOYMENT_TARGET \
+ --compile-no-warning-as-error \
+ $SRC_DIR/$PKG_NAME/src/work
+
+cmake --build . --config Release --target install
diff --git a/recipes/ros-humble-unilabos-msgs/recipe.yaml b/recipes/ros-humble-unilabos-msgs/recipe.yaml
new file mode 100644
index 0000000..e476d1b
--- /dev/null
+++ b/recipes/ros-humble-unilabos-msgs/recipe.yaml
@@ -0,0 +1,61 @@
+package:
+ name: ros-humble-unilabos-msgs
+ version: 0.9.7
+source:
+ path: ../../unilabos_msgs
+ folder: ros-humble-unilabos-msgs/src/work
+
+build:
+ script:
+ sel(win): bld_ament_cmake.bat
+ sel(unix): build_ament_cmake.sh
+ number: 5
+about:
+ home: https://www.ros.org/
+ license: BSD-3-Clause
+ summary: |
+ Robot Operating System
+
+extra:
+ recipe-maintainers:
+ - ros-forge
+
+requirements:
+ build:
+ - "{{ compiler('cxx') }}"
+ - "{{ compiler('c') }}"
+ - sel(linux64): sysroot_linux-64 2.17
+ - ninja
+ - setuptools
+ - sel(unix): make
+ - sel(unix): coreutils
+ - sel(osx): tapi
+ - sel(build_platform != target_platform): pkg-config
+ - cmake
+ - cython
+ - sel(win): vs2022_win-64
+ - sel(build_platform != target_platform): python
+ - sel(build_platform != target_platform): cross-python_{{ target_platform }}
+ - sel(build_platform != target_platform): numpy
+ host:
+ - numpy
+ - pip
+ - sel(build_platform == target_platform): pkg-config
+ - robostack-staging::ros-humble-action-msgs
+ - robostack-staging::ros-humble-ament-cmake
+ - robostack-staging::ros-humble-ament-lint-auto
+ - robostack-staging::ros-humble-ament-lint-common
+ - robostack-staging::ros-humble-ros-environment
+ - robostack-staging::ros-humble-ros-workspace
+ - robostack-staging::ros-humble-rosidl-default-generators
+ - robostack-staging::ros-humble-std-msgs
+ - robostack-staging::ros-humble-geometry-msgs
+ - robostack-staging::ros2-distro-mutex=0.5.*
+ run:
+ - robostack-staging::ros-humble-action-msgs
+ - robostack-staging::ros-humble-ros-workspace
+ - robostack-staging::ros-humble-rosidl-default-runtime
+ - robostack-staging::ros-humble-std-msgs
+ - robostack-staging::ros-humble-geometry-msgs
+# - robostack-staging::ros2-distro-mutex=0.6.*
+ - sel(osx and x86_64): __osx >={{ MACOSX_DEPLOYMENT_TARGET|default('10.14') }}
diff --git a/test/experiments/opcua_example.json b/test/experiments/opcua_example.json
new file mode 100644
index 0000000..a563628
--- /dev/null
+++ b/test/experiments/opcua_example.json
@@ -0,0 +1,19 @@
+{
+ "nodes": [
+ {
+ "id": "id",
+ "name": "name",
+ "children": [
+ ],
+ "parent": null,
+ "type": "device",
+ "class": "opcua_example",
+ "config": {
+ "url": "url",
+ "config_path": "unilabos/device_comms/opcua_client/opcua_workflow_example.json"
+ },
+ "data": {
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/unilabos/device_comms/opcua_client/README.md b/unilabos/device_comms/opcua_client/README.md
new file mode 100644
index 0000000..a32735e
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/README.md
@@ -0,0 +1,19 @@
+# Generic OPC UA Client
+
+This module provides a generic OPC UA client implementation. Nodes are defined through external configuration (a CSV file), and workflows are defined and executed through a JSON configuration.
+
+## Features
+
+- OPC UA nodes are configured through a CSV file (only the name, node type, and data type are required; Chinese node names are supported when `NodeLanguage` is specified)
+- Nodes are located on the server automatically, so exact node IDs do not need to be known
+- Provides a workflow mechanism
+- Workflows can be created from a JSON configuration
+
+## Usage
+
+1. Prepare the `opcua_nodes.csv` node definition file.
+2. Write `opcua_workflow_example.json` to define the workflows; it references `opcua_nodes.csv`.
+3. Write the action(s) corresponding to the workflows.
+4. Write the `opcua_example.yaml` registry entry.
+5. Write the `opcua_example.json` configuration graph, specifying `opcua_workflow_example.json` as the workflow definition file.
+
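+A minimal usage sketch (the server URL, configuration path, and node names below are illustrative and not shipped with this module):
+
+```python
+from unilabos.device_comms.opcua_client import OpcUaClient
+
+# Connects, registers the nodes listed in the CSV referenced by the JSON config,
+# builds the workflows, and exposes each workflow as an instance method.
+client = OpcUaClient(
+    url="opc.tcp://localhost:4840/freeopcua/server/",
+    config_path="opcua_workflow_example.json",
+)
+
+print(list(client.workflow_name))            # registered workflow names
+print(client.get_node_value("Temperature"))  # read a node (uses the cached value if present)
+client.set_node_value("Setpoint", 25.0)      # write a node
+
+client.disconnect()
+```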
diff --git a/unilabos/device_comms/opcua_client/__init__.py b/unilabos/device_comms/opcua_client/__init__.py
new file mode 100644
index 0000000..97e8bdc
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/__init__.py
@@ -0,0 +1,12 @@
+from unilabos.device_comms.opcua_client.client import OpcUaClient, BaseClient
+from unilabos.device_comms.opcua_client.node.uniopcua import Variable, Method, Object, NodeType, DataType
+
+__all__ = [
+ 'OpcUaClient',
+ 'BaseClient',
+ 'Variable',
+ 'Method',
+ 'Object',
+ 'NodeType',
+ 'DataType',
+]
\ No newline at end of file
diff --git a/unilabos/device_comms/opcua_client/client.py b/unilabos/device_comms/opcua_client/client.py
new file mode 100644
index 0000000..011ce07
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/client.py
@@ -0,0 +1,1380 @@
+import json
+import time
+import traceback
+from typing import Any, Union, List, Dict, Callable, Optional, Tuple
+from pydantic import BaseModel
+
+from opcua import Client, ua
+from opcua.ua import NodeClass
+import pandas as pd
+import os
+
+from unilabos.device_comms.opcua_client.node.uniopcua import Base as OpcUaNodeBase
+from unilabos.device_comms.opcua_client.node.uniopcua import Variable, Method, NodeType, DataType
+from unilabos.device_comms.universal_driver import UniversalDriver
+from unilabos.utils.log import logger
+
+
+class OpcUaNode(BaseModel):
+ name: str
+ node_type: NodeType
+ node_id: str = ""
+ data_type: Optional[DataType] = None
+ parent_node_id: Optional[str] = None
+
+
+class OpcUaWorkflow(BaseModel):
+ name: str
+ actions: List[
+ Union[
+ "OpcUaWorkflow",
+ Callable[
+ [Callable[[str], OpcUaNodeBase]],
+ None
+ ]]
+ ]
+
+
+class Action(BaseModel):
+ name: str
+    rw: bool  # False = read, True = write
+
+
+class WorkflowAction(BaseModel):
+ init: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
+ start: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
+ stop: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
+ cleanup: Optional[Callable[[Callable[[str], OpcUaNodeBase]], None]] = None
+
+
+class OpcUaWorkflowModel(BaseModel):
+ name: str
+ actions: List[Union["OpcUaWorkflowModel", WorkflowAction]]
+ parameters: Optional[List[str]] = None
+ description: Optional[str] = None
+
+
+""" 前后端Json解析用 """
+class NodeFunctionJson(BaseModel):
+ func_name: str
+ node_name: str
+ mode: str # read, write, call
+ value: Any = None
+
+
+class InitFunctionJson(NodeFunctionJson):
+ pass
+
+
+class StartFunctionJson(NodeFunctionJson):
+ write_functions: List[str]
+ condition_functions: List[str]
+ stop_condition_expression: str
+
+
+class StopFunctionJson(NodeFunctionJson):
+ pass
+
+
+class CleanupFunctionJson(NodeFunctionJson):
+ pass
+
+
+class ActionJson(BaseModel):
+ node_function_to_create: List[NodeFunctionJson]
+ create_init_function: Optional[InitFunctionJson] = None
+ create_start_function: Optional[StartFunctionJson] = None
+ create_stop_function: Optional[StopFunctionJson] = None
+ create_cleanup_function: Optional[CleanupFunctionJson] = None
+
+
+class SimplifiedActionJson(BaseModel):
+ """简化的动作JSON格式,直接定义节点列表和函数"""
+ nodes: Optional[Dict[str, Dict[str, Any]]] = None # 节点定义,格式为 {func_name: {node_name, mode, value}}
+ init_function: Optional[Dict[str, Any]] = None
+ start_function: Optional[Dict[str, Any]] = None
+ stop_function: Optional[Dict[str, Any]] = None
+ cleanup_function: Optional[Dict[str, Any]] = None
+
+
+class WorkflowCreateJson(BaseModel):
+ name: str
+ action: List[Union[ActionJson, SimplifiedActionJson, 'WorkflowCreateJson', str]]
+ parameters: Optional[List[str]] = None
+ description: Optional[str] = None
+
+
+class ExecuteProcedureJson(BaseModel):
+ register_node_list_from_csv_path: Optional[Dict[str, Any]] = None
+ create_flow: List[WorkflowCreateJson]
+ execute_flow: List[str]
+
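+# A minimal example of the JSON consumed by ExecuteProcedureJson (illustrative;
+# see opcua_config.json at the repository root for a complete configuration):
+# {
+#   "register_node_list_from_csv_path": {"path": "nodes.csv"},
+#   "create_flow": [{"name": "my_flow", "action": [{
+#       "node_function_to_create": [
+#           {"func_name": "read_temperature", "node_name": "Temperature", "mode": "read"}],
+#       "create_start_function": {
+#           "func_name": "start_flow", "node_name": "Temperature", "mode": "read",
+#           "write_functions": [], "condition_functions": ["read_temperature"],
+#           "stop_condition_expression": "read_temperature >= 25.0"}}]}],
+#   "execute_flow": ["my_flow"]
+# }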
+
+class BaseClient(UniversalDriver):
+ client: Optional[Client] = None
+ _node_registry: Dict[str, OpcUaNodeBase] = {}
+ DEFAULT_ADDRESS_PATH = ""
+ _variables_to_find: Dict[str, Dict[str, Any]] = {}
+ _name_mapping: Dict[str, str] = {} # 英文名到中文名的映射
+ _reverse_mapping: Dict[str, str] = {} # 中文名到英文名的映射
+
+ def __init__(self):
+ super().__init__()
+ # 自动查找节点功能默认开启
+ self._auto_find_nodes = True
+ # 初始化名称映射字典
+ self._name_mapping = {}
+ self._reverse_mapping = {}
+
+ def _set_client(self, client: Optional[Client]) -> None:
+ if client is None:
+ raise ValueError('client is not valid')
+ self.client = client
+
+ def _connect(self) -> None:
+ logger.info('try to connect client...')
+ if self.client:
+ try:
+ self.client.connect()
+ logger.info('client connected!')
+
+ # 连接后开始查找节点
+ if self._variables_to_find:
+ self._find_nodes()
+ except Exception as e:
+ logger.error(f'client connect failed: {e}')
+ raise
+ else:
+ raise ValueError('client is not initialized')
+
+ def _find_nodes(self) -> None:
+ """查找服务器中的节点"""
+ if not self.client:
+ raise ValueError('client is not connected')
+
+ logger.info('开始查找节点...')
+ try:
+ # 获取根节点
+ root = self.client.get_root_node()
+ objects = root.get_child(["0:Objects"])
+
+ # 查找节点
+ self._find_nodes_recursive(objects)
+
+ # 检查是否所有节点都已找到
+ not_found = []
+ for var_name, var_info in self._variables_to_find.items():
+ if var_name not in self._node_registry:
+ not_found.append(var_name)
+
+ if not_found:
+ logger.warning(f"以下节点未找到: {', '.join(not_found)}")
+ else:
+ logger.info("所有节点均已找到")
+
+ except Exception as e:
+ logger.error(f"查找节点失败: {e}")
+ traceback.print_exc()
+
+ def _find_nodes_recursive(self, node) -> None:
+ """递归查找节点"""
+ try:
+ # 获取当前节点的浏览名称
+ browse_name = node.get_browse_name()
+ node_name = browse_name.Name
+
+ # 检查是否是我们要找的变量
+ if node_name in self._variables_to_find and node_name not in self._node_registry:
+ var_info = self._variables_to_find[node_name]
+ node_type = var_info.get("node_type")
+ data_type = var_info.get("data_type")
+
+ # 根据节点类型创建相应的对象
+ if node_type == NodeType.VARIABLE:
+ self._node_registry[node_name] = Variable(self.client, node_name, str(node.nodeid), data_type)
+ logger.info(f"找到变量节点: {node_name}")
+ elif node_type == NodeType.METHOD:
+ # 对于方法节点,需要获取父节点ID
+ parent_node = node.get_parent()
+ parent_node_id = str(parent_node.nodeid)
+ self._node_registry[node_name] = Method(self.client, node_name, str(node.nodeid), parent_node_id, data_type)
+ logger.info(f"找到方法节点: {node_name}")
+
+ # 递归处理子节点
+ for child in node.get_children():
+ self._find_nodes_recursive(child)
+
+ except Exception as e:
+ # 忽略处理单个节点时的错误,继续处理其他节点
+ pass
+
+ @classmethod
+    def load_csv(cls, file_path: str) -> Tuple[List[OpcUaNode], Dict[str, str], Dict[str, str]]:
+        """
+        Load node definitions from a CSV file.
+        The CSV must contain the columns Name, NodeType and DataType;
+        the columns EnglishName and NodeLanguage are optional.
+        Returns (nodes, english->chinese name mapping, chinese->english name mapping).
+        """
+ df = pd.read_csv(file_path)
+        df = df.drop_duplicates(subset='Name', keep='first')  # TODO: duplicate names should raise an error instead of being dropped silently
+ nodes = []
+
+ # 检查是否包含英文名称列和节点语言列
+ has_english_name = 'EnglishName' in df.columns
+ has_node_language = 'NodeLanguage' in df.columns
+
+ # 如果存在英文名称列,创建名称映射字典
+ name_mapping = {}
+ reverse_mapping = {}
+
+ for _, row in df.iterrows():
+ name = row.get('Name')
+ node_type_str = row.get('NodeType')
+ data_type_str = row.get('DataType')
+
+ # 获取英文名称和节点语言(如果有)
+ english_name = row.get('EnglishName') if has_english_name else None
+ node_language = row.get('NodeLanguage') if has_node_language else 'English' # 默认为英文
+
+ # 如果有英文名称,添加到映射字典
+ if english_name and not pd.isna(english_name) and node_language == 'Chinese':
+ name_mapping[english_name] = name
+ reverse_mapping[name] = english_name
+
+ if not name or not node_type_str:
+ logger.warning(f"跳过无效行: 名称或节点类型缺失")
+ continue
+
+ # 只支持VARIABLE和METHOD两种类型
+ if node_type_str not in ['VARIABLE', 'METHOD']:
+ logger.warning(f"不支持的节点类型: {node_type_str},仅支持VARIABLE和METHOD")
+ continue
+
+ try:
+ node_type = NodeType[node_type_str]
+ except KeyError:
+ logger.warning(f"无效的节点类型: {node_type_str}")
+ continue
+
+ # 对于VARIABLE节点,必须指定数据类型
+ if node_type == NodeType.VARIABLE:
+ if not data_type_str or pd.isna(data_type_str):
+ logger.warning(f"变量节点 {name} 必须指定数据类型")
+ continue
+
+ try:
+ data_type = DataType[data_type_str]
+ except KeyError:
+ logger.warning(f"无效的数据类型: {data_type_str}")
+ continue
+ else:
+ # 对于METHOD节点,数据类型可选
+ data_type = None
+ if data_type_str and not pd.isna(data_type_str):
+ try:
+ data_type = DataType[data_type_str]
+ except KeyError:
+ logger.warning(f"无效的数据类型: {data_type_str},将使用默认值")
+
+ # 创建节点对象,节点ID留空,将通过自动查找功能获取
+ nodes.append(OpcUaNode(
+ name=name,
+ node_type=node_type,
+ data_type=data_type
+ ))
+
+ # 返回节点列表和名称映射字典
+ return nodes, name_mapping, reverse_mapping
+
+ def use_node(self, name: str) -> OpcUaNodeBase:
+ """
+ 获取已注册的节点
+ 如果节点尚未找到,会尝试再次查找
+ 支持使用英文名称访问中文节点
+ """
+ # 检查是否使用英文名称访问中文节点
+ if name in self._name_mapping:
+ chinese_name = self._name_mapping[name]
+ if chinese_name in self._node_registry:
+ return self._node_registry[chinese_name]
+ elif chinese_name in self._variables_to_find:
+ logger.warning(f"节点 {chinese_name} (英文名: {name}) 尚未找到,尝试重新查找")
+ if self.client:
+ self._find_nodes()
+ if chinese_name in self._node_registry:
+ return self._node_registry[chinese_name]
+ raise ValueError(f'节点 {chinese_name} (英文名: {name}) 未注册或未找到')
+
+ # 直接使用原始名称查找
+ if name not in self._node_registry:
+ if name in self._variables_to_find:
+ logger.warning(f"节点 {name} 尚未找到,尝试重新查找")
+ if self.client:
+ self._find_nodes()
+ if name in self._node_registry:
+ return self._node_registry[name]
+ raise ValueError(f'节点 {name} 未注册或未找到')
+ return self._node_registry[name]
+
+ def get_node_registry(self) -> Dict[str, OpcUaNodeBase]:
+ return self._node_registry
+
+ def register_node_list_from_csv_path(self, path: str = None) -> "BaseClient":
+ """从CSV文件注册节点"""
+ if path is None:
+ path = self.DEFAULT_ADDRESS_PATH
+ nodes, name_mapping, reverse_mapping = self.load_csv(path)
+ self._name_mapping.update(name_mapping)
+ self._reverse_mapping.update(reverse_mapping)
+ return self.register_node_list(nodes)
+
+ def register_node_list(self, node_list: List[OpcUaNode]) -> "BaseClient":
+ """注册节点列表"""
+ if not node_list or len(node_list) == 0:
+ logger.warning('节点列表为空')
+ return self
+
+ logger.info(f'开始注册 {len(node_list)} 个节点...')
+ for node in node_list:
+ if node is None:
+ continue
+
+ if node.name in self._node_registry:
+ logger.info(f'节点 {node.name} 已存在')
+ exist = self._node_registry[node.name]
+ if exist.type != node.node_type:
+ raise ValueError(f'节点 {node.name} 类型 {node.node_type} 与已存在的类型 {exist.type} 不一致')
+ continue
+
+ # 将节点添加到待查找列表
+ self._variables_to_find[node.name] = {
+ "node_type": node.node_type,
+ "data_type": node.data_type
+ }
+ logger.info(f'添加节点 {node.name} 到待查找列表')
+
+ logger.info('节点注册完成')
+
+ # 如果客户端已连接,立即开始查找
+ if self.client:
+ self._find_nodes()
+
+ return self
+
+ def run_opcua_workflow(self, workflow: OpcUaWorkflow) -> None:
+ if not self.client:
+ raise ValueError('client is not connected')
+
+ logger.info(f'start to run workflow {workflow.name}...')
+
+ for action in workflow.actions:
+ if isinstance(action, OpcUaWorkflow):
+ self.run_opcua_workflow(action)
+ elif callable(action):
+ action(self.use_node)
+ else:
+ raise ValueError(f'invalid action {action}')
+
+ def call_lifecycle_fn(
+ self,
+ workflow: OpcUaWorkflowModel,
+ fn: Optional[Callable[[Callable], bool]],
+ ) -> bool:
+ if not fn:
+ raise ValueError('fn is not valid in call_lifecycle_fn')
+ try:
+ result = fn(self.use_node)
+ # 处理函数返回值可能是元组的情况
+ if isinstance(result, tuple) and len(result) == 2:
+ # 第二个元素是错误标志,True表示出错,False表示成功
+ value, error_flag = result
+ return not error_flag # 转换成True表示成功,False表示失败
+ return result
+ except Exception as e:
+ traceback.print_exc()
+ logger.error(f'execute {workflow.name} lifecycle failed, err: {e}')
+ return False
+
+ def run_opcua_workflow_model(self, workflow: OpcUaWorkflowModel) -> bool:
+ if not self.client:
+ raise ValueError('client is not connected')
+
+ logger.info(f'start to run workflow {workflow.name}...')
+
+ for action in workflow.actions:
+ if isinstance(action, OpcUaWorkflowModel):
+ if self.run_opcua_workflow_model(action):
+ logger.info(f"{action.name} workflow done.")
+ continue
+ else:
+ logger.error(f"{action.name} workflow failed")
+ return False
+ elif isinstance(action, WorkflowAction):
+ init = action.init
+ start = action.start
+ stop = action.stop
+ cleanup = action.cleanup
+ if not init and not start and not stop:
+ raise ValueError(f'invalid action {action}')
+
+ is_err = False
+ try:
+ if init and not self.call_lifecycle_fn(workflow, init):
+ raise ValueError(f"{workflow.name} init action failed")
+ if not self.call_lifecycle_fn(workflow, start):
+ raise ValueError(f"{workflow.name} start action failed")
+ if not self.call_lifecycle_fn(workflow, stop):
+ raise ValueError(f"{workflow.name} stop action failed")
+ logger.info(f"{workflow.name} action done.")
+ except Exception as e:
+ is_err = True
+ traceback.print_exc()
+ logger.error(f"{workflow.name} action failed, err: {e}")
+ finally:
+ logger.info(f"{workflow.name} try to run cleanup")
+ if cleanup:
+ self.call_lifecycle_fn(workflow, cleanup)
+ else:
+ logger.info(f"{workflow.name} cleanup is not defined")
+ if is_err:
+ return False
+ return True
+ else:
+ raise ValueError(f'invalid action type {type(action)}')
+
+ return True
+
+ function_name: Dict[str, Callable[[Callable[[str], OpcUaNodeBase]], bool]] = {}
+
+ def create_node_function(self, func_name: str = None, node_name: str = None, mode: str = None, value: Any = None, **kwargs) -> Callable[[Callable[[str], OpcUaNodeBase]], bool]:
+ def execute_node_function(use_node: Callable[[str], OpcUaNodeBase]) -> Union[bool, Tuple[Any, bool]]:
+ target_node = use_node(node_name)
+
+ # 检查是否有对应的参数值可用
+ current_value = value
+ if hasattr(self, '_workflow_params') and func_name in self._workflow_params:
+ current_value = self._workflow_params[func_name]
+ print(f"使用参数值 {func_name} = {current_value}")
+ else:
+ print(f"执行 {node_name}, {type(target_node).__name__}, {target_node.node_id}, {mode}, {current_value}")
+
+ if mode == 'read':
+ result_str = self.read_node(node_name)
+
+ try:
+ # 将字符串转换为字典
+ result_str = result_str.replace("'", '"') # 替换单引号为双引号以便JSON解析
+ result_dict = json.loads(result_str)
+
+ # 从字典获取值和错误标志
+ val = result_dict.get("value")
+ err = result_dict.get("error")
+
+ print(f"读取 {node_name} 返回值 = {val} (类型: {type(val).__name__}), 错误 = {err}")
+ return val, err
+ except Exception as e:
+ print(f"解析读取结果失败: {e}, 原始结果: {result_str}")
+ return None, True
+ elif mode == 'write':
+ # 构造完整的JSON输入,包含node_name和value
+ input_json = json.dumps({"node_name": node_name, "value": current_value})
+ result_str = self.write_node(input_json)
+
+ try:
+ # 解析返回的字符串为字典
+ result_str = result_str.replace("'", '"') # 替换单引号为双引号以便JSON解析
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"写入 {node_name} = {current_value}, 结果 = {success}")
+ return success
+ except Exception as e:
+ print(f"解析写入结果失败: {e}, 原始结果: {result_str}")
+ return False
+ elif mode == 'call' and hasattr(target_node, 'call'):
+ args = current_value if isinstance(current_value, list) else [current_value]
+ result = target_node.call(*args)
+ print(f"调用方法 {node_name} 参数 = {args}, 返回值 = {result}")
+ return result
+ return False
+
+ if func_name is None:
+ func_name = f"{node_name}_{mode}_{str(value)}"
+
+ print(f"创建 node function: {mode}, {func_name}")
+ self.function_name[func_name] = execute_node_function
+
+ return execute_node_function
+
+ def create_init_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
+ """
+ 创建初始化函数
+
+ 参数:
+ func_name: 函数名称
+ write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2}
+ 值可以是具体值,也可以是参数名称字符串(将从_workflow_params中查找)
+ """
+ if write_nodes is None:
+ raise ValueError("必须提供write_nodes参数")
+
+ def execute_init_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
+ if isinstance(write_nodes, list):
+ # 处理节点列表
+ for node_name in write_nodes:
+ # 尝试从参数中获取同名参数的值
+ current_value = True # 默认值
+ if hasattr(self, '_workflow_params') and node_name in self._workflow_params:
+ current_value = self._workflow_params[node_name]
+ print(f"初始化函数: 从参数获取值 {node_name} = {current_value}")
+
+ print(f"初始化函数: 写入节点 {node_name} = {current_value}")
+ input_json = json.dumps({"node_name": node_name, "value": current_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"初始化函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"初始化函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ elif isinstance(write_nodes, dict):
+ # 处理节点字典,使用指定的值
+ for node_name, node_value in write_nodes.items():
+ # 检查值是否是字符串类型的参数名
+ current_value = node_value
+ if isinstance(node_value, str) and hasattr(self, '_workflow_params') and node_value in self._workflow_params:
+ current_value = self._workflow_params[node_value]
+ print(f"初始化函数: 从参数获取值 {node_value} = {current_value}")
+
+ print(f"初始化函数: 写入节点 {node_name} = {current_value}")
+ input_json = json.dumps({"node_name": node_name, "value": current_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"初始化函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"初始化函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ return True
+
+ if func_name is None:
+ func_name = f"init_function_{str(time.time())}"
+
+ print(f"创建初始化函数: {func_name}")
+ self.function_name[func_name] = execute_init_function
+ return execute_init_function
+
+ def create_stop_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
+ """
+ 创建停止函数
+
+ 参数:
+ func_name: 函数名称
+ write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2}
+ """
+ if write_nodes is None:
+ raise ValueError("必须提供write_nodes参数")
+
+ def execute_stop_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
+ if isinstance(write_nodes, list):
+ # 处理节点列表,默认值都是False
+ for node_name in write_nodes:
+ # 直接写入False
+ print(f"停止函数: 写入节点 {node_name} = False")
+ input_json = json.dumps({"node_name": node_name, "value": False})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"停止函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"停止函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ elif isinstance(write_nodes, dict):
+ # 处理节点字典,使用指定的值
+ for node_name, node_value in write_nodes.items():
+ print(f"停止函数: 写入节点 {node_name} = {node_value}")
+ input_json = json.dumps({"node_name": node_name, "value": node_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"停止函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"停止函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ return True
+
+ if func_name is None:
+ func_name = f"stop_function_{str(time.time())}"
+
+ print(f"创建停止函数: {func_name}")
+ self.function_name[func_name] = execute_stop_function
+ return execute_stop_function
+
+ def create_cleanup_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
+ """
+ 创建清理函数
+
+ 参数:
+ func_name: 函数名称
+ write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2}
+ """
+ if write_nodes is None:
+ raise ValueError("必须提供write_nodes参数")
+
+ def execute_cleanup_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
+ if isinstance(write_nodes, list):
+ # 处理节点列表,默认值都是False
+ for node_name in write_nodes:
+ # 直接写入False
+ print(f"清理函数: 写入节点 {node_name} = False")
+ input_json = json.dumps({"node_name": node_name, "value": False})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"清理函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"清理函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ elif isinstance(write_nodes, dict):
+ # 处理节点字典,使用指定的值
+ for node_name, node_value in write_nodes.items():
+ print(f"清理函数: 写入节点 {node_name} = {node_value}")
+ input_json = json.dumps({"node_name": node_name, "value": node_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"清理函数: 写入结果 = {success}")
+ except Exception as e:
+ print(f"清理函数: 解析写入结果失败: {e}, 原始结果: {result_str}")
+ return True
+
+ if func_name is None:
+ func_name = f"cleanup_function_{str(time.time())}"
+
+ print(f"创建清理函数: {func_name}")
+ self.function_name[func_name] = execute_cleanup_function
+ return execute_cleanup_function
+
+ def create_start_function(self, func_name: str, stop_condition_expression: str = "True", write_nodes: Union[Dict[str, Any], List[str]] = None, condition_nodes: Union[Dict[str, str], List[str]] = None):
+ """
+ 创建开始函数
+
+ 参数:
+ func_name: 函数名称
+ stop_condition_expression: 停止条件表达式,可直接引用节点名称
+ write_nodes: 写节点配置,可以是节点名列表[节点1,节点2]或节点值映射{节点1:值1,节点2:值2}
+ condition_nodes: 条件节点列表 [节点名1, 节点名2]
+ """
+ def execute_start_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
+ # 直接处理写入节点
+ if write_nodes:
+ if isinstance(write_nodes, list):
+ # 处理节点列表,默认值都是True
+ for i, node_name in enumerate(write_nodes):
+ # 尝试获取与节点对应的参数值
+ param_name = f"write_{i}"
+
+ # 获取参数值(如果有)
+ current_value = True # 默认值
+ if hasattr(self, '_workflow_params') and param_name in self._workflow_params:
+ current_value = self._workflow_params[param_name]
+
+ # 直接写入节点
+ print(f"直接写入节点 {node_name} = {current_value}")
+ input_json = json.dumps({"node_name": node_name, "value": current_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"直接写入 {node_name} = {current_value}, 结果: {success}")
+ except Exception as e:
+ print(f"解析直接写入结果失败: {e}, 原始结果: {result_str}")
+ elif isinstance(write_nodes, dict):
+ # 处理节点字典,值是指定的
+ for node_name, node_value in write_nodes.items():
+ # 尝试获取参数值(如果节点名与参数名匹配)
+ current_value = node_value # 使用指定的默认值
+ if hasattr(self, '_workflow_params') and node_name in self._workflow_params:
+ current_value = self._workflow_params[node_name]
+
+ # 直接写入节点
+ print(f"直接写入节点 {node_name} = {current_value}")
+ input_json = json.dumps({"node_name": node_name, "value": current_value})
+ result_str = self.write_node(input_json)
+ try:
+ result_str = result_str.replace("'", '"')
+ result = json.loads(result_str)
+ success = result.get("success", False)
+ print(f"直接写入 {node_name} = {current_value}, 结果: {success}")
+ except Exception as e:
+ print(f"解析直接写入结果失败: {e}, 原始结果: {result_str}")
+
+ # 如果没有条件节点,立即返回
+ if not condition_nodes:
+ return True
+
+ # 处理条件检查和等待
+ while True:
+ next_loop = False
+ condition_source = {}
+
+ # 直接读取条件节点
+ if isinstance(condition_nodes, list):
+ # 处理节点列表
+ for i, node_name in enumerate(condition_nodes):
+ # 直接读取节点
+ result_str = self.read_node(node_name)
+ try:
+ result_str = result_str.replace("'", '"')
+ result_dict = json.loads(result_str)
+ read_res = result_dict.get("value")
+ read_err = result_dict.get("error", False)
+ print(f"直接读取 {node_name} 返回值 = {read_res}, 错误 = {read_err}")
+
+ if read_err:
+ next_loop = True
+ break
+
+ # 将节点值存入条件源字典,使用节点名称作为键
+ condition_source[node_name] = read_res
+ # 为了向后兼容,也保留read_i格式
+ condition_source[f"read_{i}"] = read_res
+ except Exception as e:
+ print(f"解析直接读取结果失败: {e}, 原始结果: {result_str}")
+ read_res, read_err = None, True
+ next_loop = True
+ break
+ elif isinstance(condition_nodes, dict):
+ # 处理节点字典
+ for condition_func, node_name in condition_nodes.items():
+ # 直接读取节点
+ result_str = self.read_node(node_name)
+ try:
+ result_str = result_str.replace("'", '"')
+ result_dict = json.loads(result_str)
+ read_res = result_dict.get("value")
+ read_err = result_dict.get("error", False)
+ print(f"直接读取 {node_name} 返回值 = {read_res}, 错误 = {read_err}")
+
+ if read_err:
+ next_loop = True
+ break
+
+ # 将节点值存入条件源字典
+ condition_source[node_name] = read_res
+ # 也保存使用函数名作为键
+ condition_source[condition_func] = read_res
+ except Exception as e:
+ print(f"解析直接读取结果失败: {e}, 原始结果: {result_str}")
+ next_loop = True
+ break
+
+ if not next_loop:
+ if stop_condition_expression:
+ # 添加调试信息
+ print(f"条件源数据: {condition_source}")
+ condition_source["__RESULT"] = None
+
+ # 确保安全地执行条件表达式
+ try:
+ # 先尝试使用eval更安全的方式计算表达式
+ result = eval(stop_condition_expression, {}, condition_source)
+ condition_source["__RESULT"] = result
+ except Exception as e:
+ print(f"使用eval执行表达式失败: {e}")
+ try:
+ # 回退到exec方式
+ exec(f"__RESULT = {stop_condition_expression}", {}, condition_source)
+ except Exception as e2:
+ print(f"使用exec执行表达式也失败: {e2}")
+ condition_source["__RESULT"] = False
+
+ res = condition_source["__RESULT"]
+ print(f"取得计算结果: {res}, 条件表达式: {stop_condition_expression}")
+
+ if res:
+ print("满足停止条件,结束工作流")
+ break
+ else:
+ # 如果没有停止条件,直接退出
+ break
+ else:
+ time.sleep(0.3)
+
+ return True
+
+ self.function_name[func_name] = execute_start_function
+ return execute_start_function
+
+ def create_action_from_json(self, data: Union[Dict, Any]) -> WorkflowAction:
+ """
+ 从JSON配置创建工作流动作
+
+ 参数:
+ data: 动作JSON数据
+
+ 返回:
+ WorkflowAction对象
+ """
+ # 初始化所需变量
+ start_function = None
+ write_nodes = {}
+ condition_nodes = []
+ stop_function = None
+ init_function = None
+ cleanup_function = None
+
+ # 提取start_function相关信息
+ if hasattr(data, "start_function") and data.start_function:
+ start_function = data.start_function
+ if "write_nodes" in start_function:
+ write_nodes = start_function["write_nodes"]
+ if "condition_nodes" in start_function:
+ condition_nodes = start_function["condition_nodes"]
+ elif isinstance(data, dict) and data.get("start_function"):
+ start_function = data.get("start_function")
+ if "write_nodes" in start_function:
+ write_nodes = start_function["write_nodes"]
+ if "condition_nodes" in start_function:
+ condition_nodes = start_function["condition_nodes"]
+
+ # 提取stop_function信息
+ if hasattr(data, "stop_function") and data.stop_function:
+ stop_function = data.stop_function
+ elif isinstance(data, dict) and data.get("stop_function"):
+ stop_function = data.get("stop_function")
+
+ # 提取init_function信息
+ if hasattr(data, "init_function") and data.init_function:
+ init_function = data.init_function
+ elif isinstance(data, dict) and data.get("init_function"):
+ init_function = data.get("init_function")
+
+ # 提取cleanup_function信息
+ if hasattr(data, "cleanup_function") and data.cleanup_function:
+ cleanup_function = data.cleanup_function
+ elif isinstance(data, dict) and data.get("cleanup_function"):
+ cleanup_function = data.get("cleanup_function")
+
+ # 创建工作流动作组件
+ init = None
+ start = None
+ stop = None
+ cleanup = None
+
+ # 处理init function
+ if init_function:
+ init_params = {"func_name": init_function.get("func_name")}
+ if "write_nodes" in init_function:
+ init_params["write_nodes"] = init_function["write_nodes"]
+ else:
+ # 如果没有write_nodes,创建一个空字典
+ init_params["write_nodes"] = {}
+
+ init = self.create_init_function(**init_params)
+
+ # 处理start function
+ if start_function:
+ start_params = {
+ "func_name": start_function.get("func_name"),
+ "stop_condition_expression": start_function.get("stop_condition_expression", "True"),
+ "write_nodes": write_nodes,
+ "condition_nodes": condition_nodes
+ }
+ start = self.create_start_function(**start_params)
+
+ # 处理stop function
+ if stop_function:
+ stop_params = {
+ "func_name": stop_function.get("func_name"),
+ "write_nodes": stop_function.get("write_nodes", {})
+ }
+ stop = self.create_stop_function(**stop_params)
+
+ # 处理cleanup function
+ if cleanup_function:
+ cleanup_params = {
+ "func_name": cleanup_function.get("func_name"),
+ "write_nodes": cleanup_function.get("write_nodes", {})
+ }
+ cleanup = self.create_cleanup_function(**cleanup_params)
+
+ return WorkflowAction(init=init, start=start, stop=stop, cleanup=cleanup)
+
+ workflow_name: Dict[str, OpcUaWorkflowModel] = {}
+
+ def create_workflow_from_json(self, data: List[Dict]) -> None:
+ """
+ 从JSON配置创建工作流程序
+
+ 参数:
+ data: 工作流配置列表
+ """
+ for ind, flow_dict in enumerate(data):
+ print(f"正在创建 workflow {ind}, {flow_dict['name']}")
+ actions = []
+
+ for i in flow_dict["action"]:
+ if isinstance(i, str):
+ print(f"沿用已有 workflow 作为 action: {i}")
+ action = self.workflow_name[i]
+ else:
+ print("创建 action")
+ # 直接将字典转换为SimplifiedActionJson对象或直接使用字典
+ action = self.create_action_from_json(i)
+
+ actions.append(action)
+
+ # 获取参数
+ parameters = flow_dict.get("parameters", [])
+
+ flow_instance = OpcUaWorkflowModel(
+ name=flow_dict["name"],
+ actions=actions,
+ parameters=parameters,
+ description=flow_dict.get("description", "")
+ )
+ print(f"创建完成 workflow: {flow_dict['name']}")
+ self.workflow_name[flow_dict["name"]] = flow_instance
+
+ def execute_workflow_from_json(self, data: List[str]) -> None:
+ for i in data:
+ print(f"正在执行 workflow: {i}")
+ self.run_opcua_workflow_model(self.workflow_name[i])
+
+ def execute_procedure_from_json(self, data: Union[ExecuteProcedureJson, Dict]) -> None:
+ """从JSON配置执行工作流程序"""
+ if isinstance(data, dict):
+ # 处理字典类型
+ register_params = data.get("register_node_list_from_csv_path")
+ create_flow = data.get("create_flow", [])
+ execute_flow = data.get("execute_flow", [])
+ else:
+ # 处理Pydantic模型类型
+ register_params = data.register_node_list_from_csv_path
+ create_flow = data.create_flow
+ execute_flow = data.execute_flow if hasattr(data, "execute_flow") else []
+
+ # 注册节点
+ if register_params:
+ print(f"注册节点 csv: {register_params}")
+ self.register_node_list_from_csv_path(**register_params)
+
+ # 创建工作流
+ print("创建工作流")
+ self.create_workflow_from_json(create_flow)
+
+ # 注册工作流为实例方法
+ self.register_workflows_as_methods()
+
+ # 如果存在execute_flow字段,则执行指定的工作流(向后兼容)
+ if execute_flow:
+ print("执行工作流")
+ self.execute_workflow_from_json(execute_flow)
+
+ def register_workflows_as_methods(self) -> None:
+ """将工作流注册为实例方法"""
+ for workflow_name, workflow in self.workflow_name.items():
+ # 获取工作流的参数信息(如果存在)
+ workflow_params = getattr(workflow, 'parameters', []) or []
+ workflow_desc = getattr(workflow, 'description', None) or f"执行工作流: {workflow_name}"
+
+ # 创建执行工作流的方法
+ def create_workflow_method(wf_name=workflow_name, wf=workflow, params=workflow_params):
+ def workflow_method(*args, **kwargs):
+ logger.info(f"执行工作流: {wf_name}, 参数: {args}, {kwargs}")
+
+ # 处理传入的参数
+ if params and (args or kwargs):
+ # 将位置参数转换为关键字参数
+ params_dict = {}
+ for i, param_name in enumerate(params):
+ if i < len(args):
+ params_dict[param_name] = args[i]
+
+ # 合并关键字参数
+ params_dict.update(kwargs)
+
+ # 保存参数,供节点函数使用
+ self._workflow_params = params_dict
+ else:
+ self._workflow_params = {}
+
+ # 执行工作流
+ result = self.run_opcua_workflow_model(wf)
+
+ # 清理参数
+ self._workflow_params = {}
+
+ return result
+
+ # 设置方法的文档字符串
+ workflow_method.__doc__ = workflow_desc
+ if params:
+ param_doc = ", ".join(params)
+ workflow_method.__doc__ += f"\n参数: {param_doc}"
+
+ return workflow_method
+
+ # 注册为实例方法
+ method = create_workflow_method()
+ setattr(self, workflow_name, method)
+ logger.info(f"已将工作流 '{workflow_name}' 注册为实例方法")
+
+    def read_node(self, node_name: str) -> str:
+        """
+        Convenience method for reading a node value.
+        Returns a JSON string containing the value, error flag, node_name and timestamp.
+        """
+ try:
+ node = self.use_node(node_name)
+ value, error = node.read()
+
+ # 创建结果字典
+ result = {
+ "value": value,
+ "error": error,
+ "node_name": node_name,
+ "timestamp": time.time()
+ }
+
+ # 返回JSON字符串
+ return json.dumps(result)
+ except Exception as e:
+ logger.error(f"读取节点 {node_name} 失败: {e}")
+ # 创建错误结果字典
+ result = {
+ "value": None,
+ "error": True,
+ "node_name": node_name,
+ "error_message": str(e),
+ "timestamp": time.time()
+ }
+ return json.dumps(result)
+
+ def write_node(self, json_input: str) -> str:
+ """
+ 写入节点值的便捷方法
+ 接受单个JSON格式的字符串作为输入,包含节点名称和值
+ eg:'{\"node_name\":\"反应罐号码\",\"value\":\"2\"}'
+ 返回JSON格式的字符串,包含操作结果
+ """
+ try:
+ # 解析JSON格式的输入
+ if not isinstance(json_input, str):
+ json_input = str(json_input)
+
+ try:
+ input_data = json.loads(json_input)
+ if not isinstance(input_data, dict):
+ return json.dumps({"error": True, "error_message": "输入必须是包含node_name和value的JSON对象", "success": False})
+
+ # 从JSON中提取节点名称和值
+ node_name = input_data.get("node_name")
+ value = input_data.get("value")
+
+ if node_name is None:
+ return json.dumps({"error": True, "error_message": "JSON中缺少node_name字段", "success": False})
+ except json.JSONDecodeError as e:
+ return json.dumps({"error": True, "error_message": f"JSON解析错误: {str(e)}", "success": False})
+
+ node = self.use_node(node_name)
+ error = node.write(value)
+
+ # 创建结果字典
+ result = {
+ "value": value,
+ "error": error,
+ "node_name": node_name,
+ "timestamp": time.time(),
+ "success": not error
+ }
+
+ return json.dumps(result)
+ except Exception as e:
+ logger.error(f"写入节点失败: {e}")
+ result = {
+ "error": True,
+ "error_message": str(e),
+ "timestamp": time.time(),
+ "success": False
+ }
+ return json.dumps(result)
+
+ def call_method(self, node_name: str, *args) -> Tuple[Any, bool]:
+ """
+ 调用方法节点的便捷方法
+ 返回 (返回值, 是否出错)
+ """
+ try:
+ node = self.use_node(node_name)
+ if hasattr(node, 'call'):
+ return node.call(*args)
+ else:
+ logger.error(f"节点 {node_name} 不是方法节点")
+ return None, True
+ except Exception as e:
+ logger.error(f"调用方法 {node_name} 失败: {e}")
+ return None, True
+
+
+class OpcUaClient(BaseClient):
+ def __init__(self, url: str, config_path: str = None, username: str = None, password: str = None, refresh_interval: float = 1.0):
+ # 降低OPCUA库的日志级别
+ import logging
+ logging.getLogger("opcua").setLevel(logging.WARNING)
+
+ super().__init__()
+
+ client = Client(url)
+
+ if username and password:
+ client.set_user(username)
+ client.set_password(password)
+
+ self._set_client(client)
+ self._connect()
+
+ # 节点值缓存和刷新相关属性
+ self._node_values = {} # 缓存节点值
+ self._refresh_interval = refresh_interval # 刷新间隔(秒)
+ self._refresh_running = False
+ self._refresh_thread = None
+
+ # 如果提供了配置文件路径,则加载配置并注册工作流
+ if config_path:
+ self.load_config(config_path)
+
+ # 启动节点值刷新线程
+ self.start_node_refresh()
+
+ def _register_nodes_as_attributes(self):
+ """将所有节点注册为实例属性,可以通过self.node_name访问"""
+ for node_name, node in self._node_registry.items():
+ # 检查是否有对应的英文名称
+ eng_name = self._reverse_mapping.get(node_name)
+ if eng_name:
+ # 如果有对应的英文名称,使用英文名称作为属性名
+ attr_name = eng_name
+ else:
+ # 如果没有对应的英文名称,使用原始名称,但替换空格和特殊字符
+ attr_name = node_name.replace(' ', '_').replace('-', '_')
+
+ # 创建获取节点值的属性方法,使用中文名称获取节点值
+ def create_property_getter(node_key):
+ def getter(self):
+ # 优先从缓存获取值
+ if node_key in self._node_values:
+ return self._node_values[node_key]
+ # 缓存中没有则直接读取
+ value, _ = self.use_node(node_key).read()
+ return value
+ return getter
+
+ # 使用property装饰器将方法注册为类属性
+ setattr(OpcUaClient, attr_name, property(create_property_getter(node_name)))
+ logger.info(f"已注册节点 '{node_name}' 为属性 '{attr_name}'")
+
+ def refresh_node_values(self):
+ """刷新所有节点的值到缓存"""
+ if not self.client:
+ logger.warning("客户端未初始化,无法刷新节点值")
+ return
+
+ try:
+ # 简单检查连接状态,如果不连接会抛出异常
+ self.client.get_namespace_array()
+ except Exception as e:
+ logger.warning(f"客户端连接异常,无法刷新节点值: {e}")
+ return
+
+ for node_name, node in self._node_registry.items():
+ try:
+ if hasattr(node, 'read'):
+ value, error = node.read()
+ if not error:
+ self._node_values[node_name] = value
+ #logger.debug(f"已刷新节点 '{node_name}' 的值: {value}")
+ except Exception as e:
+ logger.error(f"刷新节点 '{node_name}' 失败: {e}")
+
+ def get_node_value(self, name):
+ """获取节点值,支持中文名和英文名"""
+ # 如果提供的是英文名,转换为中文名
+ if name in self._name_mapping:
+ chinese_name = self._name_mapping[name]
+ # 优先从缓存获取值
+ if chinese_name in self._node_values:
+ return self._node_values[chinese_name]
+ # 缓存中没有则直接读取
+ value, _ = self.use_node(chinese_name).read()
+ return value
+ # 如果提供的是中文名,直接使用
+ elif name in self._node_registry:
+ # 优先从缓存获取值
+ if name in self._node_values:
+ return self._node_values[name]
+ # 缓存中没有则直接读取
+ value, _ = self.use_node(name).read()
+ return value
+ else:
+ raise ValueError(f"未找到名称为 '{name}' 的节点")
+
+ def set_node_value(self, name, value):
+ """设置节点值,支持中文名和英文名"""
+ # 如果提供的是英文名,转换为中文名
+ if name in self._name_mapping:
+ chinese_name = self._name_mapping[name]
+ node = self.use_node(chinese_name)
+ # 如果提供的是中文名,直接使用
+ elif name in self._node_registry:
+ node = self.use_node(name)
+ else:
+ raise ValueError(f"未找到名称为 '{name}' 的节点")
+
+ # 写入值
+ error = node.write(value)
+ if not error:
+ # 更新缓存
+ if hasattr(node, 'name'):
+ self._node_values[node.name] = value
+ return True
+ return False
+
+ def _refresh_worker(self):
+ """节点值刷新线程的工作函数"""
+ self._refresh_running = True
+ logger.info(f"节点值刷新线程已启动,刷新间隔: {self._refresh_interval}秒")
+
+ while self._refresh_running:
+ try:
+ self.refresh_node_values()
+ except Exception as e:
+ logger.error(f"节点值刷新过程出错: {e}")
+
+ # 等待下一次刷新
+ time.sleep(self._refresh_interval)
+
+ def start_node_refresh(self):
+ """启动节点值刷新线程"""
+ if self._refresh_thread is not None and self._refresh_thread.is_alive():
+ logger.warning("节点值刷新线程已在运行")
+ return
+
+ import threading
+ self._refresh_thread = threading.Thread(target=self._refresh_worker, daemon=True)
+ self._refresh_thread.start()
+
+ def stop_node_refresh(self):
+ """停止节点值刷新线程"""
+ self._refresh_running = False
+ if self._refresh_thread and self._refresh_thread.is_alive():
+ self._refresh_thread.join(timeout=2.0)
+ logger.info("节点值刷新线程已停止")
+
+ def load_config(self, config_path: str) -> None:
+ """从JSON配置文件加载并注册工作流"""
+ try:
+ with open(config_path, 'r', encoding='utf-8') as f:
+ config_data = json.load(f)
+
+ # 处理节点注册
+ if "register_node_list_from_csv_path" in config_data:
+ # 获取配置文件所在目录
+ config_dir = os.path.dirname(os.path.abspath(config_path))
+
+ # 处理CSV路径,如果是相对路径,则相对于配置文件所在目录
+ if "path" in config_data["register_node_list_from_csv_path"]:
+ csv_path = config_data["register_node_list_from_csv_path"]["path"]
+ if not os.path.isabs(csv_path):
+ # 转换为绝对路径
+ csv_path = os.path.join(config_dir, csv_path)
+ config_data["register_node_list_from_csv_path"]["path"] = csv_path
+
+ # 直接使用字典
+ self.register_node_list_from_csv_path(**config_data["register_node_list_from_csv_path"])
+
+ # 处理工作流创建
+ if "create_flow" in config_data:
+ # 直接传递字典列表
+ self.create_workflow_from_json(config_data["create_flow"])
+ # 将工作流注册为实例方法
+ self.register_workflows_as_methods()
+
+ # 将所有节点注册为属性
+ self._register_nodes_as_attributes()
+
+ logger.info(f"成功从 {config_path} 加载配置")
+ except Exception as e:
+ logger.error(f"加载配置文件 {config_path} 失败: {e}")
+ traceback.print_exc()
+
+ def disconnect(self):
+ # 停止刷新线程
+ self.stop_node_refresh()
+
+ if self.client:
+ self.client.disconnect()
+ logger.info("OPC UA client disconnected")
+
+
+if __name__ == '__main__':
+ # 示例用法
+
+ # 使用配置文件创建客户端并自动注册工作流
+ import os
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ config_path = os.path.join(current_dir, "opcua_huairou.json")
+
+ # 创建OPC UA客户端并加载配置
+ try:
+ client = OpcUaClient(
+ url="opc.tcp://localhost:4840/freeopcua/server/", # 替换为实际的OPC UA服务器地址
+ config_path=config_path # 传入配置文件路径
+ )
+
+ # 列出所有已注册的工作流
+ print("\n已注册的工作流:")
+ for workflow_name in client.workflow_name:
+ print(f" - {workflow_name}")
+
+ # 测试trigger_grab_action工作流 - 使用英文参数名
+ print("\n测试trigger_grab_action工作流 - 使用英文参数名:")
+ client.trigger_grab_action(reaction_tank_number=2, raw_tank_number=3)
+
+ # 读取节点值 - 使用英文节点名
+ grab_complete = client.get_node_value("grab_complete")
+ reaction_tank = client.get_node_value("reaction_tank_number")
+ raw_tank = client.get_node_value("raw_tank_number")
+
+ print(f"\n执行后状态检查 (使用英文节点名):")
+ print(f" - 抓取完成状态: {grab_complete}")
+ print(f" - 当前反应罐号码: {reaction_tank}")
+ print(f" - 当前原料罐号码: {raw_tank}")
+
+ # 测试节点值写入 - 使用英文节点名
+ print("\n测试节点值写入 (使用英文节点名):")
+ success = client.set_node_value("atomization_fast_speed", 150.5)
+ print(f" - 写入搅拌浆雾化快速 = 150.5, 结果: {success}")
+
+ # 读取写入的值
+ atomization_speed = client.get_node_value("atomization_fast_speed")
+ print(f" - 读取搅拌浆雾化快速: {atomization_speed}")
+
+ # 断开连接
+ client.disconnect()
+
+ except Exception as e:
+ print(f"错误: {e}")
+ traceback.print_exc()
+
+
diff --git a/unilabos/device_comms/opcua_client/node/__init__.py b/unilabos/device_comms/opcua_client/node/__init__.py
new file mode 100644
index 0000000..968e905
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/node/__init__.py
@@ -0,0 +1,10 @@
+from unilabos.device_comms.opcua_client.node.uniopcua import Variable, Method, Object, NodeType, DataType, Base
+
+__all__ = [
+ 'Variable',
+ 'Method',
+ 'Object',
+ 'NodeType',
+ 'DataType',
+ 'Base',
+]
\ No newline at end of file
diff --git a/unilabos/device_comms/opcua_client/node/uniopcua.py b/unilabos/device_comms/opcua_client/node/uniopcua.py
new file mode 100644
index 0000000..ce16cfc
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/node/uniopcua.py
@@ -0,0 +1,180 @@
+# coding=utf-8
+from enum import Enum
+from abc import ABC, abstractmethod
+from typing import Tuple, Union, Optional, Any, List
+
+from opcua import Client, Node
+from opcua.ua import NodeId, NodeClass, VariantType
+
+
+class DataType(Enum):
+ BOOLEAN = VariantType.Boolean
+ SBYTE = VariantType.SByte
+ BYTE = VariantType.Byte
+ INT16 = VariantType.Int16
+ UINT16 = VariantType.UInt16
+ INT32 = VariantType.Int32
+ UINT32 = VariantType.UInt32
+ INT64 = VariantType.Int64
+ UINT64 = VariantType.UInt64
+ FLOAT = VariantType.Float
+ DOUBLE = VariantType.Double
+ STRING = VariantType.String
+ DATETIME = VariantType.DateTime
+ BYTESTRING = VariantType.ByteString
+
+
+class NodeType(Enum):
+ VARIABLE = NodeClass.Variable
+ OBJECT = NodeClass.Object
+ METHOD = NodeClass.Method
+ OBJECTTYPE = NodeClass.ObjectType
+ VARIABLETYPE = NodeClass.VariableType
+ REFERENCETYPE = NodeClass.ReferenceType
+ DATATYPE = NodeClass.DataType
+ VIEW = NodeClass.View
+
+
+class Base(ABC):
+    def __init__(self, client: Client, name: str, node_id: str, typ: NodeType, data_type: Optional[DataType]):
+ self._node_id: str = node_id
+ self._client = client
+ self._name = name
+ self._type = typ
+ self._data_type = data_type
+ self._node: Optional[Node] = None
+
+ def _get_node(self) -> Node:
+ if self._node is None:
+ try:
+ # 检查是否是NumericNodeId(ns=X;i=Y)格式
+ if "NumericNodeId" in self._node_id:
+ # 从字符串中提取命名空间和标识符
+ import re
+ match = re.search(r'ns=(\d+);i=(\d+)', self._node_id)
+ if match:
+ ns = int(match.group(1))
+ identifier = int(match.group(2))
+ node_id = NodeId(identifier, ns)
+ self._node = self._client.get_node(node_id)
+ else:
+ raise ValueError(f"无法解析节点ID: {self._node_id}")
+ else:
+ # 直接使用节点ID字符串
+ self._node = self._client.get_node(self._node_id)
+ except Exception as e:
+ print(f"获取节点失败: {self._node_id}, 错误: {e}")
+ raise
+ return self._node
+
+ @abstractmethod
+ def read(self) -> Tuple[Any, bool]:
+ """读取节点值,返回(值, 是否出错)"""
+ pass
+
+ @abstractmethod
+ def write(self, value: Any) -> bool:
+ """写入节点值,返回是否出错"""
+ pass
+
+ @property
+ def type(self) -> NodeType:
+ return self._type
+
+ @property
+ def node_id(self) -> str:
+ return self._node_id
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+
+class Variable(Base):
+ def __init__(self, client: Client, name: str, node_id: str, data_type: DataType):
+ super().__init__(client, name, node_id, NodeType.VARIABLE, data_type)
+
+ def read(self) -> Tuple[Any, bool]:
+ try:
+ value = self._get_node().get_value()
+ return value, False
+ except Exception as e:
+ print(f"读取变量 {self._name} 失败: {e}")
+ return None, True
+
+ def write(self, value: Any) -> bool:
+ try:
+ self._get_node().set_value(value)
+ return False
+ except Exception as e:
+ print(f"写入变量 {self._name} 失败: {e}")
+ return True
+
+
+class Method(Base):
+    def __init__(self, client: Client, name: str, node_id: str, parent_node_id: str, data_type: Optional[DataType]):
+ super().__init__(client, name, node_id, NodeType.METHOD, data_type)
+ self._parent_node_id = parent_node_id
+ self._parent_node = None
+
+ def _get_parent_node(self) -> Node:
+ if self._parent_node is None:
+ try:
+ # 检查是否是NumericNodeId(ns=X;i=Y)格式
+ if "NumericNodeId" in self._parent_node_id:
+ # 从字符串中提取命名空间和标识符
+ import re
+ match = re.search(r'ns=(\d+);i=(\d+)', self._parent_node_id)
+ if match:
+ ns = int(match.group(1))
+ identifier = int(match.group(2))
+ node_id = NodeId(identifier, ns)
+ self._parent_node = self._client.get_node(node_id)
+ else:
+ raise ValueError(f"无法解析父节点ID: {self._parent_node_id}")
+ else:
+ # 直接使用节点ID字符串
+ self._parent_node = self._client.get_node(self._parent_node_id)
+ except Exception as e:
+ print(f"获取父节点失败: {self._parent_node_id}, 错误: {e}")
+ raise
+ return self._parent_node
+
+ def read(self) -> Tuple[Any, bool]:
+ """方法节点不支持读取操作"""
+ return None, True
+
+ def write(self, value: Any) -> bool:
+ """方法节点不支持写入操作"""
+ return True
+
+ def call(self, *args) -> Tuple[Any, bool]:
+ """调用方法,返回(返回值, 是否出错)"""
+ try:
+ result = self._get_parent_node().call_method(self._get_node(), *args)
+ return result, False
+ except Exception as e:
+ print(f"调用方法 {self._name} 失败: {e}")
+ return None, True
+
+
+class Object(Base):
+ def __init__(self, client: Client, name: str, node_id: str):
+ super().__init__(client, name, node_id, NodeType.OBJECT, None)
+
+ def read(self) -> Tuple[Any, bool]:
+ """对象节点不支持直接读取操作"""
+ return None, True
+
+ def write(self, value: Any) -> bool:
+ """对象节点不支持直接写入操作"""
+ return True
+
+ def get_children(self) -> Tuple[List[Node], bool]:
+ """获取子节点列表,返回(子节点列表, 是否出错)"""
+ try:
+ children = self._get_node().get_children()
+ return children, False
+ except Exception as e:
+ print(f"获取对象 {self._name} 的子节点失败: {e}")
+ return [], True
\ No newline at end of file
diff --git a/unilabos/device_comms/opcua_client/opcua_nodes_example.csv b/unilabos/device_comms/opcua_client/opcua_nodes_example.csv
new file mode 100644
index 0000000..42458f1
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/opcua_nodes_example.csv
@@ -0,0 +1,2 @@
+Name,EnglishName,NodeType,DataType,NodeLanguage
+中文名,EnglishName,VARIABLE,INT32,Chinese
diff --git a/unilabos/device_comms/opcua_client/opcua_workflow_example.json b/unilabos/device_comms/opcua_client/opcua_workflow_example.json
new file mode 100644
index 0000000..d728639
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/opcua_workflow_example.json
@@ -0,0 +1,30 @@
+{
+ "register_node_list_from_csv_path": {
+ "path": "opcua_nodes_example.csv"
+ },
+ "create_flow": [
+ {
+ "name": "name",
+ "description": "description",
+ "parameters": ["parameter1", "parameter2"],
+ "action": [
+ {
+ "init_function": {
+ "func_name": "init_grab_params",
+ "write_nodes": ["parameter1", "parameter2"]
+ },
+ "start_function": {
+ "func_name": "start_grab",
+ "write_nodes": {"parameter_start": true},
+ "condition_nodes": ["parameter_condition"],
+ "stop_condition_expression": "parameter_condition == True"
+ },
+ "stop_function": {
+ "func_name": "stop_grab",
+ "write_nodes": {"parameter_start": false}
+ }
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/unilabos/device_comms/opcua_client/server.py b/unilabos/device_comms/opcua_client/server.py
new file mode 100644
index 0000000..481fd6b
--- /dev/null
+++ b/unilabos/device_comms/opcua_client/server.py
@@ -0,0 +1,311 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+OPC UA test server.
+Used to exercise the OPC UA client, in particular the temperature_control and valve_control workflows.
+"""
+
+import sys
+import time
+import logging
+from opcua import Server, ua
+import threading
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class OpcUaTestServer:
+ """OPC UA测试服务器类"""
+
+ def __init__(self, endpoint="opc.tcp://localhost:4840/freeopcua/server/"):
+ """
+ Initialize the OPC UA server.
+
+ Args:
+ endpoint: server endpoint URL
+ """
+ self.server = Server()
+ self.server.set_endpoint(endpoint)
+
+ # Set the server name
+ self.server.set_server_name("UniLabOS OPC UA Test Server")
+
+ # Register the server namespace
+ self.idx = self.server.register_namespace("http://unilabos.com/opcua/test")
+
+ # Get the Objects node
+ self.objects = self.server.get_objects_node()
+
+ # Create the device object
+ self.device = self.objects.add_object(self.idx, "TestDevice")
+
+ # Dictionary of all created nodes
+ self.nodes = {}
+
+ # Running flag
+ self.running = False
+
+ # Simulation control flag
+ self.simulation_active = True
+
+ def add_variable(self, name, value, data_type=None):
+ """
+ Add a variable node.
+
+ Args:
+ name: variable name
+ value: initial value
+ data_type: data type (optional)
+ """
+ if data_type is None:
+ var = self.device.add_variable(self.idx, name, value)
+ else:
+ var = self.device.add_variable(self.idx, name, value, data_type)
+
+ # Make the variable writable
+ var.set_writable()
+
+ # Store the node
+ self.nodes[name] = var
+ logger.info(f"Added variable node: {name}, initial value: {value}")
+ return var
+
+ def add_method(self, name, callback, inputs=None, outputs=None):
+ """
+ Add a method node.
+
+ Args:
+ name: method name
+ callback: callback function
+ inputs: list of input arguments [(name, type), ...]
+ outputs: list of output arguments [(name, type), ...]
+ """
+ if inputs is None:
+ inputs = []
+ if outputs is None:
+ outputs = []
+
+ # Build the input arguments
+ input_args = []
+ for arg_name, arg_type in inputs:
+ input_args.append(ua.Argument())
+ input_args[-1].Name = arg_name
+ input_args[-1].DataType = arg_type
+ input_args[-1].ValueRank = -1
+
+ # Build the output arguments
+ output_args = []
+ for arg_name, arg_type in outputs:
+ output_args.append(ua.Argument())
+ output_args[-1].Name = arg_name
+ output_args[-1].DataType = arg_type
+ output_args[-1].ValueRank = -1
+
+ # Add the method
+ method = self.device.add_method(
+ self.idx,
+ name,
+ callback,
+ input_args,
+ output_args
+ )
+
+ # Store the node
+ self.nodes[name] = method
+ logger.info(f"Added method node: {name}")
+ return method
+
+ def start(self):
+ """启动服务器"""
+ if not self.running:
+ self.server.start()
+ self.running = True
+ logger.info("OPC UA服务器已启动")
+
+ # 启动模拟线程
+ self.simulation_thread = threading.Thread(target=self.run_simulation)
+ self.simulation_thread.daemon = True
+ self.simulation_thread.start()
+
+ def stop(self):
+ """停止服务器"""
+ if self.running:
+ self.simulation_active = False
+ if hasattr(self, 'simulation_thread'):
+ self.simulation_thread.join(timeout=2)
+ self.server.stop()
+ self.running = False
+ logger.info("OPC UA服务器已停止")
+
+ def get_node(self, name):
+ """获取节点"""
+ if name in self.nodes:
+ return self.nodes[name]
+ return None
+
+ def update_variable(self, name, value):
+ """更新变量值"""
+ if name in self.nodes:
+ self.nodes[name].set_value(value)
+ logger.debug(f"更新变量 {name} = {value}")
+ return True
+ logger.warning(f"变量 {name} 不存在")
+ return False
+
+ def run_simulation(self):
+ """运行模拟线程"""
+ logger.info("启动模拟线程")
+
+ temp = 20.0
+ valve_position = 0.0
+ flow_rate = 0.0
+
+ while self.simulation_active and self.running:
+ try:
+ # Temperature control simulation
+ heating_enabled = self.get_node("HeatingEnabled").get_value()
+ setpoint = self.get_node("Setpoint").get_value()
+
+ if heating_enabled:
+ self.update_variable("HeatingStatus", True)
+ if temp < setpoint:
+ temp += 0.5 # speed up the temperature rise
+ else:
+ temp -= 0.1
+ else:
+ self.update_variable("HeatingStatus", False)
+ if temp > 20.0:
+ temp -= 0.2
+
+ # Update the temperature
+ self.update_variable("Temperature", round(temp, 2))
+
+ # Valve control simulation
+ valve_control = self.get_node("ValveControl").get_value()
+ valve_setpoint = self.get_node("ValveSetpoint").get_value()
+
+ if valve_control:
+ if valve_position < valve_setpoint:
+ valve_position += 5.0 # speed up valve opening
+ if valve_position > valve_setpoint:
+ valve_position = valve_setpoint
+ else:
+ valve_position -= 1.0
+ if valve_position < 0:
+ valve_position = 0
+ else:
+ if valve_position > 0:
+ valve_position -= 5.0
+ if valve_position < 0:
+ valve_position = 0
+
+ # Update the valve position
+ self.update_variable("ValvePosition", round(valve_position, 2))
+
+ # Flow simulation - proportional to valve position
+ flow_rate = valve_position * 0.2 # simple linear relationship
+ self.update_variable("FlowRate", round(flow_rate, 2))
+
+ # Update the system status
+ status = []
+ if heating_enabled:
+ status.append("Heating")
+ if valve_control:
+ status.append("Valve_Open")
+
+ if status:
+ self.update_variable("SystemStatus", "_".join(status))
+ else:
+ self.update_variable("SystemStatus", "Idle")
+
+ # Update every 200 ms
+ time.sleep(0.2)
+
+ except Exception as e:
+ logger.error(f"模拟线程错误: {e}")
+ time.sleep(1) # 出错时稍等一会再继续
+
+ logger.info("模拟线程已停止")
+
+def reset_alarm_callback(parent, *args):
+ """重置报警的回调函数"""
+ logger.info("调用了重置报警方法")
+ return True
+
+def start_process_callback(parent, *args):
+ """启动流程的回调函数"""
+ process_id = args[0] if args else 0
+ logger.info(f"启动流程 ID: {process_id}")
+ return process_id
+
+def stop_process_callback(parent, *args):
+ """停止流程的回调函数"""
+ process_id = args[0] if args else 0
+ logger.info(f"停止流程 ID: {process_id}")
+ return True
+
+def main():
+ """主函数"""
+ try:
+ # 创建服务器
+ server = OpcUaTestServer()
+
+ # 添加变量节点 - 温度控制相关
+ server.add_variable("Temperature", 20.0, ua.VariantType.Float)
+ server.add_variable("Setpoint", 22.0, ua.VariantType.Float)
+ server.add_variable("HeatingEnabled", False, ua.VariantType.Boolean)
+ server.add_variable("HeatingStatus", False, ua.VariantType.Boolean)
+
+ # Add variable nodes - valve control
+ server.add_variable("ValvePosition", 0.0, ua.VariantType.Float)
+ server.add_variable("ValveSetpoint", 0.0, ua.VariantType.Float)
+ server.add_variable("ValveControl", False, ua.VariantType.Boolean)
+ server.add_variable("FlowRate", 0.0, ua.VariantType.Float)
+
+ # Other status variables
+ server.add_variable("SystemStatus", "Idle", ua.VariantType.String)
+
+ # Add method nodes
+ server.add_method(
+ "ResetAlarm",
+ reset_alarm_callback,
+ [],
+ [("Result", ua.VariantType.Boolean)]
+ )
+
+ server.add_method(
+ "StartProcess",
+ start_process_callback,
+ [("ProcessId", ua.VariantType.Int32)],
+ [("Result", ua.VariantType.Int32)]
+ )
+
+ server.add_method(
+ "StopProcess",
+ stop_process_callback,
+ [("ProcessId", ua.VariantType.Int32)],
+ [("Result", ua.VariantType.Boolean)]
+ )
+
+ # Start the server
+ server.start()
+ logger.info("Server started, press Ctrl+C to stop")
+
+ # Keep the server running
+ try:
+ while True:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ logger.info("收到键盘中断,正在停止服务器...")
+
+ # Stop the server
+ server.stop()
+
+ except Exception as e:
+ logger.error(f"服务器错误: {e}")
+ import traceback
+ traceback.print_exc()
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
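For a quick manual check against this test server, the following sketch (not part of the diff) connects with python-opcua, reads Temperature and enables heating; it assumes the server above is running on its default endpoint.

from opcua import Client

client = Client("opc.tcp://localhost:4840/freeopcua/server/")
client.connect()
try:
    # Look up the namespace index registered by the test server.
    idx = client.get_namespace_index("http://unilabos.com/opcua/test")
    device = client.get_objects_node().get_child([f"{idx}:TestDevice"])
    temperature = device.get_child([f"{idx}:Temperature"])
    heating = device.get_child([f"{idx}:HeatingEnabled"])
    print("Temperature:", temperature.get_value())
    heating.set_value(True)  # the simulation loop will start raising the temperature
finally:
    client.disconnect()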
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/initial_positions.yaml b/unilabos/device_mesh/devices/benyao_arm/config/initial_positions.yaml
new file mode 100644
index 0000000..94fb9f5
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/initial_positions.yaml
@@ -0,0 +1,9 @@
+# Default initial positions for the benyao_arm ros2_control mock (fake) system
+
+initial_positions:
+ arm_base_joint: 0
+ arm_link_1_joint: 0
+ arm_link_2_joint: 0
+ arm_link_3_joint: 0
+ gripper_base_joint: 0
+ gripper_right_joint: 0.03
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/joint_limits.yaml b/unilabos/device_mesh/devices/benyao_arm/config/joint_limits.yaml
new file mode 100644
index 0000000..d4dffc3
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/joint_limits.yaml
@@ -0,0 +1,40 @@
+# joint_limits.yaml allows the dynamics properties specified in the URDF to be overwritten or augmented as needed
+
+# For beginners, we downscale velocity and acceleration limits.
+# You can always specify higher scaling factors (<= 1.0) in your motion requests.
+# Increase the values below to 1.0 to always move at maximum speed.
+default_velocity_scaling_factor: 0.1
+default_acceleration_scaling_factor: 0.1
+
+# Specific joint properties can be changed with the keys [max_position, min_position, max_velocity, max_acceleration]
+# Joint limits can be turned off with [has_velocity_limits, has_acceleration_limits]
+joint_limits:
+ arm_base_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
+ arm_link_1_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
+ arm_link_2_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
+ arm_link_3_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
+ gripper_base_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
+ gripper_right_joint:
+ has_velocity_limits: true
+ max_velocity: 0
+ has_acceleration_limits: false
+ max_acceleration: 0
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/kinematics.yaml b/unilabos/device_mesh/devices/benyao_arm/config/kinematics.yaml
new file mode 100644
index 0000000..c9a5d60
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/kinematics.yaml
@@ -0,0 +1,4 @@
+arm:
+ kinematics_solver: lma_kinematics_plugin/LMAKinematicsPlugin
+ kinematics_solver_search_resolution: 0.0050000000000000001
+ kinematics_solver_timeout: 0.0050000000000000001
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/macro.ros2_control.xacro b/unilabos/device_mesh/devices/benyao_arm/config/macro.ros2_control.xacro
new file mode 100644
index 0000000..70f8634
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/macro.ros2_control.xacro
@@ -0,0 +1,56 @@
+<!-- [xacro markup lost in extraction; only text content survived] -->
+<!-- Recoverable content: ros2_control hardware plugin mock_components/GenericSystem,
+     with per-joint initial values taken from
+     ${initial_positions['arm_base_joint']}, ${initial_positions['arm_link_1_joint']},
+     ${initial_positions['arm_link_2_joint']}, ${initial_positions['arm_link_3_joint']},
+     ${initial_positions['gripper_base_joint']}, ${initial_positions['gripper_right_joint']} -->
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/macro.srdf.xacro b/unilabos/device_mesh/devices/benyao_arm/config/macro.srdf.xacro
new file mode 100644
index 0000000..aeb5677
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/macro.srdf.xacro
@@ -0,0 +1,46 @@
+<!-- [xacro markup lost in extraction: SRDF macro definition for benyao_arm; no text content survived] -->
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/move_group.json b/unilabos/device_mesh/devices/benyao_arm/config/move_group.json
new file mode 100644
index 0000000..5be9ad3
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/move_group.json
@@ -0,0 +1,14 @@
+{
+ "arm":
+ {
+ "joint_names": [
+ "arm_base_joint",
+ "arm_link_1_joint",
+ "arm_link_2_joint",
+ "arm_link_3_joint",
+ "gripper_base_joint"
+ ],
+ "base_link_name": "device_link",
+ "end_effector_name": "gripper_base"
+ }
+}
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/moveit_controllers.yaml b/unilabos/device_mesh/devices/benyao_arm/config/moveit_controllers.yaml
new file mode 100644
index 0000000..70a1b55
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/moveit_controllers.yaml
@@ -0,0 +1,29 @@
+# MoveIt uses this configuration for controller management
+
+moveit_controller_manager: moveit_simple_controller_manager/MoveItSimpleControllerManager
+
+moveit_simple_controller_manager:
+ controller_names:
+ - arm_controller
+ - gripper_controller
+
+ arm_controller:
+ type: FollowJointTrajectory
+ action_ns: follow_joint_trajectory
+ default: true
+ joints:
+ - arm_base_joint
+ - arm_link_1_joint
+ - arm_link_2_joint
+ - arm_link_3_joint
+ - gripper_base_joint
+ gripper_controller:
+ type: FollowJointTrajectory
+ action_ns: follow_joint_trajectory
+ default: true
+ joints:
+ - gripper_right_joint
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/moveit_planners.yaml b/unilabos/device_mesh/devices/benyao_arm/config/moveit_planners.yaml
new file mode 100644
index 0000000..8560e1c
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/moveit_planners.yaml
@@ -0,0 +1,2 @@
+planner_configs:
+ - ompl_interface/OMPLPlanner
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/pilz_cartesian_limits.yaml b/unilabos/device_mesh/devices/benyao_arm/config/pilz_cartesian_limits.yaml
new file mode 100644
index 0000000..b2997ca
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/pilz_cartesian_limits.yaml
@@ -0,0 +1,6 @@
+# Limits for the Pilz planner
+cartesian_limits:
+ max_trans_vel: 1.0
+ max_trans_acc: 2.25
+ max_trans_dec: -5.0
+ max_rot_vel: 1.57
diff --git a/unilabos/device_mesh/devices/benyao_arm/config/ros2_controllers.yaml b/unilabos/device_mesh/devices/benyao_arm/config/ros2_controllers.yaml
new file mode 100644
index 0000000..9c68cbf
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/config/ros2_controllers.yaml
@@ -0,0 +1,39 @@
+# This config file is used by ros2_control
+controller_manager:
+ ros__parameters:
+ update_rate: 100 # Hz
+
+ arm_controller:
+ type: joint_trajectory_controller/JointTrajectoryController
+
+
+ gripper_controller:
+ type: joint_trajectory_controller/JointTrajectoryController
+
+
+ joint_state_broadcaster:
+ type: joint_state_broadcaster/JointStateBroadcaster
+
+arm_controller:
+ ros__parameters:
+ joints:
+ - arm_base_joint
+ - arm_link_1_joint
+ - arm_link_2_joint
+ - arm_link_3_joint
+ - gripper_base_joint
+ command_interfaces:
+ - position
+ state_interfaces:
+ - position
+ - velocity
+
+gripper_controller:
+ ros__parameters:
+ joints:
+ - gripper_right_joint
+ command_interfaces:
+ - position
+ state_interfaces:
+ - position
+ - velocity
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/joint_limit.yaml b/unilabos/device_mesh/devices/benyao_arm/joint_limit.yaml
new file mode 100644
index 0000000..b141262
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/joint_limit.yaml
@@ -0,0 +1,44 @@
+joint_limits:
+
+ arm_base_joint:
+ effort: 50
+ velocity: 1.0
+ lower: 0
+ upper: 1.5
+
+ arm_link_1_joint:
+ effort: 50
+ velocity: 1.0
+ lower: 0
+ upper: 0.6
+
+ arm_link_2_joint:
+ effort: 50
+ velocity: 1.0
+ lower: !degrees -95
+ upper: !degrees 95
+
+ arm_link_3_joint:
+ effort: 50
+ velocity: 1.0
+ lower: !degrees -195
+ upper: !degrees 195
+
+ gripper_base_joint:
+ effort: 50
+ velocity: 1.0
+ lower: !degrees -95
+ upper: !degrees 95
+
+
+ gripper_right_joint:
+ effort: 50
+ velocity: 1.0
+ lower: 0
+ upper: 0.03
+
+ gripper_left_joint:
+ effort: 50
+ velocity: 1.0
+ lower: 0
+ upper: 0.03
diff --git a/unilabos/device_mesh/devices/benyao_arm/macro_device.xacro b/unilabos/device_mesh/devices/benyao_arm/macro_device.xacro
new file mode 100644
index 0000000..137c916
--- /dev/null
+++ b/unilabos/device_mesh/devices/benyao_arm/macro_device.xacro
@@ -0,0 +1,293 @@
+<!-- [xacro markup lost in extraction: benyao_arm device description (links, joints, mesh references); no text content survived] -->
\ No newline at end of file
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/arm_base.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_base.STL
new file mode 100644
index 0000000..804e1c6
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_base.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_1.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_1.STL
new file mode 100644
index 0000000..dd1d7c1
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_1.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_2.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_2.STL
new file mode 100644
index 0000000..6042fa0
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_2.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_3.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_3.STL
new file mode 100644
index 0000000..e8510fb
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_link_3.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/arm_slideway.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_slideway.STL
new file mode 100644
index 0000000..65737ab
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/arm_slideway.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_base.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_base.STL
new file mode 100644
index 0000000..5de88d0
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_base.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_left.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_left.STL
new file mode 100644
index 0000000..0a5fd52
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_left.STL differ
diff --git a/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_right.STL b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_right.STL
new file mode 100644
index 0000000..0c5ac69
Binary files /dev/null and b/unilabos/device_mesh/devices/benyao_arm/meshes/gripper_right.STL differ
diff --git a/unilabos/registry/devices/opcua_example.yaml b/unilabos/registry/devices/opcua_example.yaml
new file mode 100644
index 0000000..fad9844
--- /dev/null
+++ b/unilabos/registry/devices/opcua_example.yaml
@@ -0,0 +1,12 @@
+opcua_example:
+ class:
+ action_value_mappings:
+
+ module: unilabos.device_comms.opcua_client.client:OpcUaClient
+ status_types:
+
+ type: python
+ description:
+ handles: []
+ icon: ''
+ init_param_schema: {}