diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 14c9592..cc306b7 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -204,6 +204,12 @@ def parse_args(): default=False, help="Whether to publish the workflow (default: False)", ) + workflow_parser.add_argument( + "--description", + type=str, + default="", + help="Workflow description, used when publishing the workflow", + ) return parser @@ -231,52 +237,60 @@ def main(): # 加载配置文件,优先加载config,然后从env读取 config_path = args_dict.get("config") - if check_mode: - args_dict["working_dir"] = os.path.abspath(os.getcwd()) - # 当 skip_env_check 时,默认使用当前目录作为 working_dir - if skip_env_check and not args_dict.get("working_dir") and not config_path: + # === 解析 working_dir === + # 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改 + # 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir + # 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测 + raw_working_dir = args_dict.get("working_dir") + if raw_working_dir: + working_dir = os.path.abspath(raw_working_dir) + elif config_path and os.path.exists(config_path): + working_dir = os.path.dirname(os.path.abspath(config_path)) + else: working_dir = os.path.abspath(os.getcwd()) - print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info") - # 检查当前目录是否有 local_config.py - local_config_in_cwd = os.path.join(working_dir, "local_config.py") - if os.path.exists(local_config_in_cwd): - config_path = local_config_in_cwd + + # unilabos_data 子目录自动检测 + if os.path.basename(working_dir) != "unilabos_data": + unilabos_data_sub = os.path.join(working_dir, "unilabos_data") + if os.path.isdir(unilabos_data_sub): + working_dir = unilabos_data_sub + elif not raw_working_dir and not (config_path and os.path.exists(config_path)): + # 未显式指定路径,默认使用 cwd/unilabos_data + working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) + + # === 解析 config_path === + if config_path and not os.path.exists(config_path): + # config_path 传入但不存在,尝试在 working_dir 中查找 + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate + print_status(f"在工作目录中发现配置文件: {config_path}", "info") + else: + print_status( + f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py," + f"请通过 --config 传入 local_config.py 文件路径", + "error", + ) + os._exit(1) + elif not config_path: + # 规则3: 未传入 config_path,尝试 working_dir/local_config.py + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate print_status(f"发现本地配置文件: {config_path}", "info") else: print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - elif os.getcwd().endswith("unilabos_data"): - working_dir = os.path.abspath(os.getcwd()) - else: - working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) - - if args_dict.get("working_dir"): - working_dir = args_dict.get("working_dir", "") - if config_path and not os.path.exists(config_path): - config_path = os.path.join(working_dir, "local_config.py") - if not os.path.exists(config_path): - print_status( - f"当前工作目录 {working_dir} 未找到local_config.py,请通过 --config 传入 local_config.py 文件路径", - "error", + print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") + if check_mode or input() != "n": + os.makedirs(working_dir, exist_ok=True) + config_path = os.path.join(working_dir, "local_config.py") + shutil.copy( + os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), + config_path, ) + print_status(f"已创建 local_config.py 路径: {config_path}", "info") + else: os._exit(1) - elif config_path and os.path.exists(config_path): - working_dir = os.path.dirname(config_path) - elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")): - config_path = os.path.join(working_dir, "local_config.py") - elif not skip_env_check and not config_path and ( - not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py")) - ): - print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") - if input() != "n": - os.makedirs(working_dir, exist_ok=True) - config_path = os.path.join(working_dir, "local_config.py") - shutil.copy( - os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path - ) - print_status(f"已创建 local_config.py 路径: {config_path}", "info") - else: - os._exit(1) # 加载配置文件 (check_mode 跳过) print_status(f"当前工作目录为 {working_dir}", "info") diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 0ecf460..b43b0f4 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -343,9 +343,10 @@ class HTTPClient: edges: List[Dict[str, Any]], tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ - 导入工作流到服务器 + 导入工作流到服务器,如果 published 为 True,则额外发起发布请求 Args: name: 工作流名称(顶层) @@ -355,6 +356,7 @@ class HTTPClient: edges: 工作流边列表 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据,包含 code 和 data (uuid, name) @@ -367,7 +369,6 @@ class HTTPClient: "nodes": nodes, "edges": edges, "tags": tags if tags is not None else [], - "published": published, }, } # 保存请求到文件 @@ -388,11 +389,51 @@ class HTTPClient: res = response.json() if "code" in res and res["code"] != 0: logger.error(f"导入工作流失败: {response.text}") + return res + # 导入成功后,如果需要发布则额外发起发布请求 + if published: + imported_uuid = res.get("data", {}).get("uuid", workflow_uuid) + publish_res = self.workflow_publish(imported_uuid, description) + res["publish_result"] = publish_res return res else: logger.error(f"导入工作流失败: {response.status_code}, {response.text}") return {"code": response.status_code, "message": response.text} + def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]: + """ + 发布工作流 + + Args: + workflow_uuid: 工作流UUID + description: 工作流描述 + + Returns: + Dict: API响应数据 + """ + payload = { + "uuid": workflow_uuid, + "description": description, + "published": True, + } + logger.info(f"正在发布工作流: {workflow_uuid}") + response = requests.patch( + f"{self.remote_addr}/lab/workflow/owner", + json=payload, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=60, + ) + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"发布工作流失败: {response.text}") + else: + logger.info(f"工作流发布成功: {workflow_uuid}") + return res + else: + logger.error(f"发布工作流失败: {response.status_code}, {response.text}") + return {"code": response.status_code, "message": response.text} + # 创建默认客户端实例 http_client = HTTPClient() diff --git a/unilabos/workflow/wf_utils.py b/unilabos/workflow/wf_utils.py index 4645128..6332f1d 100644 --- a/unilabos/workflow/wf_utils.py +++ b/unilabos/workflow/wf_utils.py @@ -41,6 +41,7 @@ def upload_workflow( workflow_name: Optional[str] = None, tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ 上传工作流到服务器 @@ -56,6 +57,7 @@ def upload_workflow( workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据 @@ -75,6 +77,14 @@ def upload_workflow( print_status(f"工作流文件JSON解析失败: {e}", "error") return {"code": -1, "message": f"JSON解析失败: {e}"} + # 从 JSON 文件中提取 description 和 tags(作为 fallback) + if not description and "description" in workflow_data: + description = workflow_data["description"] + print_status(f"从文件中读取 description", "info") + if not tags and "tags" in workflow_data: + tags = workflow_data["tags"] + print_status(f"从文件中读取 tags: {tags}", "info") + # 自动检测并转换格式 if not _is_node_link_format(workflow_data): try: @@ -96,6 +106,7 @@ def upload_workflow( print_status(f" - 节点数量: {len(nodes)}", "info") print_status(f" - 边数量: {len(edges)}", "info") print_status(f" - 标签: {tags or []}", "info") + print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info") print_status(f" - 发布状态: {published}", "info") # 调用 http_client 上传 @@ -107,6 +118,7 @@ def upload_workflow( edges=edges, tags=tags, published=published, + description=description, ) if result.get("code") == 0: @@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None: workflow_name = args_dict.get("workflow_name") tags = args_dict.get("tags", []) published = args_dict.get("published", False) + description = args_dict.get("description", "") if workflow_file: - upload_workflow(workflow_file, workflow_name, tags, published) + upload_workflow(workflow_file, workflow_name, tags, published, description) else: print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")