diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 42beea4..ef111e6 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -4,6 +4,8 @@ import os import sys import inspect import importlib +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Union, Tuple @@ -60,6 +62,7 @@ class Registry: self.device_module_to_registry = {} self.resource_type_registry = {} self._setup_called = False # 跟踪setup是否已调用 + self._registry_lock = threading.Lock() # 多线程加载时的锁 # 其他状态变量 # self.is_host_mode = False # 移至BasicConfig中 @@ -177,8 +180,7 @@ class Registry: "result": {}, "schema": test_latency_schema, "goal_default": { - arg["name"]: arg["default"] - for arg in test_latency_method_info.get("args", []) + arg["name"]: arg["default"] for arg in test_latency_method_info.get("args", []) }, "handles": {}, }, @@ -262,67 +264,115 @@ class Registry: # 标记setup已被调用 self._setup_called = True + def _load_single_resource_file( + self, file: Path, complete_registry: bool, upload_registry: bool + ) -> Tuple[Dict[str, Any], Dict[str, Any], bool]: + """ + 加载单个资源文件 (线程安全) + + Returns: + (data, complete_data, is_valid): 资源数据, 完整数据, 是否有效 + """ + try: + with open(file, encoding="utf-8", mode="r") as f: + data = yaml.safe_load(io.StringIO(f.read())) + except Exception as e: + logger.warning(f"[UniLab Registry] 读取资源文件失败: {file}, 错误: {e}") + return {}, {}, False + + if not data: + return {}, {}, False + + complete_data = {} + for resource_id, resource_info in data.items(): + if "version" not in resource_info: + resource_info["version"] = "1.0.0" + if "category" not in resource_info: + resource_info["category"] = [file.stem] + elif file.stem not in resource_info["category"]: + resource_info["category"].append(file.stem) + elif not isinstance(resource_info.get("category"), list): + resource_info["category"] = [resource_info["category"]] + if "config_info" not in resource_info: + resource_info["config_info"] = [] + if "icon" not in resource_info: + resource_info["icon"] = "" + if "handles" not in resource_info: + resource_info["handles"] = [] + if "init_param_schema" not in resource_info: + resource_info["init_param_schema"] = {} + if "config_info" in resource_info: + del resource_info["config_info"] + if "file_path" in resource_info: + del resource_info["file_path"] + complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items()))) + if upload_registry: + class_info = resource_info.get("class", {}) + if len(class_info) and "module" in class_info: + if class_info.get("type") == "pylabrobot": + res_class = get_class(class_info["module"]) + if callable(res_class) and not isinstance(res_class, type): + res_instance = res_class(res_class.__name__) + res_ulr = tree_to_list([resource_plr_to_ulab(res_instance)]) + resource_info["config_info"] = res_ulr + resource_info["registry_type"] = "resource" + resource_info["file_path"] = str(file.absolute()).replace("\\", "/") + + complete_data = dict(sorted(complete_data.items())) + complete_data = copy.deepcopy(complete_data) + + if complete_registry: + try: + with open(file, "w", encoding="utf-8") as f: + yaml.dump(complete_data, f, allow_unicode=True, default_flow_style=False, Dumper=NoAliasDumper) + except Exception as e: + logger.warning(f"[UniLab Registry] 写入资源文件失败: {file}, 错误: {e}") + + return data, complete_data, True + def load_resource_types(self, path: os.PathLike, complete_registry: bool, upload_registry: bool): abs_path = Path(path).absolute() resource_path = abs_path / "resources" files = list(resource_path.glob("*/*.yaml")) - logger.trace(f"[UniLab Registry] load resources? {resource_path.exists()}, total: {len(files)}") - current_resource_number = len(self.resource_type_registry) + 1 - for i, file in enumerate(files): - with open(file, encoding="utf-8", mode="r") as f: - data = yaml.safe_load(io.StringIO(f.read())) - complete_data = {} - if data: - # 为每个资源添加文件路径信息 - for resource_id, resource_info in data.items(): - if "version" not in resource_info: - resource_info["version"] = "1.0.0" - if "category" not in resource_info: - resource_info["category"] = [file.stem] - elif file.stem not in resource_info["category"]: - resource_info["category"].append(file.stem) - elif not isinstance(resource_info.get("category"), list): - resource_info["category"] = [resource_info["category"]] - if "config_info" not in resource_info: - resource_info["config_info"] = [] - if "icon" not in resource_info: - resource_info["icon"] = "" - if "handles" not in resource_info: - resource_info["handles"] = [] - if "init_param_schema" not in resource_info: - resource_info["init_param_schema"] = {} - if "config_info" in resource_info: - del resource_info["config_info"] - if "file_path" in resource_info: - del resource_info["file_path"] - complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items()))) - if upload_registry: - class_info = resource_info.get("class", {}) - if len(class_info) and "module" in class_info: - if class_info.get("type") == "pylabrobot": - res_class = get_class(class_info["module"]) - if callable(res_class) and not isinstance( - res_class, type - ): # 有的是类,有的是函数,这里暂时只登记函数类的 - res_instance = res_class(res_class.__name__) - res_ulr = tree_to_list([resource_plr_to_ulab(res_instance)]) - resource_info["config_info"] = res_ulr - resource_info["registry_type"] = "resource" - resource_info["file_path"] = str(file.absolute()).replace("\\", "/") - complete_data = dict(sorted(complete_data.items())) - complete_data = copy.deepcopy(complete_data) - if complete_registry: - with open(file, "w", encoding="utf-8") as f: - yaml.dump(complete_data, f, allow_unicode=True, default_flow_style=False, Dumper=NoAliasDumper) + logger.debug(f"[UniLab Registry] resources: {resource_path.exists()}, total: {len(files)}") + if not files: + return + + # 使用线程池并行加载 + max_workers = min(8, len(files)) + results = [] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_file = { + executor.submit(self._load_single_resource_file, file, complete_registry, upload_registry): file + for file in files + } + for future in as_completed(future_to_file): + file = future_to_file[future] + try: + data, complete_data, is_valid = future.result() + if is_valid: + results.append((file, data)) + except Exception as e: + logger.warning(f"[UniLab Registry] 处理资源文件异常: {file}, 错误: {e}") + + # 线程安全地更新注册表 + current_resource_number = len(self.resource_type_registry) + 1 + with self._registry_lock: + for i, (file, data) in enumerate(results): self.resource_type_registry.update(data) - logger.trace( # type: ignore - f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(files)} " + logger.trace( + f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(results)} " + f"Add {list(data.keys())}" ) current_resource_number += 1 - else: - logger.debug(f"[UniLab Registry] Res File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}") + + # 记录无效文件 + valid_files = {r[0] for r in results} + for file in files: + if file not in valid_files: + logger.debug(f"[UniLab Registry] Res File Not Valid YAML File: {file.absolute()}") def _extract_class_docstrings(self, module_string: str) -> Dict[str, str]: """ @@ -674,213 +724,244 @@ class Registry: "handles": {}, } + def _load_single_device_file( + self, file: Path, complete_registry: bool, get_yaml_from_goal_type + ) -> Tuple[Dict[str, Any], Dict[str, Any], bool, List[str]]: + """ + 加载单个设备文件 (线程安全) + + Returns: + (data, complete_data, is_valid, device_ids): 设备数据, 完整数据, 是否有效, 设备ID列表 + """ + try: + with open(file, encoding="utf-8", mode="r") as f: + data = yaml.safe_load(io.StringIO(f.read())) + except Exception as e: + logger.warning(f"[UniLab Registry] 读取设备文件失败: {file}, 错误: {e}") + return {}, {}, False, [] + + if not data: + return {}, {}, False, [] + + complete_data = {} + action_str_type_mapping = { + "UniLabJsonCommand": "UniLabJsonCommand", + "UniLabJsonCommandAsync": "UniLabJsonCommandAsync", + } + status_str_type_mapping = {} + device_ids = [] + + for device_id, device_config in data.items(): + if "version" not in device_config: + device_config["version"] = "1.0.0" + if "category" not in device_config: + device_config["category"] = [file.stem] + elif file.stem not in device_config["category"]: + device_config["category"].append(file.stem) + if "config_info" not in device_config: + device_config["config_info"] = [] + if "description" not in device_config: + device_config["description"] = "" + if "icon" not in device_config: + device_config["icon"] = "" + if "handles" not in device_config: + device_config["handles"] = [] + if "init_param_schema" not in device_config: + device_config["init_param_schema"] = {} + if "class" in device_config: + if "status_types" not in device_config["class"] or device_config["class"]["status_types"] is None: + device_config["class"]["status_types"] = {} + if ( + "action_value_mappings" not in device_config["class"] + or device_config["class"]["action_value_mappings"] is None + ): + device_config["class"]["action_value_mappings"] = {} + enhanced_info = {} + if complete_registry: + device_config["class"]["status_types"].clear() + enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True) + if not enhanced_info.get("dynamic_import_success", False): + continue + device_config["class"]["status_types"].update( + {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()} + ) + for status_name, status_type in device_config["class"]["status_types"].items(): + if isinstance(status_type, tuple) or status_type in ["Any", "None", "Unknown"]: + status_type = "String" + device_config["class"]["status_types"][status_name] = status_type + try: + target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}") + except ROSMsgNotFound: + continue + if target_type in [dict, list]: + target_type = String + status_str_type_mapping[status_type] = target_type + device_config["class"]["status_types"] = dict(sorted(device_config["class"]["status_types"].items())) + if complete_registry: + old_action_configs = {} + for action_name, action_config in device_config["class"]["action_value_mappings"].items(): + old_action_configs[action_name] = action_config + + device_config["class"]["action_value_mappings"] = { + k: v + for k, v in device_config["class"]["action_value_mappings"].items() + if not k.startswith("auto-") + } + device_config["class"]["action_value_mappings"].update( + { + f"auto-{k}": { + "type": "UniLabJsonCommandAsync" if v["is_async"] else "UniLabJsonCommand", + "goal": {}, + "feedback": {}, + "result": {}, + "schema": self._generate_unilab_json_command_schema( + v["args"], + k, + v.get("return_annotation"), + old_action_configs.get(f"auto-{k}", {}).get("schema"), + ), + "goal_default": {i["name"]: i["default"] for i in v["args"]}, + "handles": old_action_configs.get(f"auto-{k}", {}).get("handles", []), + "placeholder_keys": { + i["name"]: ( + "unilabos_resources" + if i["type"] == "unilabos.registry.placeholder_type:ResourceSlot" + or i["type"] == ("list", "unilabos.registry.placeholder_type:ResourceSlot") + else "unilabos_devices" + ) + for i in v["args"] + if i.get("type", "") + in [ + "unilabos.registry.placeholder_type:ResourceSlot", + "unilabos.registry.placeholder_type:DeviceSlot", + ("list", "unilabos.registry.placeholder_type:ResourceSlot"), + ("list", "unilabos.registry.placeholder_type:DeviceSlot"), + ] + }, + } + for k, v in enhanced_info["action_methods"].items() + if k not in device_config["class"]["action_value_mappings"] + } + ) + for action_name, old_config in old_action_configs.items(): + if action_name in device_config["class"]["action_value_mappings"]: + old_schema = old_config.get("schema", {}) + if "description" in old_schema and old_schema["description"]: + device_config["class"]["action_value_mappings"][action_name]["schema"][ + "description" + ] = old_schema["description"] + device_config["init_param_schema"] = {} + device_config["init_param_schema"]["config"] = self._generate_unilab_json_command_schema( + enhanced_info["init_params"], "__init__" + )["properties"]["goal"] + device_config["init_param_schema"]["data"] = self._generate_status_types_schema( + enhanced_info["status_methods"] + ) + + device_config.pop("schema", None) + device_config["class"]["action_value_mappings"] = dict( + sorted(device_config["class"]["action_value_mappings"].items()) + ) + for action_name, action_config in device_config["class"]["action_value_mappings"].items(): + if "handles" not in action_config: + action_config["handles"] = {} + elif isinstance(action_config["handles"], list): + if len(action_config["handles"]): + logger.error(f"设备{device_id} {action_name} 的handles配置错误,应该是字典类型") + continue + else: + action_config["handles"] = {} + if "type" in action_config: + action_type_str: str = action_config["type"] + if not action_type_str.startswith("UniLabJsonCommand"): + try: + target_type = self._replace_type_with_class( + action_type_str, device_id, f"动作 {action_name}" + ) + except ROSMsgNotFound: + continue + action_str_type_mapping[action_type_str] = target_type + if target_type is not None: + action_config["goal_default"] = yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(target_type.Goal)) + ) + action_config["schema"] = ros_action_to_json_schema(target_type) + else: + logger.warning( + f"[UniLab Registry] 设备 {device_id} 的动作 {action_name} 类型为空,跳过替换" + ) + complete_data[device_id] = copy.deepcopy(dict(sorted(device_config.items()))) + for status_name, status_type in device_config["class"]["status_types"].items(): + device_config["class"]["status_types"][status_name] = status_str_type_mapping[status_type] + for action_name, action_config in device_config["class"]["action_value_mappings"].items(): + if action_config["type"] not in action_str_type_mapping: + continue + action_config["type"] = action_str_type_mapping[action_config["type"]] + self._add_builtin_actions(device_config, device_id) + device_config["file_path"] = str(file.absolute()).replace("\\", "/") + device_config["registry_type"] = "device" + device_ids.append(device_id) + + complete_data = dict(sorted(complete_data.items())) + complete_data = copy.deepcopy(complete_data) + try: + with open(file, "w", encoding="utf-8") as f: + yaml.dump(complete_data, f, allow_unicode=True, default_flow_style=False, Dumper=NoAliasDumper) + except Exception as e: + logger.warning(f"[UniLab Registry] 写入设备文件失败: {file}, 错误: {e}") + + return data, complete_data, True, device_ids + def load_device_types(self, path: os.PathLike, complete_registry: bool): - # return abs_path = Path(path).absolute() devices_path = abs_path / "devices" device_comms_path = abs_path / "device_comms" files = list(devices_path.glob("*.yaml")) + list(device_comms_path.glob("*.yaml")) - logger.trace( # type: ignore + logger.trace( f"[UniLab Registry] devices: {devices_path.exists()}, device_comms: {device_comms_path.exists()}, " + f"total: {len(files)}" ) - current_device_number = len(self.device_type_registry) + 1 + + if not files: + return + from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type - for i, file in enumerate(files): - with open(file, encoding="utf-8", mode="r") as f: - data = yaml.safe_load(io.StringIO(f.read())) - complete_data = {} - action_str_type_mapping = { - "UniLabJsonCommand": "UniLabJsonCommand", - "UniLabJsonCommandAsync": "UniLabJsonCommandAsync", + # 使用线程池并行加载 + max_workers = min(8, len(files)) + results = [] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_file = { + executor.submit(self._load_single_device_file, file, complete_registry, get_yaml_from_goal_type): file + for file in files } - status_str_type_mapping = {} - if data: - # 在添加到注册表前处理类型替换 - for device_id, device_config in data.items(): - # 添加文件路径信息 - 使用规范化的完整文件路径 - if "version" not in device_config: - device_config["version"] = "1.0.0" - if "category" not in device_config: - device_config["category"] = [file.stem] - elif file.stem not in device_config["category"]: - device_config["category"].append(file.stem) - if "config_info" not in device_config: - device_config["config_info"] = [] - if "description" not in device_config: - device_config["description"] = "" - if "icon" not in device_config: - device_config["icon"] = "" - if "handles" not in device_config: - device_config["handles"] = [] - if "init_param_schema" not in device_config: - device_config["init_param_schema"] = {} - if "class" in device_config: - if ( - "status_types" not in device_config["class"] - or device_config["class"]["status_types"] is None - ): - device_config["class"]["status_types"] = {} - if ( - "action_value_mappings" not in device_config["class"] - or device_config["class"]["action_value_mappings"] is None - ): - device_config["class"]["action_value_mappings"] = {} - enhanced_info = {} - if complete_registry: - device_config["class"]["status_types"].clear() - enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True) - if not enhanced_info.get("dynamic_import_success", False): - continue - device_config["class"]["status_types"].update( - {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()} - ) - for status_name, status_type in device_config["class"]["status_types"].items(): - if isinstance(status_type, tuple) or status_type in ["Any", "None", "Unknown"]: - status_type = "String" # 替换成ROS的String,便于显示 - device_config["class"]["status_types"][status_name] = status_type - try: - target_type = self._replace_type_with_class( - status_type, device_id, f"状态 {status_name}" - ) - except ROSMsgNotFound: - continue - if target_type in [ - dict, - list, - ]: # 对于嵌套类型返回的对象,暂时处理成字符串,无法直接进行转换 - target_type = String - status_str_type_mapping[status_type] = target_type - device_config["class"]["status_types"] = dict( - sorted(device_config["class"]["status_types"].items()) - ) - if complete_registry: - # 保存原有的 action 配置(用于保留 schema 的 description 和 handles 等) - old_action_configs = {} - for action_name, action_config in device_config["class"]["action_value_mappings"].items(): - old_action_configs[action_name] = action_config + for future in as_completed(future_to_file): + file = future_to_file[future] + try: + data, complete_data, is_valid, device_ids = future.result() + if is_valid: + results.append((file, data, device_ids)) + except Exception as e: + logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}") - device_config["class"]["action_value_mappings"] = { - k: v - for k, v in device_config["class"]["action_value_mappings"].items() - if not k.startswith("auto-") - } - # 处理动作值映射 - device_config["class"]["action_value_mappings"].update( - { - f"auto-{k}": { - "type": "UniLabJsonCommandAsync" if v["is_async"] else "UniLabJsonCommand", - "goal": {}, - "feedback": {}, - "result": {}, - "schema": self._generate_unilab_json_command_schema( - v["args"], - k, - v.get("return_annotation"), - # 传入旧的 schema 以保留字段 description - old_action_configs.get(f"auto-{k}", {}).get("schema"), - ), - "goal_default": {i["name"]: i["default"] for i in v["args"]}, - # 保留原有的 handles 配置 - "handles": old_action_configs.get(f"auto-{k}", {}).get("handles", []), - "placeholder_keys": { - i["name"]: ( - "unilabos_resources" - if i["type"] == "unilabos.registry.placeholder_type:ResourceSlot" - or i["type"] - == ("list", "unilabos.registry.placeholder_type:ResourceSlot") - else "unilabos_devices" - ) - for i in v["args"] - if i.get("type", "") - in [ - "unilabos.registry.placeholder_type:ResourceSlot", - "unilabos.registry.placeholder_type:DeviceSlot", - ("list", "unilabos.registry.placeholder_type:ResourceSlot"), - ("list", "unilabos.registry.placeholder_type:DeviceSlot"), - ] - }, - } - # 不生成已配置action的动作 - for k, v in enhanced_info["action_methods"].items() - if k not in device_config["class"]["action_value_mappings"] - } - ) - # 恢复原有的 description 信息(非 auto- 开头的动作) - for action_name, old_config in old_action_configs.items(): - if action_name in device_config["class"]["action_value_mappings"]: # 有一些会被删除 - old_schema = old_config.get("schema", {}) - if "description" in old_schema and old_schema["description"]: - device_config["class"]["action_value_mappings"][action_name]["schema"][ - "description" - ] = old_schema["description"] - device_config["init_param_schema"] = {} - device_config["init_param_schema"]["config"] = self._generate_unilab_json_command_schema( - enhanced_info["init_params"], "__init__" - )["properties"]["goal"] - device_config["init_param_schema"]["data"] = self._generate_status_types_schema( - enhanced_info["status_methods"] - ) - - device_config.pop("schema", None) - device_config["class"]["action_value_mappings"] = dict( - sorted(device_config["class"]["action_value_mappings"].items()) - ) - for action_name, action_config in device_config["class"]["action_value_mappings"].items(): - if "handles" not in action_config: - action_config["handles"] = {} - elif isinstance(action_config["handles"], list): - if len(action_config["handles"]): - logger.error(f"设备{device_id} {action_name} 的handles配置错误,应该是字典类型") - continue - else: - action_config["handles"] = {} - if "type" in action_config: - action_type_str: str = action_config["type"] - # 通过Json发放指令,而不是通过特殊的ros action进行处理 - if not action_type_str.startswith("UniLabJsonCommand"): - try: - target_type = self._replace_type_with_class( - action_type_str, device_id, f"动作 {action_name}" - ) - except ROSMsgNotFound: - continue - action_str_type_mapping[action_type_str] = target_type - if target_type is not None: - action_config["goal_default"] = yaml.safe_load( - io.StringIO(get_yaml_from_goal_type(target_type.Goal)) - ) - action_config["schema"] = ros_action_to_json_schema(target_type) - else: - logger.warning( - f"[UniLab Registry] 设备 {device_id} 的动作 {action_name} 类型为空,跳过替换" - ) - complete_data[device_id] = copy.deepcopy(dict(sorted(device_config.items()))) # 稍后dump到文件 - for status_name, status_type in device_config["class"]["status_types"].items(): - device_config["class"]["status_types"][status_name] = status_str_type_mapping[status_type] - for action_name, action_config in device_config["class"]["action_value_mappings"].items(): - if action_config["type"] not in action_str_type_mapping: - continue - action_config["type"] = action_str_type_mapping[action_config["type"]] - # 添加内置的驱动命令动作 - self._add_builtin_actions(device_config, device_id) - device_config["file_path"] = str(file.absolute()).replace("\\", "/") - device_config["registry_type"] = "device" - logger.trace( # type: ignore - f"[UniLab Registry] Device-{current_device_number} File-{i+1}/{len(files)} Add {device_id} " + # 线程安全地更新注册表 + current_device_number = len(self.device_type_registry) + 1 + with self._registry_lock: + for file, data, device_ids in results: + self.device_type_registry.update(data) + for device_id in device_ids: + logger.trace( + f"[UniLab Registry] Device-{current_device_number} Add {device_id} " + f"[{data[device_id].get('name', '未命名设备')}]" ) current_device_number += 1 - complete_data = dict(sorted(complete_data.items())) - complete_data = copy.deepcopy(complete_data) - with open(file, "w", encoding="utf-8") as f: - yaml.dump(complete_data, f, allow_unicode=True, default_flow_style=False, Dumper=NoAliasDumper) - self.device_type_registry.update(data) - else: - logger.debug( - f"[UniLab Registry] Device File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}" - ) + + # 记录无效文件 + valid_files = {r[0] for r in results} + for file in files: + if file not in valid_files: + logger.debug(f"[UniLab Registry] Device File Not Valid YAML File: {file.absolute()}") def obtain_registry_device_info(self): devices = []