diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 35aba21..07abb87 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -27,7 +27,12 @@ from typing_extensions import TypedDict from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend from unilabos.registry.placeholder_type import ResourceSlot -from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict +from unilabos.resources.resource_tracker import ( + ResourceTreeSet, + ResourceDict, + EXTRA_SAMPLE_UUID, + EXTRA_UNILABOS_SAMPLE_UUID, +) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode @@ -231,12 +236,11 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_samples.append( - {"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)} - ) + sample_uuid_value = resource.unilabos_extra.get(EXTRA_SAMPLE_UUID, None) + res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: sample_uuid_value}) res_volumes.append(volume) self.pending_liquids_dict[channel] = { - "sample_uuid": resource.unilabos_extra.get("sample_uuid", None), + EXTRA_SAMPLE_UUID: sample_uuid_value, "volume": volume, } return SimpleReturn(samples=res_samples, volumes=res_volumes) @@ -278,10 +282,10 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_uuid = self.pending_liquids_dict[channel]["sample_uuid"] + res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID] self.pending_liquids_dict[channel]["volume"] -= volume - resource.unilabos_extra["sample_uuid"] = res_uuid - res_samples.append({"name": resource.name, "sample_uuid": res_uuid}) + resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid + res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid}) res_volumes.append(volume) return SimpleReturn(samples=res_samples, volumes=res_volumes) diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 8a0fef3..0b67fe3 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -14,6 +14,20 @@ if TYPE_CHECKING: EXTRA_CLASS = "unilabos_resource_class" +EXTRA_SAMPLE_UUID = "sample_uuid" +EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid" + +# 函数参数名常量 - 用于自动注入 sample_uuids 列表 +PARAM_SAMPLE_UUIDS = "sample_uuids" + +# JSON Command 中的系统参数字段名 +JSON_UNILABOS_PARAM = "unilabos_param" + +# 返回值中的 samples 字段名 +RETURN_UNILABOS_SAMPLES = "unilabos_samples" + +# sample_uuids 参数类型 (用于 virtual bench 等设备添加 sample_uuids 参数) +SampleUUIDsType = Dict[str, Optional["PLRResource"]] class ResourceDictPositionSize(BaseModel): @@ -529,6 +543,7 @@ class ResourceTreeSet(object): plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) from pylabrobot.resources import Coordinate from pylabrobot.serializer import deserialize + location = cast(Coordinate, deserialize(plr_dict["location"])) plr_resource.location = location plr_resource.load_all_state(all_states) diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 56585f6..7cde6a3 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -4,8 +4,20 @@ import json import threading import time import traceback -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \ - Tuple +from typing import ( + get_type_hints, + TypeVar, + Generic, + Dict, + Any, + Type, + TypedDict, + Optional, + List, + TYPE_CHECKING, + Union, + Tuple, +) from concurrent.futures import ThreadPoolExecutor import asyncio @@ -48,6 +60,9 @@ from unilabos.resources.resource_tracker import ( ResourceTreeSet, ResourceTreeInstance, ResourceDictInstance, + EXTRA_SAMPLE_UUID, + PARAM_SAMPLE_UUIDS, + JSON_UNILABOS_PARAM, ) from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from rclpy.task import Task, Future @@ -361,6 +376,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): from pylabrobot.resources.deck import Deck from pylabrobot.resources import Coordinate from pylabrobot.resources import Plate + # 物料传输到对应的node节点 client = self._resource_clients["c2s_update_resource_tree"] request = SerialCommand.Request() @@ -388,9 +404,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources) parent_resource = None if bind_parent_id != self.node_name: - parent_resource = self.resource_tracker.figure_resource( - {"name": bind_parent_id} - ) + parent_resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) for r in rts.root_nodes: # noinspection PyUnresolvedReferences r.res_content.parent_uuid = parent_resource.unilabos_uuid @@ -398,19 +412,20 @@ class BaseROS2DeviceNode(Node, Generic[T]): for r in rts.root_nodes: r.res_content.parent_uuid = self.uuid - if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer): + if ( + len(LIQUID_INPUT_SLOT) + and LIQUID_INPUT_SLOT[0] == -1 + and len(rts.root_nodes) == 1 + and isinstance(rts.root_nodes[0], RegularContainer) + ): # noinspection PyTypeChecker container_instance: RegularContainer = rts.root_nodes[0] - found_resources = self.resource_tracker.figure_resource( - {"id": container_instance.name}, try_mode=True - ) + found_resources = self.resource_tracker.figure_resource({"id": container_instance.name}, try_mode=True) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_instance.name}到资源跟踪器") else: - assert ( - len(found_resources) == 1 - ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统" + assert len(found_resources) == 1, f"找到多个同名物料: {container_instance.name}, 请检查物料系统" found_resource = found_resources[0] if isinstance(found_resource, RegularContainer): logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") @@ -422,14 +437,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) # noinspection PyUnresolvedReferences - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts.dump(), - "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts.dump(), + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) plr_instances = rts.to_plr_resources() @@ -471,7 +488,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1: ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT) LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT) - self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个") + self.lab_logger().warning( + f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个" + ) for liquid_type, liquid_volume, liquid_input_slot in zip( ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ): @@ -490,9 +509,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): input_wells = [] for r in LIQUID_INPUT_SLOT: input_wells.append(plr_instance.children[r]) - final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump() + final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources( + input_wells + ).dump() res.response = json.dumps(final_response) - if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param: + if ( + issubclass(parent_resource.__class__, Deck) + and hasattr(parent_resource, "assign_child_at_slot") + and "slot" in other_calling_param + ): other_calling_param["slot"] = int(other_calling_param["slot"]) parent_resource.assign_child_at_slot(plr_instance, **other_calling_param) else: @@ -507,14 +532,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource]) if rts_with_parent.root_nodes[0].res_content.uuid_parent is None: rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts_with_parent.dump(), - "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts_with_parent.dump(), + "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) @@ -811,7 +838,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): } def _handle_update( - plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any] + plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], + tree_set: ResourceTreeSet, + additional_add_params: Dict[str, Any], ) -> Tuple[Dict[str, Any], List[ResourcePLR]]: """ 处理资源更新操作的内部函数 @@ -836,7 +865,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_parent_resource = original_instance.parent original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) target_parent_resource_uuid = tree.root_node.res_content.uuid_parent - not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None + not_same_parent = ( + original_parent_resource_uuid != target_parent_resource_uuid + and original_parent_resource is not None + ) old_name = original_instance.name new_name = plr_resource.name parent_appended = False @@ -872,8 +904,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: # 判断是否变更了resource_site,重新登记 target_site = original_instance.unilabos_extra.get("update_resource_site") - sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None - site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else [] + sites = ( + original_instance.parent.sites + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else None + ) + site_names = ( + list(original_instance.parent._ordering.keys()) + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else [] + ) if target_site is not None and sites is not None and site_names is not None: site_index = sites.index(original_instance) site_name = site_names[site_index] @@ -910,9 +950,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action = i.get("action") # remove, add, update resources_uuid: List[str] = i.get("data") # 资源数据 additional_add_params = i.get("additional_add_params", {}) # 额外参数 - self.lab_logger().trace( - f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}" - ) + self.lab_logger().trace(f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}") tree_set = None if action in ["add", "update"]: tree_set = await self.get_resource( @@ -939,9 +977,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}") results.append(result) elif action == "update": @@ -961,9 +1003,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}") results.append(result) elif action == "remove": @@ -1333,7 +1379,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): resource_id=resource_data["id"], with_children=True ) if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource else: uuid_indices.append((idx, unilabos_uuid, resource_data)) @@ -1346,7 +1392,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): for i, (idx, _, resource_data) in enumerate(uuid_indices): plr_resource = plr_resources[i] if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源") @@ -1354,7 +1400,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 通过资源跟踪器获取本地实例 final_resources = queried_resources if is_sequence else queried_resources[0] if not is_sequence: - plr = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) + plr = self.resource_tracker.figure_resource( + {"name": final_resources.name}, try_mode=False + ) # 保留unilabos_extra if hasattr(final_resources, "unilabos_extra") and hasattr(plr, "unilabos_extra"): plr.unilabos_extra = getattr(final_resources, "unilabos_extra", {}).copy() @@ -1393,8 +1441,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = True except Exception as _: execution_error = traceback.format_exc() - error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") - trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + error( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) + trace( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) future.add_done_callback(_handle_future_exception) @@ -1414,9 +1466,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): except Exception as _: execution_error = traceback.format_exc() error( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) trace( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future.add_done_callback(_handle_future_exception) @@ -1539,20 +1593,29 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( function ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) + self.lab_logger().debug( + f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" + ) continue # 处理单个 ResourceSlot @@ -1581,6 +1644,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" ) raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}") + # todo: 默认反报送 return function(**function_args) except KeyError as ex: @@ -1601,14 +1665,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): raise ValueError("至少需要提供一个 UUID") uuids_list = list(uuids) - future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request( - command=json.dumps( - { - "data": {"data": uuids_list, "with_children": True}, - "action": "get", - } + future = self._resource_clients["c2s_update_resource_tree"].call_async( + SerialCommand.Request( + command=json.dumps( + { + "data": {"data": uuids_list, "with_children": True}, + "action": "get", + } + ) ) - )) + ) # 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒) timeout = 30.0 @@ -1666,6 +1732,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( @@ -1675,14 +1744,20 @@ class BaseROS2DeviceNode(Node, Generic[T]): function ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) + self.lab_logger().debug( + f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" + ) continue # 处理单个 ResourceSlot @@ -1960,7 +2035,9 @@ class ROS2DeviceNode: asyncio.set_event_loop(loop) loop.run_forever() - ROS2DeviceNode._asyncio_loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode") + ROS2DeviceNode._asyncio_loop_thread = threading.Thread( + target=run_event_loop, daemon=True, name="ROS2DeviceNode" + ) ROS2DeviceNode._asyncio_loop_thread.start() logger.info(f"循环线程已启动") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 4cfa9c1..64e5104 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -42,6 +42,9 @@ from unilabos.resources.resource_tracker import ( ResourceDictInstance, ResourceTreeSet, ResourceTreeInstance, + EXTRA_SAMPLE_UUID, + EXTRA_UNILABOS_SAMPLE_UUID, + RETURN_UNILABOS_SAMPLES, ) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid @@ -791,12 +794,17 @@ class HostNode(BaseROS2DeviceNode): action_client: ActionClient = self._action_clients[action_id] - # 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id" + # 遍历action_kwargs下的所有子dict,将sample_uuid的值赋给sample_id def assign_sample_id(obj): if isinstance(obj, dict): - if "sample_uuid" in obj: - obj["sample_id"] = obj["sample_uuid"] - obj.pop("sample_uuid") + # 处理 EXTRA_SAMPLE_UUID ("sample_uuid") + if EXTRA_SAMPLE_UUID in obj: + obj["sample_id"] = obj[EXTRA_SAMPLE_UUID] + obj.pop(EXTRA_SAMPLE_UUID) + # 处理 EXTRA_UNILABOS_SAMPLE_UUID ("unilabos_sample_uuid") + if EXTRA_UNILABOS_SAMPLE_UUID in obj: + obj["sample_id"] = obj[EXTRA_UNILABOS_SAMPLE_UUID] + obj.pop(EXTRA_UNILABOS_SAMPLE_UUID) for k, v in obj.items(): if k != "unilabos_extra": assign_sample_id(v) @@ -867,14 +875,14 @@ class HostNode(BaseROS2DeviceNode): # 适配后端的一些额外处理 return_value = return_info.get("return_value") if isinstance(return_value, dict): - unilabos_samples = return_value.pop("unilabos_samples", None) + unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None) if isinstance(unilabos_samples, list) and unilabos_samples: self.lab_logger().info( f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): " f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}" f"{'...' if len(unilabos_samples) > 5 else ''}" ) - return_info["unilabos_samples"] = unilabos_samples + return_info[RETURN_UNILABOS_SAMPLES] = unilabos_samples suc = return_info.get("suc", False) if not suc: status = "failed" diff --git a/unilabos/utils/import_manager.py b/unilabos/utils/import_manager.py index 2df7636..18a3920 100644 --- a/unilabos/utils/import_manager.py +++ b/unilabos/utils/import_manager.py @@ -27,6 +27,7 @@ __all__ = [ from ast import Constant +from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS from unilabos.utils import logger from unilabos.utils.decorator import is_not_action @@ -341,13 +342,18 @@ class ImportManager: result["action_methods"][method_name] = method_info return result - def _analyze_method_signature(self, method) -> Dict[str, Any]: + def _analyze_method_signature(self, method, skip_unilabos_params: bool = True) -> Dict[str, Any]: """ 分析方法签名,提取具体的命名参数信息 注意:此方法会跳过*args和**kwargs,只提取具体的命名参数 这样可以确保通过**dict方式传参时的准确性 + Args: + method: 要分析的方法 + skip_unilabos_params: 是否跳过 unilabos 系统参数(如 sample_uuids), + registry 补全时为 True,JsonCommand 执行时为 False + 示例用法: method_info = self._analyze_method_signature(some_method) params = {"param1": "value1", "param2": "value2"} @@ -368,6 +374,10 @@ class ImportManager: if param.kind == param.VAR_KEYWORD: # **kwargs continue + # 跳过 sample_uuids 参数(由系统自动注入,registry 补全时跳过) + if skip_unilabos_params and param_name == PARAM_SAMPLE_UUIDS: + continue + is_required = param.default == inspect.Parameter.empty if is_required: num_required += 1 @@ -563,6 +573,9 @@ class ImportManager: for i, arg in enumerate(node.args.args): if arg.arg == "self": continue + # 跳过 sample_uuids 参数(由系统自动注入) + if arg.arg == PARAM_SAMPLE_UUIDS: + continue arg_info = { "name": arg.arg, "type": None, diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index f4c0ac8..381cc66 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -60,7 +60,11 @@ ==================== 连接关系图 ==================== 控制流 (ready 端口串联): - create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ... + - create_resource 之间: 无 ready 连接 + - set_liquid_from_plate 之间: 无 ready 连接 + - create_resource 与 set_liquid_from_plate 之间: 无 ready 连接 + - transfer_liquid 之间: 通过 ready 端口串联 + transfer_liquid_1 -> transfer_liquid_2 -> transfer_liquid_3 -> ... 物料流: [create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid] @@ -402,7 +406,6 @@ def build_protocol_graph( # 为每个唯一的 slot 创建 create_resource 节点 res_index = 0 - last_create_resource_id = None for slot, info in slots_info.items(): node_id = str(uuid.uuid4()) res_id = info["res_id"] @@ -431,10 +434,7 @@ def build_protocol_graph( ) slot_to_create_resource[slot] = node_id - # create_resource 之间通过 ready 串联 - if last_create_resource_id is not None: - G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready") - last_create_resource_id = node_id + # create_resource 之间不需要 ready 连接 # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # 创建 Group 节点,包含所有 set_liquid_from_plate 节点 @@ -453,7 +453,6 @@ def build_protocol_graph( ) set_liquid_index = 0 - last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后 for labware_id, item in labware_info.items(): # 跳过 Tip/Rack 类型 @@ -494,10 +493,7 @@ def build_protocol_graph( }, ) - # ready 连接:上一个节点 -> set_liquid_from_plate - if last_set_liquid_id is not None: - G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready") - last_set_liquid_id = node_id + # set_liquid_from_plate 之间不需要 ready 连接 # 物料流:create_resource 的 labware -> set_liquid_from_plate 的 input_plate create_res_node_id = slot_to_create_resource.get(slot) @@ -507,7 +503,8 @@ def build_protocol_graph( # set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid resource_last_writer[labware_id] = f"{node_id}:output_wells" - last_control_node_id = last_set_liquid_id + # transfer_liquid 之间通过 ready 串联,从 None 开始 + last_control_node_id = None # 端口名称映射:JSON 字段名 -> 实际 handle key INPUT_PORT_MAPPING = {