diff --git a/.conda/recipe.yaml b/.conda/recipe.yaml index 2d70300..bbc3527 100644 --- a/.conda/recipe.yaml +++ b/.conda/recipe.yaml @@ -61,7 +61,7 @@ requirements: - uvicorn - gradio - flask - - websocket + - websockets - ipython - jupyter - jupyros diff --git a/README.md b/README.md index 142e7f9..93f2fcb 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ # Uni-Lab-OS + **English** | [中文](README_zh.md) [![GitHub Stars](https://img.shields.io/github/stars/dptech-corp/Uni-Lab-OS.svg)](https://github.com/dptech-corp/Uni-Lab-OS/stargazers) @@ -74,4 +75,4 @@ This project is licensed under GPL-3.0 - see the [LICENSE](LICENSE) file for det ## Contact Us -- GitHub Issues: [https://github.com/dptech-corp/Uni-Lab-OS/issues](https://github.com/dptech-corp/Uni-Lab-OS/issues) \ No newline at end of file +- GitHub Issues: [https://github.com/dptech-corp/Uni-Lab-OS/issues](https://github.com/dptech-corp/Uni-Lab-OS/issues) diff --git a/README_zh.md b/README_zh.md index 9ac8159..07b400d 100644 --- a/README_zh.md +++ b/README_zh.md @@ -5,6 +5,7 @@ # Uni-Lab-OS + [English](README.md) | **中文** [![GitHub Stars](https://img.shields.io/github/stars/dptech-corp/Uni-Lab-OS.svg)](https://github.com/dptech-corp/Uni-Lab-OS/stargazers) @@ -12,7 +13,7 @@ [![GitHub Issues](https://img.shields.io/github/issues/dptech-corp/Uni-Lab-OS.svg)](https://github.com/dptech-corp/Uni-Lab-OS/issues) [![GitHub License](https://img.shields.io/github/license/dptech-corp/Uni-Lab-OS.svg)](https://github.com/dptech-corp/Uni-Lab-OS/blob/main/LICENSE) -Uni-Lab-OS是一个用于实验室自动化的综合平台,旨在连接和控制各种实验设备,实现实验流程的自动化和标准化。 +Uni-Lab-OS 是一个用于实验室自动化的综合平台,旨在连接和控制各种实验设备,实现实验流程的自动化和标准化。 ## 🏆 比赛 @@ -34,7 +35,7 @@ Uni-Lab-OS是一个用于实验室自动化的综合平台,旨在连接和控 ## 快速开始 -1. 配置Conda环境 +1. 配置 Conda 环境 Uni-Lab-OS 建议使用 `mamba` 管理环境。根据您的操作系统选择适当的环境文件: @@ -43,7 +44,7 @@ Uni-Lab-OS 建议使用 `mamba` 管理环境。根据您的操作系统选择适 mamba create -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge ``` -2. 安装开发版Uni-Lab-OS: +2. 安装开发版 Uni-Lab-OS: ```bash # 克隆仓库 @@ -76,4 +77,4 @@ Uni-Lab-OS 使用预构建的 `unilabos_msgs` 进行系统通信。您可以在 ## 联系我们 -- GitHub Issues: [https://github.com/dptech-corp/Uni-Lab-OS/issues](https://github.com/dptech-corp/Uni-Lab-OS/issues) \ No newline at end of file +- GitHub Issues: [https://github.com/dptech-corp/Uni-Lab-OS/issues](https://github.com/dptech-corp/Uni-Lab-OS/issues) diff --git a/test/experiments/comprehensive_protocol/comprehensive_slim.json b/test/experiments/comprehensive_protocol/comprehensive_slim.json index 2eed369..f533d22 100644 --- a/test/experiments/comprehensive_protocol/comprehensive_slim.json +++ b/test/experiments/comprehensive_protocol/comprehensive_slim.json @@ -18,7 +18,6 @@ "config": { "protocol_type": [ "AddProtocol", - "TransferProtocol", "StartStirProtocol", "StopStirProtocol", "StirProtocol", diff --git a/unilabos-linux-64.yaml b/unilabos-linux-64.yaml index c84e045..2604b05 100644 --- a/unilabos-linux-64.yaml +++ b/unilabos-linux-64.yaml @@ -34,7 +34,7 @@ dependencies: - uvicorn - gradio - flask - - websocket + - websockets # Notebook - ipython - jupyter diff --git a/unilabos-osx-64.yaml b/unilabos-osx-64.yaml index ca9a96f..2d0c332 100644 --- a/unilabos-osx-64.yaml +++ b/unilabos-osx-64.yaml @@ -34,7 +34,7 @@ dependencies: - uvicorn - gradio - flask - - websocket + - websockets # Notebook - ipython - jupyter diff --git a/unilabos-osx-arm64.yaml b/unilabos-osx-arm64.yaml index 7f9675d..a4e8801 100644 --- a/unilabos-osx-arm64.yaml +++ b/unilabos-osx-arm64.yaml @@ -35,8 +35,7 @@ dependencies: - uvicorn - gradio - flask - - websocket - - paho-mqtt + - websockets # Notebook - ipython - jupyter diff --git a/unilabos-win64.yaml b/unilabos-win64.yaml index b2065a0..9eb55fd 100644 --- a/unilabos-win64.yaml +++ b/unilabos-win64.yaml @@ -34,7 +34,7 @@ dependencies: - uvicorn - gradio - flask - - websocket + - websockets # Notebook - ipython - jupyter diff --git a/unilabos/app/communication.py b/unilabos/app/communication.py new file mode 100644 index 0000000..60b9381 --- /dev/null +++ b/unilabos/app/communication.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python +# coding=utf-8 +""" +通信模块 + +提供MQTT和WebSocket的统一接口,支持通过配置选择通信协议。 +包含通信抽象层基类和通信客户端工厂。 +""" + +from abc import ABC, abstractmethod +from typing import Optional +from unilabos.config.config import BasicConfig +from unilabos.utils import logger + + +class BaseCommunicationClient(ABC): + """ + 通信客户端抽象基类 + + 定义了所有通信客户端(MQTT、WebSocket等)需要实现的接口。 + """ + + def __init__(self): + self.is_disabled = True + self.client_id = "" + + @abstractmethod + def start(self) -> None: + """ + 启动通信客户端连接 + """ + pass + + @abstractmethod + def stop(self) -> None: + """ + 停止通信客户端连接 + """ + pass + + @abstractmethod + def publish_device_status(self, device_status: dict, device_id: str, property_name: str) -> None: + """ + 发布设备状态信息 + + Args: + device_status: 设备状态字典 + device_id: 设备ID + property_name: 属性名称 + """ + pass + + @abstractmethod + def publish_job_status( + self, feedback_data: dict, job_id: str, status: str, return_info: Optional[str] = None + ) -> None: + """ + 发布作业状态信息 + + Args: + feedback_data: 反馈数据 + job_id: 作业ID + status: 作业状态 + return_info: 返回信息 + """ + pass + + @abstractmethod + def send_ping(self, ping_id: str, timestamp: float) -> None: + """ + 发送ping消息 + + Args: + ping_id: ping ID + timestamp: 时间戳 + """ + pass + + def setup_pong_subscription(self) -> None: + """ + 设置pong消息订阅(可选实现) + """ + pass + + @property + def is_connected(self) -> bool: + """ + 检查是否已连接 + + Returns: + 是否已连接 + """ + return not self.is_disabled + + +class CommunicationClientFactory: + """ + 通信客户端工厂类 + + 根据配置文件中的通信协议设置创建相应的客户端实例。 + """ + + _client_cache: Optional[BaseCommunicationClient] = None + + @classmethod + def create_client(cls, protocol: Optional[str] = None) -> BaseCommunicationClient: + """ + 创建通信客户端实例 + + Args: + protocol: 指定的协议类型,如果为None则使用配置文件中的设置 + + Returns: + 通信客户端实例 + + Raises: + ValueError: 当协议类型不支持时 + """ + if protocol is None: + protocol = BasicConfig.communication_protocol + + protocol = protocol.lower() + + if protocol == "mqtt": + return cls._create_mqtt_client() + elif protocol == "websocket": + return cls._create_websocket_client() + else: + logger.error(f"[CommunicationFactory] Unsupported protocol: {protocol}") + logger.warning(f"[CommunicationFactory] Falling back to MQTT") + return cls._create_mqtt_client() + + @classmethod + def get_client(cls, protocol: Optional[str] = None) -> BaseCommunicationClient: + """ + 获取通信客户端实例(单例模式) + + Args: + protocol: 指定的协议类型,如果为None则使用配置文件中的设置 + + Returns: + 通信客户端实例 + """ + if cls._client_cache is None: + cls._client_cache = cls.create_client(protocol) + logger.info(f"[CommunicationFactory] Created {type(cls._client_cache).__name__} client") + + return cls._client_cache + + @classmethod + def _create_mqtt_client(cls) -> BaseCommunicationClient: + """创建MQTT客户端""" + try: + from unilabos.app.mq import mqtt_client + return mqtt_client + except Exception as e: + logger.error(f"[CommunicationFactory] Failed to create MQTT client: {str(e)}") + raise + + @classmethod + def _create_websocket_client(cls) -> BaseCommunicationClient: + """创建WebSocket客户端""" + try: + from unilabos.app.ws_client import WebSocketClient + return WebSocketClient() + except Exception as e: + logger.error(f"[CommunicationFactory] Failed to create WebSocket client: {str(e)}") + logger.warning(f"[CommunicationFactory] Falling back to MQTT") + return cls._create_mqtt_client() + + @classmethod + def reset_client(cls): + """重置客户端缓存(用于测试或重新配置)""" + if cls._client_cache: + try: + cls._client_cache.stop() + except Exception as e: + logger.warning(f"[CommunicationFactory] Error stopping old client: {str(e)}") + + cls._client_cache = None + logger.info("[CommunicationFactory] Client cache reset") + + @classmethod + def get_supported_protocols(cls) -> list[str]: + """ + 获取支持的协议列表 + + Returns: + 支持的协议列表 + """ + return ["mqtt", "websocket"] + + +def get_communication_client(protocol: Optional[str] = None) -> BaseCommunicationClient: + """ + 获取通信客户端实例的便捷函数 + + Args: + protocol: 指定的协议类型,如果为None则使用配置文件中的设置 + + Returns: + 通信客户端实例 + """ + return CommunicationClientFactory.get_client(protocol) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index e565f86..3d7761e 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -10,7 +10,6 @@ from copy import deepcopy import yaml -from unilabos.resources.graphio import modify_to_backend_format # 首先添加项目根目录到路径 current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -20,6 +19,7 @@ if unilabos_dir not in sys.path: from unilabos.config.config import load_config, BasicConfig from unilabos.utils.banner_print import print_status, print_unilab_banner +from unilabos.resources.graphio import modify_to_backend_format def load_config_from_file(config_path, override_labid=None): @@ -134,6 +134,23 @@ def parse_args(): default="", help="实验室唯一ID,也可通过环境变量 UNILABOS_MQCONFIG_LABID 设置或传入--config设置", ) + parser.add_argument( + "--ak", + type=str, + default="", + help="实验室请求的ak", + ) + parser.add_argument( + "--sk", + type=str, + default="", + help="实验室请求的sk", + ) + parser.add_argument( + "--websocket", + action="store_true", + help="使用websocket而非mqtt作为通信协议", + ) parser.add_argument( "--skip_env_check", action="store_true", @@ -167,7 +184,7 @@ def main(): else: working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) if args_dict.get("working_dir"): - working_dir = args_dict.get("working_dir") + working_dir = args_dict.get("working_dir", "") if config_path and not os.path.exists(config_path): config_path = os.path.join(working_dir, "local_config.py") if not os.path.exists(config_path): @@ -203,6 +220,7 @@ def main(): if args_dict["use_remote_resource"]: print_status("使用远程资源启动", "info") from unilabos.app.web import http_client + res = http_client.resource_get("host_node", False) if str(res.get("code", 0)) == "0" and len(res.get("data", [])) > 0: print_status("远程资源已存在,使用云端物料!", "info") @@ -211,10 +229,13 @@ def main(): print_status("远程资源不存在,本地将进行首次上报!", "info") # 设置BasicConfig参数 + BasicConfig.ak = args_dict.get("ak", "") + BasicConfig.sk = args_dict.get("sk", "") BasicConfig.working_dir = working_dir BasicConfig.is_host_mode = not args_dict.get("without_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False) + BasicConfig.communication_protocol = "websocket" if args_dict.get("websocket", False) else "mqtt" machine_name = os.popen("hostname").read().strip() machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) BasicConfig.machine_name = machine_name @@ -227,7 +248,7 @@ def main(): dict_to_nested_dict, initialize_resources, ) - from unilabos.app.mq import mqtt_client + from unilabos.app.communication import get_communication_client from unilabos.registry.registry import build_registry from unilabos.app.backend import start_backend from unilabos.app.web import http_client @@ -275,19 +296,22 @@ def main(): args_dict["bridges"] = [] + # 获取通信客户端(根据配置选择MQTT或WebSocket) + comm_client = get_communication_client() + if "mqtt" in args_dict["app_bridges"]: - args_dict["bridges"].append(mqtt_client) + args_dict["bridges"].append(comm_client) if "fastapi" in args_dict["app_bridges"]: args_dict["bridges"].append(http_client) if "mqtt" in args_dict["app_bridges"]: def _exit(signum, frame): - mqtt_client.stop() + comm_client.stop() sys.exit(0) signal.signal(signal.SIGINT, _exit) signal.signal(signal.SIGTERM, _exit) - mqtt_client.start() + comm_client.start() args_dict["resources_mesh_config"] = {} args_dict["resources_edge_config"] = resource_edge_info # web visiualize 2D diff --git a/unilabos/app/model.py b/unilabos/app/model.py index a5b8c78..a7c199c 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -50,11 +50,16 @@ class Resp(BaseModel): class JobAddReq(BaseModel): device_id: str = Field(examples=["Gripper"], description="device id") - data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}]) + action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="") + action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="") + action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="") + task_id: str = Field(examples=["task_id"], description="task uuid") job_id: str = Field(examples=["job_id"], description="goal uuid") node_id: str = Field(examples=["node_id"], description="node uuid") server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info") + data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default={}) + class JobStepFinishReq(BaseModel): token: str = Field(examples=["030944"], description="token") diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 1d3f969..65d0ab4 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -15,17 +15,20 @@ import os from unilabos.config.config import MQConfig from unilabos.app.controler import job_add from unilabos.app.model import JobAddReq +from unilabos.app.communication import BaseCommunicationClient from unilabos.utils import logger from unilabos.utils.type_check import TypeEncoder from paho.mqtt.enums import CallbackAPIVersion -class MQTTClient: +class MQTTClient(BaseCommunicationClient): mqtt_disable = True def __init__(self): + super().__init__() self.mqtt_disable = not MQConfig.lab_id + self.is_disabled = self.mqtt_disable # 更新父类属性 self.client_id = f"{MQConfig.group_id}@@@{MQConfig.lab_id}{uuid.uuid4()}" logger.info("[MQTT] Client_id: " + self.client_id) self.client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id=self.client_id, protocol=mqtt.MQTTv5) @@ -208,11 +211,12 @@ class MQTTClient: self.client.subscribe(pong_topic, 0) logger.debug(f"Subscribed to pong topic: {pong_topic}") - def handle_pong(self, pong_data: dict): - """处理pong响应(这个方法会在收到pong消息时被调用)""" - logger.debug(f"Pong received: {pong_data}") - # 这里会被HostNode的ping-pong处理逻辑调用 - pass + @property + def is_connected(self) -> bool: + """检查MQTT是否已连接""" + if self.is_disabled: + return False + return hasattr(self.client, "is_connected") and self.client.is_connected() mqtt_client = MQTTClient() diff --git a/unilabos/app/register.py b/unilabos/app/register.py index 2d61f82..a96ff16 100644 --- a/unilabos/app/register.py +++ b/unilabos/app/register.py @@ -1,44 +1,70 @@ import argparse +import json import time +from unilabos.config.config import BasicConfig from unilabos.registry.registry import build_registry from unilabos.app.main import load_config_from_file from unilabos.utils.log import logger +from unilabos.utils.type_check import TypeEncoder -def register_devices_and_resources(mqtt_client, lab_registry): +def register_devices_and_resources(comm_client, lab_registry): """ - 注册设备和资源到 MQTT + 注册设备和资源到通信服务器(MQTT/WebSocket) """ - logger.info("[UniLab Register] 开始注册设备和资源...") - - # 注册设备信息 - for device_info in lab_registry.obtain_registry_device_info(): - mqtt_client.publish_registry(device_info["id"], device_info, False) - logger.debug(f"[UniLab Register] 注册设备: {device_info['id']}") - - # # 注册资源信息 - # for resource_info in lab_registry.obtain_registry_resource_info(): - # mqtt_client.publish_registry(resource_info["id"], resource_info, False) - # logger.debug(f"[UniLab Register] 注册资源: {resource_info['id']}") # 注册资源信息 - 使用HTTP方式 from unilabos.app.web.client import http_client - resources_to_register = {} - for resource_info in lab_registry.obtain_registry_resource_info(): - resources_to_register[resource_info["id"]] = resource_info - logger.debug(f"[UniLab Register] 准备注册资源: {resource_info['id']}") + logger.info("[UniLab Register] 开始注册设备和资源...") + if BasicConfig.auth_secret(): + # 注册设备信息 + devices_to_register = {} + for device_info in lab_registry.obtain_registry_device_info(): + devices_to_register[device_info["id"]] = json.loads( + json.dumps(device_info, ensure_ascii=False, cls=TypeEncoder) + ) + logger.debug(f"[UniLab Register] 收集设备: {device_info['id']}") + resources_to_register = {} + for resource_info in lab_registry.obtain_registry_resource_info(): + resources_to_register[resource_info["id"]] = resource_info + logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}") + print( + "[UniLab Register] 设备注册", + http_client.resource_registry({"resources": list(devices_to_register.values())}).text, + ) + print( + "[UniLab Register] 资源注册", + http_client.resource_registry({"resources": list(resources_to_register.values())}).text, + ) + else: + # 注册设备信息 + for device_info in lab_registry.obtain_registry_device_info(): + comm_client.publish_registry(device_info["id"], device_info, False) + logger.debug(f"[UniLab Register] 注册设备: {device_info['id']}") - if resources_to_register: - start_time = time.time() - response = http_client.resource_registry(resources_to_register) - cost_time = time.time() - start_time - if response.status_code in [200, 201]: - logger.info(f"[UniLab Register] 成功通过HTTP注册 {len(resources_to_register)} 个资源 {cost_time}ms") - else: - logger.error(f"[UniLab Register] HTTP注册资源失败: {response.status_code}, {response.text} {cost_time}ms") + # # 注册资源信息 + # for resource_info in lab_registry.obtain_registry_resource_info(): + # comm_client.publish_registry(resource_info["id"], resource_info, False) + # logger.debug(f"[UniLab Register] 注册资源: {resource_info['id']}") + + resources_to_register = {} + for resource_info in lab_registry.obtain_registry_resource_info(): + resources_to_register[resource_info["id"]] = resource_info + logger.debug(f"[UniLab Register] 准备注册资源: {resource_info['id']}") + + if resources_to_register: + start_time = time.time() + response = http_client.resource_registry(resources_to_register) + cost_time = time.time() - start_time + if response.status_code in [200, 201]: + logger.info(f"[UniLab Register] 成功通过HTTP注册 {len(resources_to_register)} 个资源 {cost_time}ms") + else: + logger.error( + f"[UniLab Register] HTTP注册资源失败: {response.status_code}, {response.text} {cost_time}ms" + ) logger.info("[UniLab Register] 设备和资源注册完成.") @@ -60,6 +86,18 @@ def main(): default=None, help="配置文件路径,支持.py格式的Python配置文件", ) + parser.add_argument( + "--ak", + type=str, + default="", + help="实验室请求的ak", + ) + parser.add_argument( + "--sk", + type=str, + default="", + help="实验室请求的sk", + ) parser.add_argument( "--complete_registry", action="store_true", @@ -68,17 +106,20 @@ def main(): ) args = parser.parse_args() load_config_from_file(args.config) + BasicConfig.ak = args.ak + BasicConfig.sk = args.sk # 构建注册表 build_registry(args.registry, args.complete_registry, True) - from unilabos.app.mq import mqtt_client + from unilabos.app.communication import get_communication_client - # 连接mqtt - mqtt_client.start() + # 获取通信客户端并启动连接 + comm_client = get_communication_client() + comm_client.start() from unilabos.registry.registry import lab_registry # 注册设备和资源 - register_devices_and_resources(mqtt_client, lab_registry) + register_devices_and_resources(comm_client, lab_registry) if __name__ == "__main__": diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 7c35c15..7a50bf4 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -15,6 +15,7 @@ from unilabos.utils import logger class HTTPClient: """HTTP客户端,用于与远程服务器通信""" + backend_go = False # 是否使用Go后端 def __init__(self, remote_addr: Optional[str] = None, auth: Optional[str] = None) -> None: """ @@ -28,7 +29,13 @@ class HTTPClient: if auth is not None: self.auth = auth else: - self.auth = MQConfig.lab_id + auth_secret = BasicConfig.auth_secret() + if auth_secret: + self.auth = auth_secret + self.backend_go = True + info(f"正在使用ak sk作为授权信息 {auth_secret}") + else: + self.auth = MQConfig.lab_id info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}") def resource_edge_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response: @@ -43,13 +50,18 @@ class HTTPClient: """ database_param = 1 if database_process_later else 0 response = requests.post( - f"{self.remote_addr}/lab/resource/edge/batch_create/?database_process_later={database_param}", - json=resources, - headers={"Authorization": f"lab {self.auth}"}, + f"{self.remote_addr}/lab/resource/edge/batch_create/?database_process_later={database_param}" + if not self.backend_go else f"{self.remote_addr}/lab/material/edge", + json={ + "edges": resources, + } if self.backend_go else resources, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=100, ) if response.status_code != 200 and response.status_code != 201: logger.error(f"添加物料关系失败: {response.status_code}, {response.text}") + elif self.backend_go: + logger.info(f"添加物料关系 {response.text}") return response def resource_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response: @@ -63,13 +75,15 @@ class HTTPClient: Response: API响应对象 """ response = requests.post( - f"{self.remote_addr}/lab/resource/?database_process_later={1 if database_process_later else 0}", - json=resources, - headers={"Authorization": f"lab {self.auth}"}, + f"{self.remote_addr}/lab/resource/?database_process_later={1 if database_process_later else 0}" if not self.backend_go else f"{self.remote_addr}/lab/material", + json=resources if not self.backend_go else {"nodes": resources}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=100, ) if response.status_code != 200: logger.error(f"添加物料失败: {response.text}") + elif self.backend_go: + logger.info(f"添加物料 {response.text}") return response def resource_get(self, id: str, with_children: bool = False) -> Dict[str, Any]: @@ -84,9 +98,9 @@ class HTTPClient: Dict: 返回的资源数据 """ response = requests.get( - f"{self.remote_addr}/lab/resource/?edge_format=1", + f"{self.remote_addr}/lab/resource/?edge_format=1" if not self.backend_go else f"{self.remote_addr}/lab/material", params={"id": id, "with_children": with_children}, - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=20, ) return response.json() @@ -104,7 +118,7 @@ class HTTPClient: response = requests.delete( f"{self.remote_addr}/lab/resource/batch_delete/", params={"id": id}, - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=20, ) return response @@ -122,7 +136,7 @@ class HTTPClient: response = requests.patch( f"{self.remote_addr}/lab/resource/batch_update/?edge_format=1", json=resources, - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=100, ) return response @@ -146,25 +160,25 @@ class HTTPClient: response = requests.post( f"{self.remote_addr}/api/account/file_upload/{scene}", files=files, - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=30, # 上传文件可能需要更长的超时时间 ) return response - def resource_registry(self, registry_data: Dict[str, Any]) -> requests.Response: + def resource_registry(self, registry_data: Dict[str, Any] | List[Dict[str, Any]]) -> requests.Response: """ 注册资源到服务器 Args: - registry_data: 注册表数据,格式为 {resource_id: resource_info} + registry_data: 注册表数据,格式为 {resource_id: resource_info} / [{resource_info}] Returns: Response: API响应对象 """ response = requests.post( - f"{self.remote_addr}/lab/registry/", + f"{self.remote_addr}/lab/registry/" if not self.backend_go else f"{self.remote_addr}/lab/resource", json=registry_data, - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=30, ) if response.status_code not in [200, 201]: @@ -183,7 +197,7 @@ class HTTPClient: """ response = requests.get( f"{self.remote_addr}/lab/resource/graph_info/", - headers={"Authorization": f"lab {self.auth}"}, + headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=(3, 30), ) if response.status_code != 200: diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py new file mode 100644 index 0000000..69c70b1 --- /dev/null +++ b/unilabos/app/ws_client.py @@ -0,0 +1,898 @@ +#!/usr/bin/env python +# coding=utf-8 +""" +WebSocket通信客户端和任务调度器 + +基于WebSocket协议的通信客户端实现,继承自BaseCommunicationClient。 +包含WebSocketClient(连接管理)和TaskScheduler(任务调度)两个类。 +""" + +import json +import logging +import time +import uuid +import threading +import asyncio +import traceback +import websockets +import ssl as ssl_module +from dataclasses import dataclass +from typing import Optional, Dict, Any +from urllib.parse import urlparse +from unilabos.app.model import JobAddReq +from unilabos.ros.nodes.presets.host_node import HostNode +from unilabos.utils.type_check import serialize_result_info +from unilabos.app.communication import BaseCommunicationClient +from unilabos.config.config import WSConfig, HTTPConfig, BasicConfig +from unilabos.utils import logger + + +@dataclass +class QueueItem: + """队列项数据结构""" + + task_type: str # "query_action_status" 或 "job_call_back_status" + device_id: str + action_name: str + task_id: str + job_id: str + device_action_key: str + next_run_time: float # 下次执行时间戳 + retry_count: int = 0 # 重试次数 + + +class TaskScheduler: + """ + 任务调度器类 + + 负责任务队列管理、状态跟踪、业务逻辑处理等功能。 + """ + + def __init__(self, message_sender: "WebSocketClient"): + """初始化任务调度器""" + self.message_sender = message_sender + + # 队列管理 + self.action_queue = [] # 任务队列 + self.action_queue_lock = threading.Lock() # 队列锁 + + # 任务状态跟踪 + self.active_jobs = {} # job_id -> 任务信息 + self.cancel_events = {} # job_id -> asyncio.Event for cancellation + + # 立即执行标记字典 - device_id+action_name -> timestamp + self.immediate_execution_flags = {} # 存储需要立即执行的设备动作组合 + self.immediate_execution_lock = threading.Lock() # 立即执行标记锁 + + # 队列处理器 + self.queue_processor_thread = None + self.queue_running = False + + # 队列处理器相关方法 + def start(self) -> None: + """启动任务调度器""" + if self.queue_running: + logger.warning("[TaskScheduler] Already running") + return + + self.queue_running = True + self.queue_processor_thread = threading.Thread( + target=self._run_queue_processor, daemon=True, name="TaskScheduler" + ) + self.queue_processor_thread.start() + + def stop(self) -> None: + """停止任务调度器""" + self.queue_running = False + if self.queue_processor_thread and self.queue_processor_thread.is_alive(): + self.queue_processor_thread.join(timeout=5) + logger.info("[TaskScheduler] Stopped") + + def _run_queue_processor(self): + """在独立线程中运行队列处理器""" + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + loop.run_until_complete(self._action_queue_processor()) + except Exception as e: + logger.error(f"[TaskScheduler] Queue processor thread error: {str(e)}") + finally: + if loop: + loop.close() + + async def _action_queue_processor(self) -> None: + """队列处理器 - 从队列头部取出任务处理,保持顺序,使用list避免队尾排队问题""" + logger.info("[TaskScheduler] Action queue processor started") + + try: + while self.queue_running: + try: + current_time = time.time() + items_to_process = [] + items_to_requeue = [] + + # 使用锁安全地复制队列内容 + with self.action_queue_lock: + if not self.action_queue: + # 队列为空,等待一段时间 + pass + else: + # 复制队列内容以避免并发修改问题 + items_to_process = self.action_queue.copy() + self.action_queue.clear() + + if not items_to_process: + await asyncio.sleep(0.2) # 队列为空时等待 + continue + + with self.immediate_execution_lock: + expired_keys = [k for k, v in self.immediate_execution_flags.items() if current_time > v] + for k in expired_keys: + del self.immediate_execution_flags[k] + immediate_execution = self.immediate_execution_flags.copy() + # 处理每个任务 + for item in items_to_process: + try: + # 检查是否到了执行时间,是我们本地的执行时间,按顺序填入 + if current_time < item.next_run_time and item.device_action_key not in immediate_execution: + # 还没到执行时间,保留在队列中(保持原有顺序) + items_to_requeue.append(item) + continue + + # 执行相应的任务 + should_continue = False + if item.task_type == "query_action_status": + should_continue = await self._process_query_status_item(item) + elif item.task_type == "job_call_back_status": + should_continue = await self._process_job_callback_item(item) + else: + logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}") + continue + + # 如果需要继续,放入重新排队列表 + if should_continue: + item.next_run_time = current_time + 10 # 10秒后再次执行 + item.retry_count += 1 + items_to_requeue.append(item) + logger.trace( # type: ignore + f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} " + f"for {item.device_action_key}" + ) + else: + logger.debug( + f"[TaskScheduler] Completed {item.job_id} {item.task_type} " + f"for {item.device_action_key}" + ) + + except Exception as e: + logger.error(f"[TaskScheduler] Error processing item {item.task_type}: {str(e)}") + + # 将需要重新排队的任务放回队列开头(保持原有顺序,确保优先于新任务执行) + if items_to_requeue and self.action_queue is not None: + with self.action_queue_lock: + self.action_queue = items_to_requeue + self.action_queue + + await asyncio.sleep(0.1) # 短暂等待避免过度占用CPU + + except Exception as e: + logger.error(f"[TaskScheduler] Error in queue processor: {str(e)}") + await asyncio.sleep(1) # 错误后稍等再继续 + + except asyncio.CancelledError: + logger.info("[TaskScheduler] Action queue processor cancelled") + except Exception as e: + logger.error(f"[TaskScheduler] Fatal error in queue processor: {str(e)}") + finally: + logger.info("[TaskScheduler] Action queue processor stopped") + + # 队列处理方法 + async def _process_query_status_item(self, item: QueueItem) -> bool: + """处理query_action_status类型的队列项,返回True表示需要继续,False表示可以停止""" + try: + # 检查设备状态 + host_node = HostNode.get_instance(0) + if not host_node: + logger.error("[TaskScheduler] HostNode instance not available in queue processor") + return False + + action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) + free = not bool(action_jobs) + + # 发送状态报告 + if free: + # 设备空闲,发送最终状态并停止 + # 下面要增加和handle_query_state相同的逻辑 + host_node._device_action_status[item.device_action_key].job_ids[item.job_id] = time.time() + await self._publish_device_action_state( + item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0 + ) + return False # 停止继续监控 + else: + # 设备忙碌,发送状态并继续监控 + await self._publish_device_action_state( + item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10 + ) + return True # 继续监控 + + except Exception as e: + logger.error(f"[TaskScheduler] Error processing query status item: {str(e)}") + return False # 出错则停止 + + async def _process_job_callback_item(self, item: QueueItem) -> bool: + """处理job_call_back_status类型的队列项,返回True表示需要继续,False表示可以停止""" + try: + # 检查任务是否还在活跃列表中 + if item.job_id not in self.active_jobs: + logger.debug(f"[TaskScheduler] Job {item.job_id} no longer active") + return False + + # 检查是否收到取消信号 + if item.job_id in self.cancel_events and self.cancel_events[item.job_id].is_set(): + logger.info(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event") + return False + + # 检查设备状态 + host_node = HostNode.get_instance(0) + if not host_node: + logger.error( + f"[TaskScheduler] HostNode instance not available in job callback queue for job_id: {item.job_id}" + ) + return False + + action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) + free = not bool(action_jobs) + + # 发送job_call_back_status状态 + await self._publish_device_action_state( + item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10 + ) + + # 如果任务完成,停止监控 + if free: + return False + else: + return True # 继续监控 + + except Exception as e: + logger.error(f"[TaskScheduler] Error processing job callback item for job_id {item.job_id}: {str(e)}") + return False # 出错则停止 + + # 消息发送方法 + async def _publish_device_action_state( + self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int + ) -> None: + """发布设备动作状态""" + message = { + "action": "report_action_state", + "data": { + "type": typ, + "device_id": device_id, + "action_name": action_name, + "task_id": task_id, + "job_id": job_id, + "free": free, + "need_more": need_more, + }, + } + await self.message_sender.send_message(message) + + # 业务逻辑处理方法 + async def handle_query_state(self, data: Dict[str, str]) -> None: + """处理query_action_state消息""" + device_id = data.get("device_id", "") + if not device_id: + logger.error("[TaskScheduler] query_action_state missing device_id") + return + action_name = data.get("action_name", "") + if not action_name: + logger.error("[TaskScheduler] query_action_state missing action_name") + return + task_id = data.get("task_id", "") + if not task_id: + logger.error("[TaskScheduler] query_action_state missing task_id") + return + job_id = data.get("job_id", "") + if not job_id: + logger.error("[TaskScheduler] query_action_state missing job_id") + return + + device_action_key = f"/devices/{device_id}/{action_name}" + host_node = HostNode.get_instance(0) + if not host_node: + logger.error("[TaskScheduler] HostNode instance not available") + return + + action_jobs = len(host_node._device_action_status[device_action_key].job_ids) + free = not bool(action_jobs) + + # 如果设备空闲,立即响应free状态 + if free: + await self._publish_device_action_state( + device_id, action_name, task_id, job_id, "query_action_status", True, 0 + ) + logger.debug(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately") + host_node = HostNode.get_instance(0) + if not host_node: + logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}") + return + host_node._device_action_status[device_action_key].job_ids[job_id] = time.time() + return + + # 设备忙碌时,检查是否已有相同的轮询任务 + if self.action_queue is not None: + with self.action_queue_lock: + # 检查是否已存在相同job_id和task_id的轮询任务 + for existing_item in self.action_queue: + if ( + existing_item.task_type == "query_action_status" + and existing_item.job_id == job_id + and existing_item.task_id == task_id + and existing_item.device_action_key == device_action_key + ): + logger.error( + f"[TaskScheduler] Duplicate query_action_state ignored: " + f"job_id={job_id}, task_id={task_id}, server error" + ) + return + + # 没有重复,加入轮询队列 + queue_item = QueueItem( + task_type="query_action_status", + device_id=device_id, + action_name=action_name, + task_id=task_id, + job_id=job_id, + device_action_key=device_action_key, + next_run_time=time.time() + 10, # 10秒后执行 + ) + self.action_queue.append(queue_item) + logger.debug( + f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, " + f"added to polling queue {action_jobs}" + ) + + # 立即发送busy状态 + await self._publish_device_action_state( + device_id, action_name, task_id, job_id, "query_action_status", False, 10 + ) + else: + logger.warning("[TaskScheduler] Action queue not available") + + async def handle_job_start(self, data: Dict[str, Any]): + """处理作业启动消息""" + try: + req = JobAddReq(**data) + device_action_key = f"/devices/{req.device_id}/{req.action}" + + logger.info( + f"[TaskScheduler] Starting job with job_id: {req.job_id}, " + f"device: {req.device_id}, action: {req.action}" + ) + + # 添加到活跃任务 + self.active_jobs[req.job_id] = { + "device_id": req.device_id, + "action_name": req.action, + "task_id": data.get("task_id", ""), + "start_time": time.time(), + "device_action_key": device_action_key, + "callback_started": False, # 标记callback是否已启动 + } + + # 创建取消事件,todo:要移动到query_state中 + self.cancel_events[req.job_id] = asyncio.Event() + + try: + # 启动callback定时发送 + await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key) + + # 创建兼容HostNode的QueueItem对象 + job_queue_item = QueueItem( + task_type="job_call_back_status", + device_id=req.device_id, + action_name=req.action, + task_id=req.task_id, + job_id=req.job_id, + device_action_key=device_action_key, + next_run_time=time.time(), + ) + host_node = HostNode.get_instance(0) + if not host_node: + logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}") + return + host_node.send_goal( + job_queue_item, + action_type=req.action_type, + action_kwargs=req.action_args, + server_info=req.server_info, + ) + except Exception as e: + logger.error(f"[TaskScheduler] Exception during job start for job_id {req.job_id}: {str(e)}") + traceback.print_exc() + # 异常结束,先停止callback,然后发送失败状态 + await self._stop_job_callback( + req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {}) + ) + + host_node = HostNode.get_instance(0) + if host_node: + host_node._device_action_status[device_action_key].job_ids.pop(req.job_id, None) + logger.warning(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}") + except Exception as e: + logger.error(f"[TaskScheduler] Error handling job start: {str(e)}") + + async def handle_cancel_action(self, data: Dict[str, Any]) -> None: + """处理取消动作请求""" + task_id = data.get("task_id") + job_id = data.get("job_id") + + logger.debug(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}") + + if not task_id and not job_id: + logger.error("[TaskScheduler] cancel_action missing both task_id and job_id") + return + + # 通过job_id取消 + if job_id: + logger.info(f"[TaskScheduler] Cancelling job by job_id: {job_id}") + # 设置取消事件 + if job_id in self.cancel_events: + self.cancel_events[job_id].set() + logger.debug(f"[TaskScheduler] Set cancel event for job_id: {job_id}") + else: + logger.warning(f"[TaskScheduler] Cancel event not found for job_id: {job_id}") + + # 停止job callback并发送取消状态 + if job_id in self.active_jobs: + logger.debug(f"[TaskScheduler] Found active job for cancellation: {job_id}") + # 调用HostNode的cancel_goal + host_node = HostNode.get_instance(0) + if host_node: + host_node.cancel_goal(job_id) + logger.info(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}") + else: + logger.error(f"[TaskScheduler] HostNode not available for cancel goal: {job_id}") + + # 停止callback并发送取消状态 + await self._stop_job_callback(job_id, "cancelled", "Job was cancelled by user request") + logger.info(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}") + else: + logger.warning(f"[TaskScheduler] Job not found in active jobs for cancellation: {job_id}") + + # 通过task_id取消(需要查找对应的job_id) + if task_id and not job_id: + logger.debug(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}") + jobs_to_cancel = [] + for jid, job_info in self.active_jobs.items(): + if job_info.get("task_id") == task_id: + jobs_to_cancel.append(jid) + + logger.debug( + f"[TaskScheduler] Found {len(jobs_to_cancel)} jobs to cancel for task_id {task_id}: {jobs_to_cancel}" + ) + + for jid in jobs_to_cancel: + logger.debug(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}") + # 递归调用自身来取消每个job + await self.handle_cancel_action({"job_id": jid}) + + logger.debug(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}") + + # job管理方法 + async def _start_job_callback( + self, job_id: str, device_id: str, action_name: str, task_id: str, device_action_key: str + ) -> None: + """启动job的callback定时发送""" + if job_id not in self.active_jobs: + logger.debug(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}") + return + + # 检查是否已经启动过callback + if self.active_jobs[job_id].get("callback_started", False): + logger.warning(f"[TaskScheduler] Job callback already started for job_id: {job_id}") + return + + # 标记callback已启动 + self.active_jobs[job_id]["callback_started"] = True + + # 将job_call_back_status任务放入队列 + queue_item = QueueItem( + task_type="job_call_back_status", + device_id=device_id, + action_name=action_name, + task_id=task_id, + job_id=job_id, + device_action_key=device_action_key, + next_run_time=time.time() + 10, # 10秒后开始报送 + ) + if self.action_queue is not None: + with self.action_queue_lock: + self.action_queue.append(queue_item) + else: + logger.debug(f"[TaskScheduler] Action queue not available for job callback: {job_id}") + + async def _stop_job_callback(self, job_id: str, final_status: str, return_info: Optional[str] = None) -> None: + """停止job的callback定时发送并发送最终结果""" + logger.info(f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}") + if job_id not in self.active_jobs: + logger.debug(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback") + return + + job_info = self.active_jobs[job_id] + device_id = job_info["device_id"] + action_name = job_info["action_name"] + task_id = job_info["task_id"] + device_action_key = job_info["device_action_key"] + + logger.debug( + f"[TaskScheduler] Job {job_id} details - device: {device_id}, action: {action_name}, task: {task_id}" + ) + + # 移除活跃任务和取消事件(这会让队列处理器自动停止callback) + self.active_jobs.pop(job_id, None) + self.cancel_events.pop(job_id, None) + logger.debug(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events") + + # 发送最终的callback状态 + await self._publish_device_action_state( + device_id, action_name, task_id, job_id, "job_call_back_status", True, 0 + ) + logger.debug(f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}") + + # 外部接口方法 + def publish_job_status( + self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None + ) -> None: + """发布作业状态,拦截最终结果(给HostNode调用的接口)""" + if not self.message_sender.is_connected(): + logger.debug(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}") + return + + # 拦截最终结果状态 + if status in ["success", "failed"]: + host_node = HostNode.get_instance(0) + if host_node: + host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id) + logger.info(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}") + # 给其他同名action至少执行一次的机会 + with self.immediate_execution_lock: + self.immediate_execution_flags[item.device_action_key] = time.time() + 3 + # 如果是最终状态,通过_stop_job_callback处理 + if self.message_sender.event_loop: + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop + ).result() + # 执行结果信息上传 + message = { + "action": "job_status", + "data": { + "job_id": item.job_id, + "task_id": item.task_id, + "device_id": item.device_id, + "action_name": item.action_name, + "status": status, + "feedback_data": feedback_data, + "return_info": return_info, + "timestamp": time.time(), + }, + } + try: + loop = asyncio.get_event_loop() + loop.create_task(self.message_sender.send_message(message)) + except RuntimeError: + asyncio.run(self.message_sender.send_message(message)) + + logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore + + def cancel_goal(self, job_id: str) -> None: + """取消指定的任务(给外部调用的接口)""" + logger.debug(f"[TaskScheduler] External cancel request for job_id: {job_id}") + if job_id in self.cancel_events: + logger.debug(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation") + try: + loop = asyncio.get_event_loop() + loop.create_task(self.handle_cancel_action({"job_id": job_id})) + logger.debug(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}") + except RuntimeError: + asyncio.run(self.handle_cancel_action({"job_id": job_id})) + logger.debug(f"[TaskScheduler] Executed cancel action for job_id: {job_id}") + logger.debug(f"[TaskScheduler] Initiated cancel for job_id: {job_id}") + else: + logger.debug(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation") + + +class WebSocketClient(BaseCommunicationClient): + """ + WebSocket通信客户端类 + + 专注于WebSocket连接管理和消息传输。 + """ + + def __init__(self): + super().__init__() + self.is_disabled = False + self.client_id = f"{uuid.uuid4()}" + + # WebSocket连接相关 + self.websocket = None + self.connection_loop = None + self.event_loop = None + self.connection_thread = None + self.is_running = False + self.connected = False + + # 消息处理 + self.message_queue = asyncio.Queue() if not self.is_disabled else None + self.reconnect_count = 0 + + # 任务调度器 + self.task_scheduler = None + + # 构建WebSocket URL + self._build_websocket_url() + + logger.info(f"[WebSocket] Client_id: {self.client_id}") + + # 初始化方法 + def _initialize_task_scheduler(self): + """初始化任务调度器""" + if not self.task_scheduler: + self.task_scheduler = TaskScheduler(self) + self.task_scheduler.start() + logger.info("[WebSocket] Task scheduler initialized") + + def _build_websocket_url(self): + """构建WebSocket连接URL""" + if not HTTPConfig.remote_addr: + self.websocket_url = None + return + + # 解析服务器URL + parsed = urlparse(HTTPConfig.remote_addr) + + # 根据SSL配置选择协议 + if parsed.scheme == "https": + scheme = "wss" + else: + scheme = "ws" + if ":" in parsed.netloc and parsed.port is not None: + self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule" + else: + self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" + logger.debug(f"[WebSocket] URL: {self.websocket_url}") + + # 连接管理方法 + def start(self) -> None: + """启动WebSocket连接和任务调度器""" + if self.is_disabled: + logger.warning("[WebSocket] WebSocket is disabled, skipping connection.") + return + + if not self.websocket_url: + logger.error("[WebSocket] WebSocket URL not configured") + return + + logger.info(f"[WebSocket] Starting connection to {self.websocket_url}") + + # 初始化任务调度器 + self._initialize_task_scheduler() + + self.is_running = True + + # 在单独线程中运行WebSocket连接 + self.connection_thread = threading.Thread(target=self._run_connection, daemon=True, name="WebSocketConnection") + self.connection_thread.start() + + def stop(self) -> None: + """停止WebSocket连接和任务调度器""" + if self.is_disabled: + return + + logger.info("[WebSocket] Stopping connection") + self.is_running = False + self.connected = False + + # 停止任务调度器 + if self.task_scheduler: + self.task_scheduler.stop() + + if self.event_loop and self.event_loop.is_running(): + asyncio.run_coroutine_threadsafe(self._close_connection(), self.event_loop) + + if self.connection_thread and self.connection_thread.is_alive(): + self.connection_thread.join(timeout=5) + + def _run_connection(self): + """在独立线程中运行WebSocket连接""" + try: + # 创建新的事件循环 + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) + + # 运行连接逻辑 + self.event_loop.run_until_complete(self._connection_handler()) + except Exception as e: + logger.error(f"[WebSocket] Connection thread error: {str(e)}") + logger.error(traceback.format_exc()) + finally: + if self.event_loop: + self.event_loop.close() + + async def _connection_handler(self): + """处理WebSocket连接和重连逻辑""" + while self.is_running: + try: + # 构建SSL上下文 + ssl_context = None + assert self.websocket_url is not None + if self.websocket_url.startswith("wss://"): + ssl_context = ssl_module.create_default_context() + ws_logger = logging.getLogger("websockets.client") + ws_logger.setLevel(logging.INFO) + async with websockets.connect( + self.websocket_url, + ssl=ssl_context, + ping_interval=WSConfig.ping_interval, + ping_timeout=10, + additional_headers={"Authorization": f"Lab {BasicConfig.auth_secret()}"}, + logger=ws_logger, + ) as websocket: + self.websocket = websocket + self.connected = True + self.reconnect_count = 0 + + logger.info(f"[WebSocket] Connected to {self.websocket_url}") + + # 处理消息 + await self._message_handler() + + except websockets.exceptions.ConnectionClosed: + logger.warning("[WebSocket] Connection closed") + self.connected = False + except Exception as e: + logger.error(f"[WebSocket] Connection error: {str(e)}") + self.connected = False + finally: + # WebSocket连接结束时只需重置websocket对象 + self.websocket = None + + # 重连逻辑 + if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: + self.reconnect_count += 1 + logger.info( + f"[WebSocket] Reconnecting in {WSConfig.reconnect_interval}s " + f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + ) + await asyncio.sleep(WSConfig.reconnect_interval) + elif self.reconnect_count >= WSConfig.max_reconnect_attempts: + logger.error("[WebSocket] Max reconnection attempts reached") + break + else: + self.reconnect_count -= 1 + + async def _close_connection(self): + """关闭WebSocket连接""" + if self.websocket: + await self.websocket.close() + self.websocket = None + + # 消息处理方法 + async def _message_handler(self): + """处理接收到的消息""" + if not self.websocket: + logger.error("[WebSocket] WebSocket connection is None") + return + + try: + async for message in self.websocket: + try: + data = json.loads(message) + await self._process_message(data) + except json.JSONDecodeError: + logger.error(f"[WebSocket] Invalid JSON received: {message}") + except Exception as e: + logger.error(f"[WebSocket] Error processing message: {str(e)}") + except websockets.exceptions.ConnectionClosed: + logger.info("[WebSocket] Message handler stopped - connection closed") + except Exception as e: + logger.error(f"[WebSocket] Message handler error: {str(e)}") + + async def _process_message(self, input_message: Dict[str, Any]): + """处理收到的消息""" + message_type = input_message.get("action", "") + data = input_message.get("data", {}) + + if message_type == "pong": + # 处理pong响应(WebSocket层面的连接管理) + self._handle_pong_sync(data) + elif self.task_scheduler: + # 其他消息交给TaskScheduler处理 + if message_type == "job_start": + await self.task_scheduler.handle_job_start(data) + elif message_type == "query_action_state": + await self.task_scheduler.handle_query_state(data) + elif message_type == "cancel_action": + await self.task_scheduler.handle_cancel_action(data) + elif message_type == "": + return + else: + logger.debug(f"[WebSocket] Unknown message: {input_message}") + else: + logger.warning(f"[WebSocket] Task scheduler not available for message: {message_type}") + + def _handle_pong_sync(self, pong_data: Dict[str, Any]): + """同步处理pong响应""" + host_node = HostNode.get_instance(0) + if host_node: + host_node.handle_pong_response(pong_data) + + # 消息发送方法 + async def _send_message(self, message: Dict[str, Any]): + """内部发送消息方法""" + if not self.connected or not self.websocket: + logger.warning("[WebSocket] Not connected, cannot send message") + return + + try: + message_str = json.dumps(message, ensure_ascii=False) + await self.websocket.send(message_str) + logger.debug(f"[WebSocket] Message sent: {message['action']}") + except Exception as e: + logger.error(f"[WebSocket] Failed to send message: {str(e)}") + + # MessageSender接口实现 + async def send_message(self, message: Dict[str, Any]) -> None: + """发送消息(TaskScheduler调用的接口)""" + await self._send_message(message) + + def is_connected(self) -> bool: + """检查是否已连接(TaskScheduler调用的接口)""" + return self.connected and not self.is_disabled + + # 基类方法实现 + def publish_device_status(self, device_status: dict, device_id: str, property_name: str) -> None: + """发布设备状态""" + if self.is_disabled or not self.connected: + return + message = { + "action": "device_status", + "data": { + "device_id": device_id, + "data": { + "property_name": property_name, + "status": device_status.get(device_id, {}).get(property_name), + "timestamp": time.time(), + }, + }, + } + if self.event_loop: + asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") + + def publish_job_status( + self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None + ) -> None: + """发布作业状态(转发给TaskScheduler)""" + if self.task_scheduler: + self.task_scheduler.publish_job_status(feedback_data, item, status, return_info) + else: + logger.debug(f"[WebSocket] Task scheduler not available for job status: {item.job_id}") + + def send_ping(self, ping_id: str, timestamp: float) -> None: + """发送ping消息""" + if self.is_disabled or not self.connected: + logger.warning("[WebSocket] Not connected, cannot send ping") + return + message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} + if self.event_loop: + asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + logger.debug(f"[WebSocket] Ping sent: {ping_id}") + + def cancel_goal(self, job_id: str) -> None: + """取消指定的任务(转发给TaskScheduler)""" + logger.debug(f"[WebSocket] Received cancel goal request for job_id: {job_id}") + if self.task_scheduler: + self.task_scheduler.cancel_goal(job_id) + logger.debug(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}") + else: + logger.debug(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}") diff --git a/unilabos/compile/__init__.py b/unilabos/compile/__init__.py index fd848ba..51ca9a2 100644 --- a/unilabos/compile/__init__.py +++ b/unilabos/compile/__init__.py @@ -46,6 +46,7 @@ action_protocol_generators = { HeatChillStopProtocol: generate_heat_chill_stop_protocol, HydrogenateProtocol: generate_hydrogenate_protocol, PumpTransferProtocol: generate_pump_protocol_with_rinsing, + TransferProtocol: generate_pump_protocol, RecrystallizeProtocol: generate_recrystallize_protocol, ResetHandlingProtocol: generate_reset_handling_protocol, RunColumnProtocol: generate_run_column_protocol, diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 44bcc25..99c31ee 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -1,14 +1,18 @@ #!/usr/bin/env python # coding=utf-8 # 定义配置变量和加载函数 +import base64 import traceback import os import importlib.util +from typing import Optional from unilabos.utils import logger class BasicConfig: ENV = "pro" # 'test' + ak = "" + sk = "" working_dir = "" config_path = "" is_host_mode = True @@ -17,6 +21,17 @@ class BasicConfig: machine_name = "undefined" vis_2d_enable = False enable_resource_load = True + # 通信协议配置 + communication_protocol = "mqtt" # 支持: "mqtt", "websocket" + + @classmethod + def auth_secret(cls): + # base64编码 + if not cls.ak or not cls.sk: + return "" + target = f"{cls.ak}:{cls.sk}" + base64_target = base64.b64encode(target.encode("utf-8")).decode("utf-8") + return base64_target # MQTT配置 @@ -38,6 +53,13 @@ class MQConfig: key_file = "" # 相对config.py所在目录的路径 +# WebSocket配置 +class WSConfig: + reconnect_interval = 5 # 重连间隔(秒) + max_reconnect_attempts = 999 # 最大重连次数 + ping_interval = 30 # ping间隔(秒) + + # OSS上传配置 class OSSUploadConfig: api_host = "" @@ -65,7 +87,7 @@ class ROSConfig: ] -def _update_config_from_module(module, override_labid: str): +def _update_config_from_module(module, override_labid: Optional[str]): for name, obj in globals().items(): if isinstance(obj, type) and name.endswith("Config"): if hasattr(module, name) and isinstance(getattr(module, name), type): @@ -74,7 +96,7 @@ def _update_config_from_module(module, override_labid: str): setattr(obj, attr, getattr(getattr(module, name), attr)) # 更新OSS认证 if len(OSSUploadConfig.authorization) == 0: - OSSUploadConfig.authorization = f"lab {MQConfig.lab_id}" + OSSUploadConfig.authorization = f"Lab {MQConfig.lab_id}" # 对 ca_file cert_file key_file 进行初始化 if override_labid: MQConfig.lab_id = override_labid @@ -159,7 +181,6 @@ def _update_config_from_env(): logger.warning(f"[ENV] 解析环境变量 {env_key} 失败: {e}") - def load_config(config_path=None, override_labid=None): # 如果提供了配置文件路径,从该文件导入配置 if config_path: diff --git a/unilabos/config/example_config.py b/unilabos/config/example_config.py index 91e0830..07018cb 100644 --- a/unilabos/config/example_config.py +++ b/unilabos/config/example_config.py @@ -12,6 +12,7 @@ class MQConfig: cert_file = "./lab.crt" key_file = "./lab.key" + # HTTP配置 class HTTPConfig: remote_addr = "https://uni-lab.bohrium.com/api/v1" diff --git a/unilabos/registry/device_comms/communication_devices.yaml b/unilabos/registry/device_comms/communication_devices.yaml index 4b49cc9..6b21394 100644 --- a/unilabos/registry/device_comms/communication_devices.yaml +++ b/unilabos/registry/device_comms/communication_devices.yaml @@ -9,7 +9,7 @@ serial: goal_default: request: null response: null - handles: [] + handles: {} result: {} schema: description: handle_serial_request的参数schema @@ -35,7 +35,7 @@ serial: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: read_data的参数schema @@ -56,7 +56,7 @@ serial: goal: {} goal_default: command: null - handles: [] + handles: {} result: {} schema: description: send_command的参数schema diff --git a/unilabos/registry/devices/camera.yaml b/unilabos/registry/devices/camera.yaml index 5f5b24b..8d7b09f 100644 --- a/unilabos/registry/devices/camera.yaml +++ b/unilabos/registry/devices/camera.yaml @@ -7,7 +7,7 @@ camera.USB: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 用于安全地关闭摄像头设备,释放摄像头资源,停止视频采集和发布服务。调用此函数将清理OpenCV摄像头连接并销毁ROS2节点。 @@ -27,7 +27,7 @@ camera.USB: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 定时器回调函数的参数schema。此函数负责定期采集摄像头视频帧,将OpenCV格式的图像转换为ROS Image消息格式,并发布到指定的视频话题。默认以10Hz频率执行,确保视频流的连续性和实时性。 diff --git a/unilabos/registry/devices/characterization_chromatic.yaml b/unilabos/registry/devices/characterization_chromatic.yaml index 7132b4f..527c682 100644 --- a/unilabos/registry/devices/characterization_chromatic.yaml +++ b/unilabos/registry/devices/characterization_chromatic.yaml @@ -7,7 +7,7 @@ hplc.agilent: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 检查安捷伦HPLC设备状态的函数。用于监控设备的运行状态、连接状态、错误信息等关键指标。该函数定期查询设备状态,确保系统稳定运行,及时发现和报告设备异常。适用于自动化流程中的设备监控、故障诊断、系统维护等场景。 @@ -28,7 +28,7 @@ hplc.agilent: goal: {} goal_default: file_path: null - handles: [] + handles: {} result: {} schema: description: 从文本文件中提取分析数据的函数。用于解析安捷伦HPLC生成的结果文件,提取峰面积、保留时间、浓度等关键分析数据。支持多种文件格式的自动识别和数据结构化处理,为后续数据分析和报告生成提供标准化的数据格式。适用于批量数据处理、结果验证、质量控制等分析工作流程。 @@ -54,7 +54,7 @@ hplc.agilent: params: null resource: null wf_name: null - handles: [] + handles: {} result: {} schema: description: 启动安捷伦HPLC分析序列的函数。用于执行预定义的分析方法序列,包括样品进样、色谱分离、检测等完整的分析流程。支持参数配置、资源分配、工作流程管理等功能,实现全自动的样品分析。适用于批量样品处理、标准化分析、质量检测等需要连续自动分析的应用场景。 @@ -82,7 +82,7 @@ hplc.agilent: goal: {} goal_default: device_name: null - handles: [] + handles: {} result: {} schema: description: 尝试关闭HPLC子设备的函数。用于安全地关闭泵、检测器、进样器等各个子模块,确保设备正常断开连接并保护硬件安全。该函数提供错误处理和状态确认机制,避免强制关闭可能造成的设备损坏。适用于设备维护、系统重启、紧急停机等需要安全关闭设备的场景。 @@ -105,7 +105,7 @@ hplc.agilent: goal: {} goal_default: device_name: null - handles: [] + handles: {} result: {} schema: description: 尝试打开HPLC子设备的函数。用于初始化和连接泵、检测器、进样器等各个子模块,建立设备通信并进行自检。该函数提供连接验证和错误恢复机制,确保子设备正常启动并准备就绪。适用于设备初始化、系统启动、设备重连等需要建立设备连接的场景。 @@ -129,7 +129,7 @@ hplc.agilent: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -228,7 +228,7 @@ hplc.agilent-zhida: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -260,7 +260,7 @@ hplc.agilent-zhida: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: HPLC设备连接关闭函数。安全地断开与智达HPLC设备的TCP socket连接,释放网络资源。该函数确保连接的正确关闭,避免网络资源泄露。通常在设备使用完毕或系统关闭时调用。 @@ -280,7 +280,7 @@ hplc.agilent-zhida: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: HPLC设备连接建立函数。与智达HPLC设备建立TCP socket通信连接,配置通信超时参数。该函数是设备使用前的必要步骤,建立成功后可进行状态查询、方法获取、任务启动等操作。连接失败时会抛出异常。 @@ -300,7 +300,7 @@ hplc.agilent-zhida: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -334,7 +334,7 @@ hplc.agilent-zhida: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/devices/characterization_optic.yaml b/unilabos/registry/devices/characterization_optic.yaml index 6ebdcbd..f8a3105 100644 --- a/unilabos/registry/devices/characterization_optic.yaml +++ b/unilabos/registry/devices/characterization_optic.yaml @@ -8,7 +8,7 @@ raman.home_made: goal: {} goal_default: int_time: null - handles: [] + handles: {} result: {} schema: description: 设置CCD检测器积分时间的函数。用于配置拉曼光谱仪的信号采集时间,控制光谱数据的质量和信噪比。较长的积分时间可获得更高的信号强度和更好的光谱质量,但会增加测量时间。该函数允许根据样品特性和测量要求动态调整检测参数,优化测量效果。 @@ -32,7 +32,7 @@ raman.home_made: goal: {} goal_default: output_voltage_laser: null - handles: [] + handles: {} result: {} schema: description: 设置激光器输出功率的函数。用于控制拉曼光谱仪激光器的功率输出,调节激光强度以适应不同样品的测量需求。适当的激光功率能够获得良好的拉曼信号同时避免样品损伤。该函数支持精确的功率控制,确保测量结果的稳定性和重现性。 @@ -57,7 +57,7 @@ raman.home_made: goal_default: int_time: null laser_power: null - handles: [] + handles: {} result: {} schema: description: 执行无背景扣除的拉曼光谱测量函数。用于直接采集样品的拉曼光谱信号,不进行背景校正处理。该函数配置积分时间和激光功率参数,获取原始光谱数据用于后续的数据处理分析。适用于对光谱数据质量要求较高或需要自定义背景处理流程的测量场景。 @@ -87,7 +87,7 @@ raman.home_made: int_time: null laser_power: null sample_name: null - handles: [] + handles: {} result: {} schema: description: 执行多次平均的无背景拉曼光谱测量函数。通过多次测量取平均值来提高光谱数据的信噪比和测量精度,减少随机噪声影响。该函数支持自定义平均次数、积分时间、激光功率等参数,并可为样品指定名称便于数据管理。适用于对测量精度要求较高的定量分析和研究应用。 @@ -121,7 +121,7 @@ raman.home_made: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/gas_handler.yaml b/unilabos/registry/devices/gas_handler.yaml index 944675b..644696a 100644 --- a/unilabos/registry/devices/gas_handler.yaml +++ b/unilabos/registry/devices/gas_handler.yaml @@ -7,7 +7,7 @@ gas_source.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -27,7 +27,7 @@ gas_source.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -47,7 +47,7 @@ gas_source.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -79,7 +79,7 @@ gas_source.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -113,7 +113,7 @@ gas_source.mock: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -187,7 +187,7 @@ vacuum_pump.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -207,7 +207,7 @@ vacuum_pump.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -227,7 +227,7 @@ vacuum_pump.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -259,7 +259,7 @@ vacuum_pump.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -293,7 +293,7 @@ vacuum_pump.mock: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index 12ddc91..df38dbb 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -87,7 +87,7 @@ liquid_handler: type: '' use_channels: - 0 - handles: [] + handles: {} placeholder_keys: reagent_sources: unilabos_resources targets: unilabos_resources @@ -398,7 +398,7 @@ liquid_handler: - 0 vols: - 0.0 - handles: [] + handles: {} result: name: name schema: @@ -564,7 +564,7 @@ liquid_handler: protocol_name: null protocol_type: null protocol_version: null - handles: [] + handles: {} result: {} schema: description: 创建实验协议函数。用于建立新的液体处理实验协议,定义协议名称、描述、版本、作者、日期等基本信息。该函数支持协议模板化管理,便于实验流程的标准化和重复性。适用于实验设计、方法开发、标准操作程序建立等需要协议管理的应用场景。 @@ -607,7 +607,7 @@ liquid_handler: goal_default: msg: null seconds: 0 - handles: [] + handles: {} result: {} schema: description: 自定义延时函数。在实验流程中插入可配置的等待时间,用于满足特定的反应时间、孵育时间或设备稳定时间要求。支持自定义延时消息和秒数设置,提供流程控制和时间管理功能。适用于酶反应等待、温度平衡、样品孵育等需要时间控制的实验步骤。 @@ -633,7 +633,7 @@ liquid_handler: goal: {} goal_default: tip_racks: null - handles: [] + handles: {} result: {} schema: description: 吸头迭代函数。用于自动管理和切换吸头架中的吸头,实现批量实验中的吸头自动分配和追踪。该函数监控吸头使用状态,自动切换到下一个可用吸头位置,确保实验流程的连续性。适用于高通量实验、批量处理、自动化流水线等需要大量吸头管理的应用场景。 @@ -657,7 +657,7 @@ liquid_handler: goal: {} goal_default: tip_racks: null - handles: [] + handles: {} result: {} schema: description: 吸头架设置函数。用于配置和初始化液体处理系统的吸头架信息,包括吸头架位置、类型、容量等参数。该函数建立吸头资源管理系统,为后续的吸头选择和使用提供基础配置。适用于系统初始化、吸头架更换、实验配置等需要吸头资源管理的操作场景。 @@ -681,7 +681,7 @@ liquid_handler: goal: {} goal_default: targets: null - handles: [] + handles: {} result: {} schema: description: 吸头碰触函数。控制移液器吸头轻触容器边缘或底部,用于去除吸头外壁附着的液滴,提高移液精度和减少污染。该函数支持多目标位置操作,可配置碰触参数和位置偏移。适用于精密移液、减少液体残留、防止交叉污染等需要提高移液质量的实验操作。 @@ -707,7 +707,7 @@ liquid_handler: goal_default: use_channels: - 0 - handles: [] + handles: {} result: name: name schema: @@ -790,7 +790,7 @@ liquid_handler: - 0 vols: - 0.0 - handles: [] + handles: {} result: name: name schema: @@ -977,7 +977,7 @@ liquid_handler: type: '' use_channels: - 0 - handles: [] + handles: {} placeholder_keys: tip_spots: unilabos_resources result: @@ -1146,7 +1146,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: name: name schema: @@ -1311,7 +1311,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -1527,7 +1527,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: name: name schema: @@ -1844,7 +1844,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: name: name schema: @@ -2156,7 +2156,7 @@ liquid_handler: x: 0.0 y: 0.0 z: 0.0 - handles: [] + handles: {} result: name: name schema: @@ -2368,7 +2368,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -2514,7 +2514,7 @@ liquid_handler: type: '' use_channels: - 0 - handles: [] + handles: {} result: name: name schema: @@ -2676,7 +2676,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: name: name schema: @@ -2876,7 +2876,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -3203,7 +3203,7 @@ liquid_handler: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} placeholder_keys: sources: unilabos_resources waste_liquid: unilabos_resources @@ -3463,7 +3463,7 @@ liquid_handler: allow_nonzero_volume: false use_channels: - 0 - handles: [] + handles: {} result: name: name schema: @@ -3511,7 +3511,7 @@ liquid_handler: allow_nonzero_volume: allow_nonzero_volume goal_default: allow_nonzero_volume: false - handles: [] + handles: {} result: name: name schema: @@ -3598,7 +3598,7 @@ liquid_handler: sample_id: '' type: '' volume: 0.0 - handles: [] + handles: {} result: name: name schema: @@ -3807,7 +3807,7 @@ liquid_handler: to_vessel: '' viscous: false volume: 0.0 - handles: [] + handles: {} schema: description: '' properties: @@ -4431,7 +4431,7 @@ liquid_handler.biomek: resource_tracker: null resources: null slot_on_deck: null - handles: [] + handles: {} result: {} schema: description: create_resource的参数schema @@ -4482,7 +4482,7 @@ liquid_handler.biomek: liquid_volume: null parent: null slot_on_deck: null - handles: [] + handles: {} result: {} schema: description: instrument_setup_biomek的参数schema @@ -4538,7 +4538,7 @@ liquid_handler.biomek: protocol_name: '' protocol_type: '' protocol_version: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -4766,7 +4766,7 @@ liquid_handler.biomek: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -5487,7 +5487,7 @@ liquid_handler.prcxi: type: '' use_channels: - 0 - handles: [] + handles: {} placeholder_keys: reagent_sources: unilabos_resources targets: unilabos_resources @@ -5798,7 +5798,7 @@ liquid_handler.prcxi: - 0 vols: - 0.0 - handles: [] + handles: {} placeholder_keys: resources: unilabos_resources result: {} @@ -5965,7 +5965,7 @@ liquid_handler.prcxi: protocol_name: '' protocol_type: '' protocol_version: '' - handles: [] + handles: {} result: {} schema: description: create_protocol的参数schema @@ -6008,7 +6008,7 @@ liquid_handler.prcxi: goal_default: msg: null seconds: 0 - handles: [] + handles: {} result: {} schema: description: custom_delay的参数schema @@ -6034,7 +6034,7 @@ liquid_handler.prcxi: goal: {} goal_default: tip_racks: null - handles: [] + handles: {} result: {} schema: description: iter_tips的参数schema @@ -6060,7 +6060,7 @@ liquid_handler.prcxi: channel: 0 dis_to_top: 0 well: null - handles: [] + handles: {} result: {} schema: description: move_to的参数schema @@ -6089,7 +6089,7 @@ liquid_handler.prcxi: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: run_protocol的参数schema @@ -6110,7 +6110,7 @@ liquid_handler.prcxi: goal: {} goal_default: targets: null - handles: [] + handles: {} result: {} schema: description: touch_tip的参数schema @@ -6136,7 +6136,7 @@ liquid_handler.prcxi: goal_default: use_channels: - 0 - handles: [] + handles: {} result: {} schema: description: '' @@ -6218,7 +6218,7 @@ liquid_handler.prcxi: - 0 vols: - 0.0 - handles: [] + handles: {} placeholder_keys: resources: unilabos_resources result: {} @@ -6406,7 +6406,7 @@ liquid_handler.prcxi: type: '' use_channels: - 0 - handles: [] + handles: {} placeholder_keys: tip_spots: unilabos_resources result: {} @@ -6583,7 +6583,7 @@ liquid_handler.prcxi: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} placeholder_keys: targets: unilabos_resources result: {} @@ -6763,7 +6763,7 @@ liquid_handler.prcxi: type: '' use_channels: - 0 - handles: [] + handles: {} placeholder_keys: tip_spots: unilabos_resources result: {} @@ -6975,7 +6975,7 @@ liquid_handler.prcxi: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} placeholder_keys: sources: unilabos_resources waste_liquid: unilabos_resources @@ -7257,7 +7257,7 @@ liquid_handler.prcxi: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} placeholder_keys: wells: unilabos_resources result: {} @@ -7398,7 +7398,7 @@ liquid_handler.prcxi: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} placeholder_keys: tip_racks: unilabos_resources result: {} @@ -7527,7 +7527,7 @@ liquid_handler.prcxi: to_vessel: '' viscous: false volume: 0.0 - handles: [] + handles: {} schema: description: '' properties: @@ -7717,7 +7717,7 @@ liquid_handler.prcxi: touch_tip: false use_channels: - 0 - handles: [] + handles: {} placeholder_keys: sources: unilabos_resources targets: unilabos_resources @@ -8164,7 +8164,7 @@ liquid_handler.revvity: sample_id: '' type: '' wf_name: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/organic_miscellaneous.yaml b/unilabos/registry/devices/organic_miscellaneous.yaml index c3e4aa2..1ac2ac2 100644 --- a/unilabos/registry/devices/organic_miscellaneous.yaml +++ b/unilabos/registry/devices/organic_miscellaneous.yaml @@ -8,7 +8,7 @@ rotavap.one: goal: {} goal_default: cmd: null - handles: [] + handles: {} result: {} schema: description: cmd_write的参数schema @@ -31,7 +31,7 @@ rotavap.one: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: main_loop的参数schema @@ -52,7 +52,7 @@ rotavap.one: goal: {} goal_default: time: null - handles: [] + handles: {} result: {} schema: description: set_pump_time的参数schema @@ -76,7 +76,7 @@ rotavap.one: goal: {} goal_default: time: null - handles: [] + handles: {} result: {} schema: description: set_rotate_time的参数schema @@ -101,7 +101,7 @@ rotavap.one: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -171,7 +171,7 @@ separator.homemade: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: read_sensor_loop的参数schema @@ -193,7 +193,7 @@ separator.homemade: goal_default: condition: null value: null - handles: [] + handles: {} result: {} schema: description: valve_open的参数schema @@ -220,7 +220,7 @@ separator.homemade: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: write的参数schema @@ -273,7 +273,7 @@ separator.homemade: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -410,7 +410,7 @@ separator.homemade: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/pump_and_valve.yaml b/unilabos/registry/devices/pump_and_valve.yaml index 7fc3a20..352e841 100644 --- a/unilabos/registry/devices/pump_and_valve.yaml +++ b/unilabos/registry/devices/pump_and_valve.yaml @@ -7,7 +7,7 @@ solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: close的参数schema @@ -27,7 +27,7 @@ solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -47,7 +47,7 @@ solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -67,7 +67,7 @@ solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -87,7 +87,7 @@ solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: read_data的参数schema @@ -108,7 +108,7 @@ solenoid_valve: goal: {} goal_default: command: null - handles: [] + handles: {} result: {} schema: description: send_command的参数schema @@ -133,7 +133,7 @@ solenoid_valve: string: position goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -204,7 +204,7 @@ solenoid_valve.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -224,7 +224,7 @@ solenoid_valve.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -245,7 +245,7 @@ solenoid_valve.mock: goal: {} goal_default: position: null - handles: [] + handles: {} result: {} schema: description: set_valve_position的参数schema @@ -268,7 +268,7 @@ solenoid_valve.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -300,7 +300,7 @@ solenoid_valve.mock: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -375,7 +375,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: close的参数schema @@ -395,7 +395,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -416,7 +416,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: volume: null - handles: [] + handles: {} result: {} schema: description: pull_plunger的参数schema @@ -440,7 +440,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: volume: null - handles: [] + handles: {} result: {} schema: description: push_plunger的参数schema @@ -463,7 +463,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_aux_input_status_1的参数schema @@ -483,7 +483,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_aux_input_status_2的参数schema @@ -503,7 +503,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_backlash_position的参数schema @@ -523,7 +523,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_command_buffer_status的参数schema @@ -543,7 +543,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_software_version的参数schema @@ -564,7 +564,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: full_command: null - handles: [] + handles: {} result: {} schema: description: send_command的参数schema @@ -588,7 +588,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: baudrate: null - handles: [] + handles: {} result: {} schema: description: set_baudrate的参数schema @@ -612,7 +612,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: set_max_velocity的参数schema @@ -637,7 +637,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal_default: max_velocity: null position: null - handles: [] + handles: {} result: {} schema: description: set_position的参数schema @@ -663,7 +663,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: position: null - handles: [] + handles: {} result: {} schema: description: set_valve_position的参数schema @@ -687,7 +687,7 @@ syringe_pump_with_valve.runze.SY03B-T06: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: set_velocity_grade的参数schema @@ -710,7 +710,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: stop_operation的参数schema @@ -730,7 +730,7 @@ syringe_pump_with_valve.runze.SY03B-T06: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_error的参数schema @@ -879,7 +879,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: close的参数schema @@ -899,7 +899,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -920,7 +920,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: volume: null - handles: [] + handles: {} result: {} schema: description: pull_plunger的参数schema @@ -944,7 +944,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: volume: null - handles: [] + handles: {} result: {} schema: description: push_plunger的参数schema @@ -967,7 +967,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_aux_input_status_1的参数schema @@ -987,7 +987,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_aux_input_status_2的参数schema @@ -1007,7 +1007,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_backlash_position的参数schema @@ -1027,7 +1027,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_command_buffer_status的参数schema @@ -1047,7 +1047,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: query_software_version的参数schema @@ -1068,7 +1068,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: full_command: null - handles: [] + handles: {} result: {} schema: description: send_command的参数schema @@ -1092,7 +1092,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: baudrate: null - handles: [] + handles: {} result: {} schema: description: set_baudrate的参数schema @@ -1116,7 +1116,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: set_max_velocity的参数schema @@ -1141,7 +1141,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal_default: max_velocity: null position: null - handles: [] + handles: {} result: {} schema: description: set_position的参数schema @@ -1167,7 +1167,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: position: null - handles: [] + handles: {} result: {} schema: description: set_valve_position的参数schema @@ -1191,7 +1191,7 @@ syringe_pump_with_valve.runze.SY03B-T08: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: set_velocity_grade的参数schema @@ -1214,7 +1214,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: stop_operation的参数schema @@ -1234,7 +1234,7 @@ syringe_pump_with_valve.runze.SY03B-T08: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_error的参数schema diff --git a/unilabos/registry/devices/robot_agv.yaml b/unilabos/registry/devices/robot_agv.yaml index be4aa6c..74085b1 100644 --- a/unilabos/registry/devices/robot_agv.yaml +++ b/unilabos/registry/devices/robot_agv.yaml @@ -10,7 +10,7 @@ agv.SEER: cmd: null ex_data: '' obj: receive_socket - handles: [] + handles: {} result: {} schema: description: AGV底层通信命令发送函数。通过TCP socket连接向AGV发送底层控制命令,支持pose(位置)、status(状态)、nav(导航)等命令类型。用于获取AGV当前位置坐标、运行状态或发送导航指令。该函数封装了AGV的通信协议,将命令转换为十六进制数据包并处理响应解析。 @@ -41,7 +41,7 @@ agv.SEER: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/robot_arm.yaml b/unilabos/registry/devices/robot_arm.yaml index 61803d1..cf72c76 100644 --- a/unilabos/registry/devices/robot_arm.yaml +++ b/unilabos/registry/devices/robot_arm.yaml @@ -7,7 +7,7 @@ robotic_arm.SCARA_with_slider.virtual: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: check_tf_update_actions的参数schema @@ -32,7 +32,7 @@ robotic_arm.SCARA_with_slider.virtual: move_group: null retry: 10 speed: 1 - handles: [] + handles: {} result: {} schema: description: moveit_joint_task的参数schema @@ -77,7 +77,7 @@ robotic_arm.SCARA_with_slider.virtual: retry: 10 speed: 1 target_link: null - handles: [] + handles: {} result: {} schema: description: moveit_task的参数schema @@ -124,7 +124,7 @@ robotic_arm.SCARA_with_slider.virtual: goal: {} goal_default: ros_node: null - handles: [] + handles: {} result: {} schema: description: post_init的参数schema @@ -149,7 +149,7 @@ robotic_arm.SCARA_with_slider.virtual: goal_default: parent_link: null resource: null - handles: [] + handles: {} result: {} schema: description: resource_manager的参数schema @@ -175,7 +175,7 @@ robotic_arm.SCARA_with_slider.virtual: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_for_resource_action的参数schema @@ -197,7 +197,7 @@ robotic_arm.SCARA_with_slider.virtual: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -240,7 +240,7 @@ robotic_arm.SCARA_with_slider.virtual: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -283,7 +283,7 @@ robotic_arm.SCARA_with_slider.virtual: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -359,7 +359,7 @@ robotic_arm.UR: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 机械臂初始化函数。执行UR机械臂的完整初始化流程,包括上电、释放制动器、解除保护停止状态等。该函数确保机械臂从安全停止状态恢复到可操作状态,是机械臂使用前的必要步骤。初始化完成后机械臂将处于就绪状态,可以接收后续的运动指令。 @@ -380,7 +380,7 @@ robotic_arm.UR: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: 从JSON字符串加载位置数据函数。接收包含机械臂位置信息的JSON格式字符串,解析并存储位置数据供后续运动任务使用。位置数据通常包含多个预定义的工作位置坐标,用于实现精确的多点运动控制。适用于动态配置机械臂工作位置的场景。 @@ -404,7 +404,7 @@ robotic_arm.UR: goal: {} goal_default: file: null - handles: [] + handles: {} result: {} schema: description: 从文件加载位置数据函数。读取指定的JSON文件并加载其中的机械臂位置信息。该函数支持从外部配置文件中获取预设的工作位置,便于位置数据的管理和重用。适用于需要从固定配置文件中读取复杂位置序列的应用场景。 @@ -427,7 +427,7 @@ robotic_arm.UR: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 重新加载位置数据函数。重新读取并解析之前设置的位置文件,更新内存中的位置数据。该函数用于在位置文件被修改后刷新机械臂的位置配置,无需重新初始化整个系统。适用于动态更新机械臂工作位置的场景。 @@ -449,7 +449,7 @@ robotic_arm.UR: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -535,7 +535,7 @@ robotic_arm.elite: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -555,7 +555,7 @@ robotic_arm.elite: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -578,7 +578,7 @@ robotic_arm.elite: quantity: null start_addr: null unit_id: null - handles: [] + handles: {} result: {} schema: description: '' @@ -608,7 +608,7 @@ robotic_arm.elite: goal: {} goal_default: job_id: null - handles: [] + handles: {} result: {} schema: description: '' @@ -634,7 +634,7 @@ robotic_arm.elite: register_addr: null unit_id: null value: null - handles: [] + handles: {} result: {} schema: description: '' @@ -664,7 +664,7 @@ robotic_arm.elite: goal: {} goal_default: response: null - handles: [] + handles: {} result: {} schema: description: '' @@ -688,7 +688,7 @@ robotic_arm.elite: goal: {} goal_default: command: null - handles: [] + handles: {} result: {} schema: description: '' @@ -713,7 +713,7 @@ robotic_arm.elite: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/devices/robot_gripper.yaml b/unilabos/registry/devices/robot_gripper.yaml index 2a2ccce..f3ee141 100644 --- a/unilabos/registry/devices/robot_gripper.yaml +++ b/unilabos/registry/devices/robot_gripper.yaml @@ -7,7 +7,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: data_loop的参数schema @@ -27,7 +27,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: data_reader的参数schema @@ -50,7 +50,7 @@ gripper.misumi_rz: force: null pos: null speed: null - handles: [] + handles: {} result: {} schema: description: 夹爪抓取运动控制函数。控制夹爪的开合运动,支持位置、速度、力矩的精确设定。位置参数控制夹爪开合程度,速度参数控制运动快慢,力矩参数控制夹持强度。该函数提供安全的力控制,避免损坏被抓取物体,适用于各种形状和材质的物品抓取。 @@ -79,7 +79,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: 夹爪初始化函数。执行Misumi RZ夹爪的完整初始化流程,包括Modbus通信建立、电机参数配置、传感器校准等。该函数确保夹爪系统从安全状态恢复到可操作状态,是夹爪使用前的必要步骤。初始化完成后夹爪将处于就绪状态,可接收抓取和旋转指令。 @@ -100,7 +100,7 @@ gripper.misumi_rz: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: modbus_crc的参数schema @@ -129,7 +129,7 @@ gripper.misumi_rz: spin_F: null spin_pos: null spin_v: null - handles: [] + handles: {} result: {} schema: description: move_and_rotate的参数schema @@ -168,7 +168,7 @@ gripper.misumi_rz: goal: {} goal_default: cmd: null - handles: [] + handles: {} result: {} schema: description: 节点夹爪移动任务函数。接收逗号分隔的命令字符串,解析位置、速度、力矩参数并执行夹爪抓取动作。该函数等待运动完成并返回执行结果,提供同步的运动控制接口。适用于需要可靠完成确认的精密抓取操作。 @@ -192,7 +192,7 @@ gripper.misumi_rz: goal: {} goal_default: cmd: null - handles: [] + handles: {} result: {} schema: description: 节点旋转移动任务函数。接收逗号分隔的命令字符串,解析角度、速度、力矩参数并执行夹爪旋转动作。该函数等待旋转完成并返回执行结果,提供同步的旋转控制接口。适用于需要精确角度定位和完成确认的旋转操作。 @@ -218,7 +218,7 @@ gripper.misumi_rz: address: null data_len: null id: null - handles: [] + handles: {} result: {} schema: description: read_address的参数schema @@ -250,7 +250,7 @@ gripper.misumi_rz: force: null pos: null speed: null - handles: [] + handles: {} result: {} schema: description: 夹爪绝对位置旋转控制函数。控制夹爪主轴旋转到指定的绝对角度位置,支持360度连续旋转。位置参数指定目标角度,速度参数控制旋转速率,力矩参数设定旋转阻力限制。该函数提供高精度的角度定位,适用于需要精确方向控制的操作场景。 @@ -283,7 +283,7 @@ gripper.misumi_rz: data: null fun: null id: null - handles: [] + handles: {} result: {} schema: description: send_cmd的参数schema @@ -315,7 +315,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_for_gripper的参数schema @@ -335,7 +335,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_for_gripper_init的参数schema @@ -355,7 +355,7 @@ gripper.misumi_rz: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_for_rotate的参数schema @@ -377,7 +377,7 @@ gripper.misumi_rz: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -461,7 +461,7 @@ gripper.mock: resource: Gripper1: {} wf_name: gripper_run - handles: [] + handles: {} result: {} schema: description: 模拟夹爪资源ID编辑函数。用于测试和演示资源管理功能,模拟修改夹爪资源的标识信息。该函数接收工作流名称、参数和资源对象,模拟真实的资源更新过程并返回修改后的资源信息。适用于系统测试和开发调试场景。 @@ -498,7 +498,7 @@ gripper.mock: command: max_effort: 0.0 position: 0.0 - handles: [] + handles: {} result: effort: torque position: position diff --git a/unilabos/registry/devices/robot_linear_motion.yaml b/unilabos/registry/devices/robot_linear_motion.yaml index 7088535..4cc4eaf 100644 --- a/unilabos/registry/devices/robot_linear_motion.yaml +++ b/unilabos/registry/devices/robot_linear_motion.yaml @@ -7,7 +7,7 @@ linear_motion.grbl: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: CNC设备初始化函数。执行Grbl CNC的完整初始化流程,包括归零操作、轴校准和状态复位。该函数将所有轴移动到原点位置(0,0,0),确保设备处于已知的参考状态。初始化完成后设备进入空闲状态,可接收后续的运动指令。 @@ -28,7 +28,7 @@ linear_motion.grbl: goal: {} goal_default: position: null - handles: [] + handles: {} result: {} schema: description: CNC绝对位置设定函数。控制CNC设备移动到指定的三维坐标位置(x,y,z)。该函数支持安全限位检查,防止超出设备工作范围。移动过程中会监控设备状态,确保安全到达目标位置。适用于精确定位和轨迹控制操作。 @@ -51,7 +51,7 @@ linear_motion.grbl: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: CNC操作停止函数。立即停止当前正在执行的所有CNC运动,包括轴移动和主轴旋转。该函数用于紧急停止或任务中断,确保设备和工件的安全。停止后设备将保持当前位置,等待新的指令。 @@ -71,7 +71,7 @@ linear_motion.grbl: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_error的参数schema @@ -113,7 +113,7 @@ linear_motion.grbl: x: 0.0 y: 0.0 z: 0.0 - handles: [] + handles: {} result: {} schema: description: '' @@ -345,7 +345,7 @@ linear_motion.grbl: nanosec: 0 sec: 0 position: 0.0 - handles: [] + handles: {} result: {} schema: description: '' @@ -479,7 +479,7 @@ linear_motion.toyo_xyz.sim: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: check_tf_update_actions的参数schema @@ -504,7 +504,7 @@ linear_motion.toyo_xyz.sim: move_group: null retry: 10 speed: 1 - handles: [] + handles: {} result: {} schema: description: moveit_joint_task的参数schema @@ -549,7 +549,7 @@ linear_motion.toyo_xyz.sim: retry: 10 speed: 1 target_link: null - handles: [] + handles: {} result: {} schema: description: moveit_task的参数schema @@ -596,7 +596,7 @@ linear_motion.toyo_xyz.sim: goal: {} goal_default: ros_node: null - handles: [] + handles: {} result: {} schema: description: post_init的参数schema @@ -621,7 +621,7 @@ linear_motion.toyo_xyz.sim: goal_default: parent_link: null resource: null - handles: [] + handles: {} result: {} schema: description: resource_manager的参数schema @@ -647,7 +647,7 @@ linear_motion.toyo_xyz.sim: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: wait_for_resource_action的参数schema @@ -669,7 +669,7 @@ linear_motion.toyo_xyz.sim: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -712,7 +712,7 @@ linear_motion.toyo_xyz.sim: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -755,7 +755,7 @@ linear_motion.toyo_xyz.sim: command: command goal_default: command: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -834,7 +834,7 @@ motor.iCL42: mode: null position: null velocity: null - handles: [] + handles: {} result: {} schema: description: 步进电机执行运动函数。直接执行电机运动命令,包括位置设定、速度控制和路径规划。该函数处理底层的电机控制协议,消除警告信息,设置运动参数并启动电机运行。适用于需要直接控制电机运动的应用场景。 @@ -863,7 +863,7 @@ motor.iCL42: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: iCL42电机设备初始化函数。建立与iCL42步进电机驱动器的串口通信连接,配置通信参数包括波特率、数据位、校验位等。该函数是电机使用前的必要步骤,确保驱动器处于可控状态并准备接收运动指令。 @@ -886,7 +886,7 @@ motor.iCL42: mode: null position: null velocity: null - handles: [] + handles: {} result: {} schema: description: 步进电机运动控制函数。根据指定的运动模式、目标位置和速度参数控制电机运动。支持多种运动模式和精确的位置控制,自动处理运动轨迹规划和执行。该函数提供异步执行和状态反馈,确保运动的准确性和可靠性。 @@ -917,7 +917,7 @@ motor.iCL42: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/solid_dispenser.yaml b/unilabos/registry/devices/solid_dispenser.yaml index 31af41d..42cabec 100644 --- a/unilabos/registry/devices/solid_dispenser.yaml +++ b/unilabos/registry/devices/solid_dispenser.yaml @@ -13,7 +13,7 @@ solid_dispenser.laiyu: compound_mass: 0.0 powder_tube_number: 0 target_tube_position: '' - handles: [] + handles: {} result: actual_mass_mg: actual_mass_mg schema: @@ -64,7 +64,7 @@ solid_dispenser.laiyu: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: Modbus CRC-16校验码计算函数。计算Modbus RTU通信协议所需的CRC-16校验码,确保数据传输的完整性和可靠性。该函数实现标准的CRC-16算法,用于构造完整的Modbus指令帧。 @@ -88,7 +88,7 @@ solid_dispenser.laiyu: goal: {} goal_default: command: null - handles: [] + handles: {} result: {} schema: description: Modbus指令发送函数。构造完整的Modbus RTU指令帧(包含CRC校验),发送给分装设备并等待响应。该函数处理底层通信协议,确保指令的正确传输和响应接收,支持最长3分钟的响应等待时间。 @@ -113,7 +113,7 @@ solid_dispenser.laiyu: float_input: float_input goal_default: float_in: 0.0 - handles: [] + handles: {} result: {} schema: description: '' @@ -153,7 +153,7 @@ solid_dispenser.laiyu: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -197,7 +197,7 @@ solid_dispenser.laiyu: x: 0.0 y: 0.0 z: 0.0 - handles: [] + handles: {} result: {} schema: description: '' @@ -243,7 +243,7 @@ solid_dispenser.laiyu: int_input: int_input goal_default: int_input: 0 - handles: [] + handles: {} result: {} schema: description: '' @@ -285,7 +285,7 @@ solid_dispenser.laiyu: int_input: int_input goal_default: int_input: 0 - handles: [] + handles: {} result: {} schema: description: '' @@ -325,7 +325,7 @@ solid_dispenser.laiyu: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/devices/temperature.yaml b/unilabos/registry/devices/temperature.yaml index 95bdeed..f83d921 100644 --- a/unilabos/registry/devices/temperature.yaml +++ b/unilabos/registry/devices/temperature.yaml @@ -11,7 +11,7 @@ chiller: function_code: null register_address: null value: null - handles: [] + handles: {} result: {} schema: description: build_modbus_frame的参数schema @@ -45,7 +45,7 @@ chiller: goal_default: decimal_points: 1 temperature: null - handles: [] + handles: {} result: {} schema: description: convert_temperature_to_modbus_value的参数schema @@ -72,7 +72,7 @@ chiller: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: modbus_crc的参数schema @@ -95,7 +95,7 @@ chiller: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: stop的参数schema @@ -117,7 +117,7 @@ chiller: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -187,7 +187,7 @@ heaterstirrer.dalong: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: close的参数schema @@ -208,7 +208,7 @@ heaterstirrer.dalong: goal: {} goal_default: speed: null - handles: [] + handles: {} result: {} schema: description: set_stir_speed的参数schema @@ -233,7 +233,7 @@ heaterstirrer.dalong: goal_default: temp: null type: warning - handles: [] + handles: {} result: {} schema: description: set_temp_inner的参数schema @@ -293,7 +293,7 @@ heaterstirrer.dalong: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -438,7 +438,7 @@ heaterstirrer.dalong: command: temp goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -482,7 +482,7 @@ heaterstirrer.dalong: command: temp goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -579,7 +579,7 @@ tempsensor: function_code: null register_address: null register_count: null - handles: [] + handles: {} result: {} schema: description: build_modbus_request的参数schema @@ -612,7 +612,7 @@ tempsensor: goal: {} goal_default: data: null - handles: [] + handles: {} result: {} schema: description: calculate_crc的参数schema @@ -636,7 +636,7 @@ tempsensor: goal: {} goal_default: response: null - handles: [] + handles: {} result: {} schema: description: read_modbus_response的参数schema @@ -660,7 +660,7 @@ tempsensor: goal: {} goal_default: command: null - handles: [] + handles: {} result: {} schema: description: send_prototype_command的参数schema @@ -685,7 +685,7 @@ tempsensor: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index 9531158..d97d625 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -7,7 +7,7 @@ virtual_centrifuge: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -27,7 +27,7 @@ virtual_centrifuge: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -78,7 +78,7 @@ virtual_centrifuge: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: message: message success: success @@ -295,7 +295,7 @@ virtual_column: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -315,7 +315,7 @@ virtual_column: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -388,7 +388,7 @@ virtual_column: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: message: current_status return_info: current_status @@ -690,7 +690,7 @@ virtual_filter: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -710,7 +710,7 @@ virtual_filter: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -786,7 +786,7 @@ virtual_filter: sample_id: '' type: '' volume: 0.0 - handles: [] + handles: {} result: message: message return_info: message @@ -1088,7 +1088,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -1108,7 +1108,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -1128,7 +1128,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -1148,7 +1148,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -1168,7 +1168,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -1200,7 +1200,7 @@ virtual_gas_source: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -1234,7 +1234,7 @@ virtual_gas_source: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -1310,7 +1310,7 @@ virtual_heatchill: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -1330,7 +1330,7 @@ virtual_heatchill: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -1386,7 +1386,7 @@ virtual_heatchill: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -1555,7 +1555,7 @@ virtual_heatchill: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -1696,7 +1696,7 @@ virtual_heatchill: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -1879,7 +1879,7 @@ virtual_multiway_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: close的参数schema @@ -1900,7 +1900,7 @@ virtual_multiway_valve: goal: {} goal_default: port_number: null - handles: [] + handles: {} result: {} schema: description: is_at_port的参数schema @@ -1924,7 +1924,7 @@ virtual_multiway_valve: goal: {} goal_default: position: null - handles: [] + handles: {} result: {} schema: description: is_at_position的参数schema @@ -1947,7 +1947,7 @@ virtual_multiway_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_at_pump_position的参数schema @@ -1967,7 +1967,7 @@ virtual_multiway_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -1987,7 +1987,7 @@ virtual_multiway_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: reset的参数schema @@ -2008,7 +2008,7 @@ virtual_multiway_valve: goal: {} goal_default: port_number: null - handles: [] + handles: {} result: {} schema: description: set_to_port的参数schema @@ -2031,7 +2031,7 @@ virtual_multiway_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: set_to_pump_position的参数schema @@ -2052,7 +2052,7 @@ virtual_multiway_valve: goal: {} goal_default: port_number: null - handles: [] + handles: {} result: {} schema: description: switch_between_pump_and_port的参数schema @@ -2077,7 +2077,7 @@ virtual_multiway_valve: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -2121,7 +2121,7 @@ virtual_multiway_valve: command: command goal_default: command: '' - handles: [] + handles: {} result: success: success schema: @@ -2299,7 +2299,7 @@ virtual_rotavap: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -2319,7 +2319,7 @@ virtual_rotavap: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -2371,7 +2371,7 @@ virtual_rotavap: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: message: message success: success @@ -2629,7 +2629,7 @@ virtual_separator: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -2649,7 +2649,7 @@ virtual_separator: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -2834,7 +2834,7 @@ virtual_separator: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: message: message success: success @@ -3516,7 +3516,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -3536,7 +3536,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -3556,7 +3556,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -3576,7 +3576,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: reset的参数schema @@ -3596,7 +3596,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: toggle的参数schema @@ -3617,7 +3617,7 @@ virtual_solenoid_valve: goal: command: CLOSED goal_default: {} - handles: [] + handles: {} result: success: success schema: @@ -3650,7 +3650,7 @@ virtual_solenoid_valve: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -3684,7 +3684,7 @@ virtual_solenoid_valve: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -3724,7 +3724,7 @@ virtual_solenoid_valve: command: command goal_default: command: '' - handles: [] + handles: {} result: message: message success: success @@ -3871,7 +3871,7 @@ virtual_solid_dispenser: type: '' viscous: false volume: '' - handles: [] + handles: {} result: message: message return_info: return_info @@ -4034,7 +4034,7 @@ virtual_solid_dispenser: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -4055,7 +4055,7 @@ virtual_solid_dispenser: goal: {} goal_default: reagent_name: null - handles: [] + handles: {} result: {} schema: description: '' @@ -4078,7 +4078,7 @@ virtual_solid_dispenser: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -4099,7 +4099,7 @@ virtual_solid_dispenser: goal: {} goal_default: mass_str: null - handles: [] + handles: {} result: {} schema: description: '' @@ -4123,7 +4123,7 @@ virtual_solid_dispenser: goal: {} goal_default: mol_str: null - handles: [] + handles: {} result: {} schema: description: '' @@ -4205,7 +4205,7 @@ virtual_stirrer: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -4225,7 +4225,7 @@ virtual_stirrer: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -4271,7 +4271,7 @@ virtual_stirrer: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -4429,7 +4429,7 @@ virtual_stirrer: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -4585,7 +4585,7 @@ virtual_stirrer: z: 0.0 sample_id: '' type: '' - handles: [] + handles: {} result: success: success schema: @@ -4772,7 +4772,7 @@ virtual_transfer_pump: goal_default: velocity: null volume: null - handles: [] + handles: {} result: {} schema: description: aspirate的参数schema @@ -4797,7 +4797,7 @@ virtual_transfer_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -4819,7 +4819,7 @@ virtual_transfer_pump: goal_default: velocity: null volume: null - handles: [] + handles: {} result: {} schema: description: dispense的参数schema @@ -4845,7 +4845,7 @@ virtual_transfer_pump: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: empty_syringe的参数schema @@ -4868,7 +4868,7 @@ virtual_transfer_pump: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: fill_syringe的参数schema @@ -4890,7 +4890,7 @@ virtual_transfer_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -4910,7 +4910,7 @@ virtual_transfer_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_empty的参数schema @@ -4930,7 +4930,7 @@ virtual_transfer_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_full的参数schema @@ -4952,7 +4952,7 @@ virtual_transfer_pump: goal_default: velocity: null volume: null - handles: [] + handles: {} result: {} schema: description: pull_plunger的参数schema @@ -4979,7 +4979,7 @@ virtual_transfer_pump: goal_default: velocity: null volume: null - handles: [] + handles: {} result: {} schema: description: push_plunger的参数schema @@ -5005,7 +5005,7 @@ virtual_transfer_pump: goal: {} goal_default: velocity: null - handles: [] + handles: {} result: {} schema: description: set_max_velocity的参数schema @@ -5028,7 +5028,7 @@ virtual_transfer_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: stop_operation的参数schema @@ -5055,7 +5055,7 @@ virtual_transfer_pump: goal_default: max_velocity: 0.0 position: 0.0 - handles: [] + handles: {} result: message: message success: success @@ -5133,7 +5133,7 @@ virtual_transfer_pump: to_vessel: '' viscous: false volume: 0.0 - handles: [] + handles: {} result: message: message success: success @@ -5272,7 +5272,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: cleanup的参数schema @@ -5292,7 +5292,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: initialize的参数schema @@ -5312,7 +5312,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_closed的参数schema @@ -5332,7 +5332,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: is_open的参数schema @@ -5352,7 +5352,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -5384,7 +5384,7 @@ virtual_vacuum_pump: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -5418,7 +5418,7 @@ virtual_vacuum_pump: string: string goal_default: string: '' - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/devices/work_station.yaml b/unilabos/registry/devices/work_station.yaml index 5de4d42..b501a7d 100644 --- a/unilabos/registry/devices/work_station.yaml +++ b/unilabos/registry/devices/work_station.yaml @@ -53,7 +53,7 @@ workstation: sample_id: '' type: '' to_repo_position: '' - handles: [] + handles: {} result: {} schema: description: '' @@ -6030,7 +6030,7 @@ workstation: goal_default: action_name: null action_value_mapping: null - handles: [] + handles: {} result: {} schema: description: create_ros_action_server的参数schema @@ -6059,7 +6059,7 @@ workstation: action_kwargs: null action_name: null device_id: null - handles: [] + handles: {} result: {} schema: description: execute_single_action的参数schema @@ -6090,7 +6090,7 @@ workstation: goal_default: device_config: null device_id: null - handles: [] + handles: {} result: {} schema: description: initialize_device的参数schema @@ -6150,7 +6150,7 @@ workstation.example: feedback: {} goal: {} goal_default: {} - handles: [] + handles: {} result: {} schema: description: '' @@ -6178,7 +6178,7 @@ workstation.example: resource_tracker: null resources: null slot_on_deck: null - handles: [] + handles: {} result: {} schema: description: '' diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 6f3f081..3cb2937 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -523,6 +523,12 @@ class Registry: for action_name, action_config in device_config["class"]["action_value_mappings"].items(): if "handles" not in action_config: action_config["handles"] = {} + elif isinstance(action_config["handles"], list): + if len(action_config["handles"]): + logger.error(f"设备{device_id} {action_name} 的handles配置错误,应该是字典类型") + continue + else: + action_config["handles"] = {} if "type" in action_config: action_type_str: str = action_config["type"] # 通过Json发放指令,而不是通过特殊的ros action进行处理 diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 0f9f951..fa1d059 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,11 +1,12 @@ import collections import copy +from dataclasses import dataclass, field import json import threading import time import traceback import uuid -from typing import Optional, Dict, Any, List, ClassVar, Set, Union +from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point @@ -41,6 +42,14 @@ from unilabos.ros.nodes.presets.controller_node import ControllerNode from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.type_check import serialize_result_info +if TYPE_CHECKING: + from unilabos.app.ws_client import QueueItem + + +@dataclass +class DeviceActionStatus: + job_ids: Dict[str, float] = field(default_factory=dict) + class HostNode(BaseROS2DeviceNode): """ @@ -51,6 +60,9 @@ class HostNode(BaseROS2DeviceNode): _instance: ClassVar[Optional["HostNode"]] = None _ready_event: ClassVar[threading.Event] = threading.Event() + _device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict( + DeviceActionStatus + ) @classmethod def get_instance(cls, timeout=None) -> Optional["HostNode"]: @@ -152,11 +164,15 @@ class HostNode(BaseROS2DeviceNode): self.device_status = {} # 用来存储设备状态 self.device_status_timestamps = {} # 用来存储设备状态最后更新时间 if BasicConfig.upload_registry: - from unilabos.app.mq import mqtt_client - register_devices_and_resources(mqtt_client, lab_registry) + from unilabos.app.communication import get_communication_client + + comm_client = get_communication_client() + register_devices_and_resources(comm_client, lab_registry) else: - self.lab_logger().warning("本次启动注册表不报送云端,如果您需要联网调试,请使用unilab-register命令进行单独报送,或者在启动命令增加--upload_registry") - time.sleep(1) # 等待MQTT连接稳定 + self.lab_logger().warning( + "本次启动注册表不报送云端,如果您需要联网调试,请使用unilab-register命令进行单独报送,或者在启动命令增加--upload_registry" + ) + time.sleep(1) # 等待通信连接稳定 # 首次发现网络中的设备 self._discover_devices() @@ -214,6 +230,7 @@ class HostNode(BaseROS2DeviceNode): for bridge in self.bridges: if hasattr(bridge, "resource_add"): from unilabos.app.web.client import HTTPClient + client: HTTPClient = bridge resource_start_time = time.time() resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False) @@ -340,9 +357,10 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().trace(f"[Host Node] Created ActionClient (Discovery): {action_id}") action_name = action_id[len(namespace) + 1 :] edge_device_id = namespace[9:] - # from unilabos.app.mq import mqtt_client + # from unilabos.app.comm_factory import get_communication_client + # comm_client = get_communication_client() # info_with_schema = ros_action_to_json_schema(action_type) - # mqtt_client.publish_actions(action_name, { + # comm_client.publish_actions(action_name, { # "device_id": edge_device_id, # "device_type": "", # "action_name": action_name, @@ -365,7 +383,9 @@ class HostNode(BaseROS2DeviceNode): ): # 这里要求device_id传入必须是edge_device_id if device_id not in self.devices_names: - self.lab_logger().error(f"[Host Node] Device {device_id} not found in devices_names. Create resource failed.") + self.lab_logger().error( + f"[Host Node] Device {device_id} not found in devices_names. Create resource failed." + ) raise ValueError(f"[Host Node] Device {device_id} not found in devices_names. Create resource failed.") device_key = f"{self.devices_names[device_id]}/{device_id}" @@ -425,10 +445,12 @@ class HostNode(BaseROS2DeviceNode): res_creation_input.update( { "data": { - "liquids": [{ - "liquid_type": liquid_type[0] if liquid_type else None, - "liquid_volume": liquid_volume[0] if liquid_volume else None, - }] + "liquids": [ + { + "liquid_type": liquid_type[0] if liquid_type else None, + "liquid_volume": liquid_volume[0] if liquid_volume else None, + } + ] } } ) @@ -451,7 +473,9 @@ class HostNode(BaseROS2DeviceNode): ) ] - response = await self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param) + response = await self.create_resource_detailed( + resources, device_ids, bind_parent_id, bind_location, other_calling_param + ) return response @@ -482,7 +506,9 @@ class HostNode(BaseROS2DeviceNode): self.devices_instances[device_id] = d # noinspection PyProtectedMember for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): - if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"): + if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith( + "UniLabJsonCommand" + ): continue action_id = f"/devices/{device_id}/{action_name}" if action_id not in self._action_clients: @@ -491,9 +517,10 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().trace( f"[Host Node] Created ActionClient (Local): {action_id}" ) # 子设备再创建用的是Discover发现的 - # from unilabos.app.mq import mqtt_client + # from unilabos.app.comm_factory import get_communication_client + # comm_client = get_communication_client() # info_with_schema = ros_action_to_json_schema(action_type) - # mqtt_client.publish_actions(action_name, { + # comm_client.publish_actions(action_name, { # "device_id": device_id, # "device_type": device_config["class"], # "action_name": action_name, @@ -591,21 +618,15 @@ class HostNode(BaseROS2DeviceNode): if hasattr(bridge, "publish_device_status"): bridge.publish_device_status(self.device_status, device_id, property_name) if bCreate: - self.lab_logger().trace( - f"Status created: {device_id}.{property_name} = {msg.data}" - ) + self.lab_logger().trace(f"Status created: {device_id}.{property_name} = {msg.data}") else: - self.lab_logger().debug( - f"Status updated: {device_id}.{property_name} = {msg.data}" - ) + self.lab_logger().debug(f"Status updated: {device_id}.{property_name} = {msg.data}") def send_goal( self, - device_id: str, + item: "QueueItem", action_type: str, - action_name: str, action_kwargs: Dict[str, Any], - goal_uuid: Optional[str] = None, server_info: Optional[Dict[str, Any]] = None, ) -> None: """ @@ -619,15 +640,20 @@ class HostNode(BaseROS2DeviceNode): goal_uuid: 目标UUID,如果为None则自动生成 server_info: 服务器发送信息,包含发送时间戳等 """ + u = uuid.UUID(item.job_id) + device_id = item.device_id + action_name = item.action_name if action_type.startswith("UniLabJsonCommand"): if action_name.startswith("auto-"): action_name = action_name[5:] action_id = f"/devices/{device_id}/_execute_driver_command" action_kwargs = { - "string": json.dumps({ - "function_name": action_name, - "function_args": action_kwargs, - }) + "string": json.dumps( + { + "function_name": action_name, + "function_args": action_kwargs, + } + ) } if action_type.startswith("UniLabJsonCommandAsync"): action_id = f"/devices/{device_id}/_execute_driver_command_async" @@ -644,53 +670,47 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {goal_msg}") action_client.wait_for_server() - - uuid_str = goal_uuid - if goal_uuid is not None: - u = uuid.UUID(goal_uuid) - goal_uuid_obj = UUID(uuid=list(u.bytes)) - else: - goal_uuid_obj = None + goal_uuid_obj = UUID(uuid=list(u.bytes)) future = action_client.send_goal_async( goal_msg, - feedback_callback=lambda feedback_msg: self.feedback_callback(action_id, uuid_str, feedback_msg), + feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg), goal_uuid=goal_uuid_obj, ) - future.add_done_callback(lambda future: self.goal_response_callback(action_id, uuid_str, future)) + future.add_done_callback( + lambda future: self.goal_response_callback(item, action_id, future) + ) - def goal_response_callback(self, action_id: str, uuid_str: Optional[str], future) -> None: + def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" goal_handle = future.result() if not goal_handle.accepted: - self.lab_logger().warning(f"[Host Node] Goal {action_id} ({uuid_str}) rejected") + self.lab_logger().warning(f"[Host Node] Goal {item.action_name} ({item.job_id}) rejected") return - self.lab_logger().info(f"[Host Node] Goal {action_id} ({uuid_str}) accepted") - if uuid_str: - self._goals[uuid_str] = goal_handle - goal_handle.get_result_async().add_done_callback( - lambda future: self.get_result_callback(action_id, uuid_str, future) - ) + self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted") + self._goals[item.job_id] = goal_handle + goal_handle.get_result_async().add_done_callback( + lambda future: self.get_result_callback(item, action_id, future) + ) - def feedback_callback(self, action_id: str, uuid_str: Optional[str], feedback_msg) -> None: + def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None: """反馈回调""" feedback_data = convert_from_ros_msg(feedback_msg) feedback_data.pop("goal_id") - self.lab_logger().debug(f"[Host Node] Feedback for {action_id} ({uuid_str}): {feedback_data}") + self.lab_logger().trace(f"[Host Node] Feedback for {action_id} ({item.job_id}): {feedback_data}") - if uuid_str: - for bridge in self.bridges: - if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status(feedback_data, uuid_str, "running") + for bridge in self.bridges: + if hasattr(bridge, "publish_job_status"): + bridge.publish_job_status(feedback_data, item, "running") - def get_result_callback(self, action_id: str, uuid_str: Optional[str], future) -> None: + def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None: """获取结果回调""" + job_id = item.job_id result_msg = future.result().result result_data = convert_from_ros_msg(result_msg) status = "success" return_info_str = result_data.get("return_info") - if return_info_str is not None: try: ret = json.loads(return_info_str) @@ -710,13 +730,13 @@ class HostNode(BaseROS2DeviceNode): status = "failed" return_info_str = serialize_result_info("缺少return_info", False, result_data) - self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): {status}") + self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}") self.lab_logger().debug(f"[Host Node] Result data: {result_data}") - if uuid_str: + if job_id: for bridge in self.bridges: if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status(result_data, uuid_str, status, return_info_str) + bridge.publish_job_status(result_data, item, status, return_info_str) def cancel_goal(self, goal_uuid: str) -> None: """取消目标""" @@ -726,14 +746,14 @@ class HostNode(BaseROS2DeviceNode): else: self.lab_logger().warning(f"[Host Node] Goal {goal_uuid} not found, cannot cancel") - def get_goal_status(self, uuid_str: str) -> int: + def get_goal_status(self, job_id: str) -> int: """获取目标状态""" - if uuid_str in self._goals: - g = self._goals[uuid_str] + if job_id in self._goals: + g = self._goals[job_id] status = g.status - self.lab_logger().debug(f"[Host Node] Goal status for {uuid_str}: {status}") + self.lab_logger().debug(f"[Host Node] Goal status for {job_id}: {status}") return status - self.lab_logger().warning(f"[Host Node] Goal {uuid_str} not found, status unknown") + self.lab_logger().warning(f"[Host Node] Goal {job_id} not found, status unknown") return GoalStatus.STATUS_UNKNOWN """Controller Node""" @@ -802,7 +822,7 @@ class HostNode(BaseROS2DeviceNode): """ self.lab_logger().info(f"[Host Node] Node info update request received: {request}") try: - from unilabos.app.mq import mqtt_client + from unilabos.app.communication import get_communication_client info = json.loads(request.command) if "SYNC_SLAVE_NODE_INFO" in info: @@ -811,9 +831,10 @@ class HostNode(BaseROS2DeviceNode): edge_device_id = info["edge_device_id"] self.device_machine_names[edge_device_id] = machine_name else: + comm_client = get_communication_client() registry_config = info["registry_config"] for device_config in registry_config: - mqtt_client.publish_registry(device_config["id"], device_config) + comm_client.publish_registry(device_config["id"], device_config) self.lab_logger().debug(f"[Host Node] Node info update: {info}") response.response = "OK" except Exception as e: @@ -840,6 +861,7 @@ class HostNode(BaseROS2DeviceNode): success = False if len(self.bridges) > 0: # 边的提交待定 from unilabos.app.web.client import HTTPClient + client: HTTPClient = self.bridges[-1] r = client.resource_add(add_schema(resources), False) success = bool(r) @@ -848,6 +870,7 @@ class HostNode(BaseROS2DeviceNode): if success: from unilabos.resources.graphio import physical_setup_graph + for resource in resources: if resource.get("id") not in physical_setup_graph.nodes: physical_setup_graph.add_node(resource["id"], **resource) @@ -988,9 +1011,10 @@ class HostNode(BaseROS2DeviceNode): send_timestamp = time.time() # 发送ping - from unilabos.app.mq import mqtt_client + from unilabos.app.communication import get_communication_client - mqtt_client.send_ping(ping_id, send_timestamp) + comm_client = get_communication_client() + comm_client.send_ping(ping_id, send_timestamp) # 等待pong响应 timeout = 10.0 diff --git a/unilabos/utils/environment_check.py b/unilabos/utils/environment_check.py index 0c6ae4d..cd50b82 100644 --- a/unilabos/utils/environment_check.py +++ b/unilabos/utils/environment_check.py @@ -18,6 +18,7 @@ class EnvironmentChecker: self.required_packages = { # 包导入名 : pip安装名 # "pymodbus.framer.FramerType": "pymodbus==3.9.2", + "websockets": "websockets", "paho.mqtt": "paho-mqtt", "opentrons_shared_data": "opentrons_shared_data", }