diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 14c9592..c652757 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -171,6 +171,12 @@ def parse_args(): action="store_true", help="Disable sending update feedback to server", ) + parser.add_argument( + "--test_mode", + action="store_true", + default=False, + help="Test mode: all actions simulate execution and return mock results without running real hardware", + ) # workflow upload subcommand workflow_parser = subparsers.add_parser( "workflow_upload", @@ -204,6 +210,12 @@ def parse_args(): default=False, help="Whether to publish the workflow (default: False)", ) + workflow_parser.add_argument( + "--description", + type=str, + default="", + help="Workflow description, used when publishing the workflow", + ) return parser @@ -231,52 +243,60 @@ def main(): # 加载配置文件,优先加载config,然后从env读取 config_path = args_dict.get("config") - if check_mode: - args_dict["working_dir"] = os.path.abspath(os.getcwd()) - # 当 skip_env_check 时,默认使用当前目录作为 working_dir - if skip_env_check and not args_dict.get("working_dir") and not config_path: + # === 解析 working_dir === + # 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改 + # 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir + # 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测 + raw_working_dir = args_dict.get("working_dir") + if raw_working_dir: + working_dir = os.path.abspath(raw_working_dir) + elif config_path and os.path.exists(config_path): + working_dir = os.path.dirname(os.path.abspath(config_path)) + else: working_dir = os.path.abspath(os.getcwd()) - print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info") - # 检查当前目录是否有 local_config.py - local_config_in_cwd = os.path.join(working_dir, "local_config.py") - if os.path.exists(local_config_in_cwd): - config_path = local_config_in_cwd + + # unilabos_data 子目录自动检测 + if os.path.basename(working_dir) != "unilabos_data": + unilabos_data_sub = os.path.join(working_dir, "unilabos_data") + if os.path.isdir(unilabos_data_sub): + working_dir = unilabos_data_sub + elif not raw_working_dir and not (config_path and os.path.exists(config_path)): + # 未显式指定路径,默认使用 cwd/unilabos_data + working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) + + # === 解析 config_path === + if config_path and not os.path.exists(config_path): + # config_path 传入但不存在,尝试在 working_dir 中查找 + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate + print_status(f"在工作目录中发现配置文件: {config_path}", "info") + else: + print_status( + f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py," + f"请通过 --config 传入 local_config.py 文件路径", + "error", + ) + os._exit(1) + elif not config_path: + # 规则3: 未传入 config_path,尝试 working_dir/local_config.py + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate print_status(f"发现本地配置文件: {config_path}", "info") else: print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - elif os.getcwd().endswith("unilabos_data"): - working_dir = os.path.abspath(os.getcwd()) - else: - working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) - - if args_dict.get("working_dir"): - working_dir = args_dict.get("working_dir", "") - if config_path and not os.path.exists(config_path): - config_path = os.path.join(working_dir, "local_config.py") - if not os.path.exists(config_path): - print_status( - f"当前工作目录 {working_dir} 未找到local_config.py,请通过 --config 传入 local_config.py 文件路径", - "error", + print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") + if check_mode or input() != "n": + os.makedirs(working_dir, exist_ok=True) + config_path = os.path.join(working_dir, "local_config.py") + shutil.copy( + os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), + config_path, ) + print_status(f"已创建 local_config.py 路径: {config_path}", "info") + else: os._exit(1) - elif config_path and os.path.exists(config_path): - working_dir = os.path.dirname(config_path) - elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")): - config_path = os.path.join(working_dir, "local_config.py") - elif not skip_env_check and not config_path and ( - not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py")) - ): - print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") - if input() != "n": - os.makedirs(working_dir, exist_ok=True) - config_path = os.path.join(working_dir, "local_config.py") - shutil.copy( - os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path - ) - print_status(f"已创建 local_config.py 路径: {config_path}", "info") - else: - os._exit(1) # 加载配置文件 (check_mode 跳过) print_status(f"当前工作目录为 {working_dir}", "info") @@ -334,6 +354,9 @@ def main(): BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False) BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False) + BasicConfig.test_mode = args_dict.get("test_mode", False) + if BasicConfig.test_mode: + print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning") BasicConfig.communication_protocol = "websocket" machine_name = os.popen("hostname").read().strip() machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) diff --git a/unilabos/app/register.py b/unilabos/app/register.py index 633df98..5918b43 100644 --- a/unilabos/app/register.py +++ b/unilabos/app/register.py @@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[ response = http_client.resource_registry({"resources": list(devices_to_register.values())}) cost_time = time.time() - start_time if response.status_code in [200, 201]: - logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms") + logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s") else: - logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms") + logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s") except Exception as e: logger.error(f"[UniLab Register] 设备注册异常: {e}") @@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[ response = http_client.resource_registry({"resources": list(resources_to_register.values())}) cost_time = time.time() - start_time if response.status_code in [200, 201]: - logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms") + logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s") else: - logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms") + logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s") except Exception as e: logger.error(f"[UniLab Register] 资源注册异常: {e}") diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 0ecf460..b43b0f4 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -343,9 +343,10 @@ class HTTPClient: edges: List[Dict[str, Any]], tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ - 导入工作流到服务器 + 导入工作流到服务器,如果 published 为 True,则额外发起发布请求 Args: name: 工作流名称(顶层) @@ -355,6 +356,7 @@ class HTTPClient: edges: 工作流边列表 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据,包含 code 和 data (uuid, name) @@ -367,7 +369,6 @@ class HTTPClient: "nodes": nodes, "edges": edges, "tags": tags if tags is not None else [], - "published": published, }, } # 保存请求到文件 @@ -388,11 +389,51 @@ class HTTPClient: res = response.json() if "code" in res and res["code"] != 0: logger.error(f"导入工作流失败: {response.text}") + return res + # 导入成功后,如果需要发布则额外发起发布请求 + if published: + imported_uuid = res.get("data", {}).get("uuid", workflow_uuid) + publish_res = self.workflow_publish(imported_uuid, description) + res["publish_result"] = publish_res return res else: logger.error(f"导入工作流失败: {response.status_code}, {response.text}") return {"code": response.status_code, "message": response.text} + def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]: + """ + 发布工作流 + + Args: + workflow_uuid: 工作流UUID + description: 工作流描述 + + Returns: + Dict: API响应数据 + """ + payload = { + "uuid": workflow_uuid, + "description": description, + "published": True, + } + logger.info(f"正在发布工作流: {workflow_uuid}") + response = requests.patch( + f"{self.remote_addr}/lab/workflow/owner", + json=payload, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=60, + ) + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"发布工作流失败: {response.text}") + else: + logger.info(f"工作流发布成功: {workflow_uuid}") + return res + else: + logger.error(f"发布工作流失败: {response.status_code}, {response.text}") + return {"code": response.status_code, "message": response.text} + # 创建默认客户端实例 http_client = HTTPClient() diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 4093092..4b7d91a 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -22,6 +22,8 @@ class BasicConfig: startup_json_path = None # 填写绝对路径 disable_browser = False # 禁止浏览器自动打开 port = 8002 # 本地HTTP服务 + check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性 + test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果 # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG" diff --git a/unilabos/registry/device_comms/communication_devices.yaml b/unilabos/registry/device_comms/communication_devices.yaml index ea3f1b6..782889d 100644 --- a/unilabos/registry/device_comms/communication_devices.yaml +++ b/unilabos/registry/device_comms/communication_devices.yaml @@ -96,10 +96,13 @@ serial: type: string port: type: string + registry_name: + type: string resource_tracker: type: object required: - device_id + - registry_name - port type: object data: diff --git a/unilabos/registry/devices/camera.yaml b/unilabos/registry/devices/camera.yaml index fe1aef2..c8b9d94 100644 --- a/unilabos/registry/devices/camera.yaml +++ b/unilabos/registry/devices/camera.yaml @@ -67,6 +67,9 @@ camera: period: default: 0.1 type: number + registry_name: + default: '' + type: string resource_tracker: type: object required: [] diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index df4758d..1af0a4e 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -5,6 +5,7 @@ import sys import inspect import importlib import threading +import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Union, Tuple @@ -88,6 +89,14 @@ class Registry: ) test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。" + test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {}) + test_resource_schema = self._generate_unilab_json_command_schema( + test_resource_method_info.get("args", []), + "auto-test_resource", + test_resource_method_info.get("return_annotation"), + ) + test_resource_schema["description"] = "用于测试物料、设备和样本。" + self.device_type_registry.update( { "host_node": { @@ -189,32 +198,7 @@ class Registry: "goal": {}, "feedback": {}, "result": {}, - "schema": { - "description": "", - "properties": { - "feedback": {}, - "goal": { - "properties": { - "resource": ros_message_to_json_schema(Resource, "resource"), - "resources": { - "items": { - "properties": ros_message_to_json_schema( - Resource, "resources" - ), - "type": "object", - }, - "type": "array", - }, - "device": {"type": "string"}, - "devices": {"items": {"type": "string"}, "type": "array"}, - }, - "type": "object", - }, - "result": {}, - }, - "title": "test_resource", - "type": "object", - }, + "schema": test_resource_schema, "placeholder_keys": { "device": "unilabos_devices", "devices": "unilabos_devices", @@ -944,6 +928,7 @@ class Registry: if is_valid: results.append((file, data, device_ids)) except Exception as e: + traceback.print_exc() logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}") # 线程安全地更新注册表 diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index f05bf0c..63eda32 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -51,6 +51,7 @@ from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info +from unilabos.config.config import BasicConfig if TYPE_CHECKING: from unilabos.app.ws_client import QueueItem @@ -63,7 +64,7 @@ class DeviceActionStatus: class TestResourceReturn(TypedDict): resources: List[List[ResourceDict]] - devices: List[DeviceSlot] + devices: List[Dict[str, Any]] class TestLatencyReturn(TypedDict): @@ -776,6 +777,17 @@ class HostNode(BaseROS2DeviceNode): u = uuid.UUID(item.job_id) device_id = item.device_id action_name = item.action_name + + if BasicConfig.test_mode: + action_id = f"/devices/{device_id}/{action_name}" + self.lab_logger().info( + f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}" + ) + # 根据注册表 handles 构建模拟返回值 + mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs) + self._handle_test_mode_result(item, action_id, mock_return) + return + if action_type.startswith("UniLabJsonCommand"): if action_name.startswith("auto-"): action_name = action_name[5:] @@ -813,6 +825,51 @@ class HostNode(BaseROS2DeviceNode): ) future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f)) + def _build_test_mode_return( + self, device_id: str, action_name: str, action_kwargs: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 根据注册表 handles 的 output 定义构建测试模式的模拟返回值 + + 根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。 + 例如: "vessel" → {}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]] + """ + mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name} + action_mappings = self._action_value_mappings.get(device_id, {}) + action_mapping = action_mappings.get(action_name, {}) + handles = action_mapping.get("handles", {}) + if isinstance(handles, dict): + for output_handle in handles.get("output", []): + data_key = output_handle.get("data_key", "") + handler_key = output_handle.get("handler_key", "") + # 根据 @flatten 层数构建嵌套数组,叶子为空字典 + flatten_count = data_key.count("@flatten") + value: Any = {} + for _ in range(flatten_count): + value = [value] + mock_return[handler_key] = value + return mock_return + + def _handle_test_mode_result( + self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any] + ) -> None: + """ + 测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS) + """ + job_id = item.job_id + status = "success" + return_info = serialize_result_info("", True, mock_return) + + self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}") + + from unilabos.app.web.controller import store_job_result + store_job_result(job_id, status, return_info, mock_return) + + # 发布状态到桥接器 + for bridge in self.bridges: + if hasattr(bridge, "publish_job_status"): + bridge.publish_job_status(mock_return, item, status, return_info) + def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" goal_handle = future.result() diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 381cc66..3a1fee2 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -362,14 +362,16 @@ def build_protocol_graph( protocol_steps: List[Dict[str, Any]], workstation_name: str, action_resource_mapping: Optional[Dict[str, str]] = None, + labware_defs: Optional[List[Dict[str, Any]]] = None, ) -> WorkflowGraph: """统一的协议图构建函数,根据设备类型自动选择构建逻辑 Args: - labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...} + labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找 protocol_steps: 协议步骤列表 workstation_name: 工作站名称 action_resource_mapping: action 到 resource_name 的映射字典,可选 + labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...] """ G = WorkflowGraph() resource_last_writer = {} # reagent_name -> "node_id:port" @@ -377,18 +379,7 @@ def build_protocol_graph( protocol_steps = refactor_data(protocol_steps, action_resource_mapping) - # ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== - # 收集所有唯一的 slot - slots_info = {} # slot -> {labware, res_id} - for labware_id, item in labware_info.items(): - slot = str(item.get("slot", "")) - if slot and slot not in slots_info: - res_id = f"plate_slot_{slot}" - slots_info[slot] = { - "labware": item.get("labware", ""), - "res_id": res_id, - } - + # ==================== 第一步:按 slot 创建 create_resource 节点 ==================== # 创建 Group 节点,包含所有 create_resource 节点 group_node_id = str(uuid.uuid4()) G.add_node( @@ -404,29 +395,35 @@ def build_protocol_graph( param=None, ) - # 为每个唯一的 slot 创建 create_resource 节点 + # 直接使用 JSON 中的 labware 定义,每个 slot 一条记录,type 即 class_name res_index = 0 - for slot, info in slots_info.items(): - node_id = str(uuid.uuid4()) - res_id = info["res_id"] + for lw in (labware_defs or []): + slot = str(lw.get("slot", "")) + if not slot or slot in slot_to_create_resource: + continue # 跳过空 slot 或已处理的 slot + + lw_name = lw.get("name", f"slot {slot}") + lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"]) + res_id = f"plate_slot_{slot}" res_index += 1 + node_id = str(uuid.uuid4()) G.add_node( node_id, template_name="create_resource", resource_name="host_node", - name=f"Plate {res_index}", - description=f"Create plate on slot {slot}", + name=lw_name, + description=f"Create {lw_name}", lab_node_type="Labware", footer="create_resource-host_node", device_name=DEVICE_NAME_HOST, type=NODE_TYPE_DEFAULT, - parent_uuid=group_node_id, # 指向 Group 节点 - minimized=True, # 折叠显示 + parent_uuid=group_node_id, + minimized=True, param={ "res_id": res_id, "device_id": CREATE_RESOURCE_DEFAULTS["device_id"], - "class_name": CREATE_RESOURCE_DEFAULTS["class_name"], + "class_name": lw_type, "parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot), "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, "slot_on_deck": slot, @@ -434,8 +431,6 @@ def build_protocol_graph( ) slot_to_create_resource[slot] = node_id - # create_resource 之间不需要 ready 连接 - # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # 创建 Group 节点,包含所有 set_liquid_from_plate 节点 set_liquid_group_id = str(uuid.uuid4()) diff --git a/unilabos/workflow/convert_from_json.py b/unilabos/workflow/convert_from_json.py index ff749d7..acd0d71 100644 --- a/unilabos/workflow/convert_from_json.py +++ b/unilabos/workflow/convert_from_json.py @@ -1,16 +1,20 @@ """ JSON 工作流转换模块 -将 workflow/reagent 格式的 JSON 转换为统一工作流格式。 +将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。 输入格式: { + "labware": [ + {"name": "...", "slot": "1", "type": "lab_xxx"}, + ... + ], "workflow": [ {"action": "...", "action_args": {...}}, ... ], "reagent": { - "reagent_name": {"slot": int, "well": [...], "labware": "..."}, + "reagent_name": {"slot": int, "well": [...]}, ... } } @@ -245,18 +249,18 @@ def convert_from_json( if "workflow" not in json_data or "reagent" not in json_data: raise ValueError( "不支持的 JSON 格式。请使用标准格式:\n" - '{"workflow": [{"action": "...", "action_args": {...}}, ...], ' - '"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}' + '{"labware": [...], "workflow": [...], "reagent": {...}}' ) # 提取数据 workflow = json_data["workflow"] reagent = json_data["reagent"] + labware_defs = json_data.get("labware", []) # 新的 labware 定义列表 # 规范化步骤数据 protocol_steps = normalize_workflow_steps(workflow) - # reagent 已经是字典格式,直接使用 + # reagent 已经是字典格式,用于 set_liquid 和 well 数量查找 labware_info = reagent # 构建工作流图 @@ -265,6 +269,7 @@ def convert_from_json( protocol_steps=protocol_steps, workstation_name=workstation_name, action_resource_mapping=ACTION_RESOURCE_MAPPING, + labware_defs=labware_defs, ) # 校验句柄配置 diff --git a/unilabos/workflow/wf_utils.py b/unilabos/workflow/wf_utils.py index 4645128..6332f1d 100644 --- a/unilabos/workflow/wf_utils.py +++ b/unilabos/workflow/wf_utils.py @@ -41,6 +41,7 @@ def upload_workflow( workflow_name: Optional[str] = None, tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ 上传工作流到服务器 @@ -56,6 +57,7 @@ def upload_workflow( workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据 @@ -75,6 +77,14 @@ def upload_workflow( print_status(f"工作流文件JSON解析失败: {e}", "error") return {"code": -1, "message": f"JSON解析失败: {e}"} + # 从 JSON 文件中提取 description 和 tags(作为 fallback) + if not description and "description" in workflow_data: + description = workflow_data["description"] + print_status(f"从文件中读取 description", "info") + if not tags and "tags" in workflow_data: + tags = workflow_data["tags"] + print_status(f"从文件中读取 tags: {tags}", "info") + # 自动检测并转换格式 if not _is_node_link_format(workflow_data): try: @@ -96,6 +106,7 @@ def upload_workflow( print_status(f" - 节点数量: {len(nodes)}", "info") print_status(f" - 边数量: {len(edges)}", "info") print_status(f" - 标签: {tags or []}", "info") + print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info") print_status(f" - 发布状态: {published}", "info") # 调用 http_client 上传 @@ -107,6 +118,7 @@ def upload_workflow( edges=edges, tags=tags, published=published, + description=description, ) if result.get("code") == 0: @@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None: workflow_name = args_dict.get("workflow_name") tags = args_dict.get("tags", []) published = args_dict.get("published", False) + description = args_dict.get("description", "") if workflow_file: - upload_workflow(workflow_file, workflow_name, tags, published) + upload_workflow(workflow_file, workflow_name, tags, published, description) else: print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")