diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 2a2facd..3af50cb 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -418,7 +418,7 @@ def main(): # 如果从远端获取了物料信息,则与本地物料进行同步 if request_startup_json and "nodes" in request_startup_json: print_status("开始同步远端物料到本地...", "info") - remote_tree_set = ResourceTreeSet.from_raw_list(request_startup_json["nodes"]) + remote_tree_set = ResourceTreeSet.from_raw_dict_list(request_startup_json["nodes"]) resource_tree_set.merge_remote_resources(remote_tree_set) print_status("远端物料同步完成", "info") diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 6da4f5f..23f139d 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -579,6 +579,8 @@ class MessageProcessor: elif message_type == "session_id": self.session_id = message_data.get("session_id") logger.info(f"[MessageProcessor] Session ID: {self.session_id}") + elif message_type == "request_reload": + await self._handle_request_reload(message_data) else: logger.debug(f"[MessageProcessor] Unknown message type: {message_type}") @@ -888,6 +890,20 @@ class MessageProcessor: ) thread.start() + async def _handle_request_reload(self, data: Dict[str, Any]): + """ + 处理重载请求 + + 当LabGo发送request_reload时,重新发送设备注册信息 + """ + reason = data.get("reason", "unknown") + logger.info(f"[MessageProcessor] Received reload request, reason: {reason}") + + # 重新发送host_node_ready信息 + if self.websocket_client: + self.websocket_client.publish_host_ready() + logger.info("[MessageProcessor] Re-sent host_node_ready after reload request") + async def _send_action_state_response( self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int ): @@ -1282,7 +1298,7 @@ class WebSocketClient(BaseCommunicationClient): self.message_processor.send_message(message) job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) - logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}") + logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}") def send_ping(self, ping_id: str, timestamp: float) -> None: """发送ping消息""" @@ -1313,17 +1329,55 @@ class WebSocketClient(BaseCommunicationClient): logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}") def publish_host_ready(self) -> None: - """发布host_node ready信号""" + """发布host_node ready信号,包含设备和动作信息""" if self.is_disabled or not self.is_connected(): logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal") return + # 收集设备信息 + devices = [] + machine_name = BasicConfig.machine_name + + try: + host_node = HostNode.get_instance(0) + if host_node: + # 获取设备信息 + for device_id, namespace in host_node.devices_names.items(): + device_key = f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}" + is_online = device_key in host_node._online_devices + + # 获取设备的动作信息 + actions = {} + for action_id, client in host_node._action_clients.items(): + # action_id 格式: /namespace/device_id/action_name + if device_id in action_id: + action_name = action_id.split("/")[-1] + actions[action_name] = { + "action_path": action_id, + "action_type": str(type(client).__name__), + } + + devices.append({ + "device_id": device_id, + "namespace": namespace, + "device_key": device_key, + "is_online": is_online, + "machine_name": host_node.device_machine_names.get(device_id, machine_name), + "actions": actions, + }) + + logger.info(f"[WebSocketClient] Collected {len(devices)} devices for host_ready") + except Exception as e: + logger.warning(f"[WebSocketClient] Error collecting device info: {e}") + message = { "action": "host_node_ready", "data": { "status": "ready", "timestamp": time.time(), + "machine_name": machine_name, + "devices": devices, }, } self.message_processor.send_message(message) - logger.info("[WebSocketClient] Host node ready signal published") + logger.info(f"[WebSocketClient] Host node ready signal published with {len(devices)} devices") diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 02ef20d..e0c7e80 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -70,7 +70,16 @@ class PRCXI9300Deck(Deck): def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs): super().__init__(name, size_x, size_y, size_z) - self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位 + self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位 + self.slot_locations = [Coordinate(0, 0, 0)] * 16 + + def assign_child_at_slot(self, resource: Resource, slot: int, reassign: bool = False) -> None: + if self.slots[slot - 1] is not None and not reassign: + raise ValueError(f"Spot {slot} is already occupied") + + self.slots[slot - 1] = resource + super().assign_child_resource(resource, location=self.slot_locations[slot - 1]) + class PRCXI9300Container(Plate): """PRCXI 9300 的专用 Container 类,继承自 Plate,用于槽位定位和未知模块。 diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 4541cb0..8744b45 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -134,7 +134,7 @@ def canonicalize_nodes_data( parent_instance.children.append(current_instance) # 第五步:创建 ResourceTreeSet - resource_tree_set = ResourceTreeSet.from_nested_list(standardized_instances) + resource_tree_set = ResourceTreeSet.from_nested_instance_list(standardized_instances) return resource_tree_set diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 16aba6d..e0b1b9a 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -5,7 +5,8 @@ import json import threading import time import traceback -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union +from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \ + Tuple from concurrent.futures import ThreadPoolExecutor import asyncio @@ -362,78 +363,82 @@ class BaseROS2DeviceNode(Node, Generic[T]): return res async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): + from pylabrobot.resources.resource import Resource as ResourcePLR + from pylabrobot.resources.deck import Deck + from pylabrobot.resources import Coordinate + from pylabrobot.resources import Plate # 物料传输到对应的node节点 - rclient = self.create_client(ResourceAdd, "/resources/add") - rclient.wait_for_service() - rclient2 = self.create_client(ResourceAdd, "/resources/add") - rclient2.wait_for_service() - request = ResourceAdd.Request() - request2 = ResourceAdd.Request() + client = self._resource_clients["c2s_update_resource_tree"] + request = SerialCommand.Request() + request2 = SerialCommand.Request() command_json = json.loads(req.command) namespace = command_json["namespace"] bind_parent_id = command_json["bind_parent_id"] edge_device_id = command_json["edge_device_id"] location = command_json["bind_location"] other_calling_param = command_json["other_calling_param"] - resources = command_json["resource"] + input_resources = command_json["resource"] initialize_full = other_calling_param.pop("initialize_full", False) # 用来增加液体 ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", []) - LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", []) - LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", []) + LIQUID_VOLUME: List[float] = other_calling_param.pop("LIQUID_VOLUME", []) + LIQUID_INPUT_SLOT: List[int] = other_calling_param.pop("LIQUID_INPUT_SLOT", []) slot = other_calling_param.pop("slot", "-1") - resource = None - if slot != "-1": # slot为负数的时候采用assign方法 + if slot != -1: # slot为负数的时候采用assign方法 other_calling_param["slot"] = slot - # 本地拿到这个物料,可能需要先做初始化? - if isinstance(resources, list): - if ( - len(resources) == 1 and isinstance(resources[0], list) and not initialize_full - ): # 取消,不存在的情况 - # 预先initialize过,以整组的形式传入 - request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]] - elif initialize_full: - resources = initialize_resources(resources) - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] - else: - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] - else: - if initialize_full: - resources = initialize_resources([resources]) - request.resources = [convert_to_ros_msg(Resource, resources)] - if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1: - container_instance = request.resources[0] - container_query_dict: dict = resources + # 本地拿到这个物料,可能需要先做初始化 + if isinstance(input_resources, list) and initialize_full: + input_resources = initialize_resources(input_resources) + elif initialize_full: + input_resources = initialize_resources([input_resources]) + rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources) + parent_resource = None + if bind_parent_id != self.node_name: + parent_resource = self.resource_tracker.figure_resource( + {"name": bind_parent_id} + ) + for r in rts.root_nodes: + # noinspection PyUnresolvedReferences + r.res_content.parent_uuid = parent_resource.unilabos_uuid + + if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer): + # noinspection PyTypeChecker + container_instance: RegularContainer = rts.root_nodes[0] found_resources = self.resource_tracker.figure_resource( - {"id": container_query_dict["name"]}, try_mode=True + {"id": container_instance.name}, try_mode=True ) if not len(found_resources): self.resource_tracker.add_resource(container_instance) - logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器") + logger.info(f"添加物料{container_instance.name}到资源跟踪器") else: assert ( len(found_resources) == 1 - ), f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" - resource = found_resources[0] - if isinstance(resource, Resource): - regular_container = RegularContainer(resource.id) - regular_container.ulr_resource = resource - regular_container.ulr_resource_data.update(json.loads(container_instance.data)) - logger.info(f"更新物料{container_query_dict['name']}的数据{resource.data} ULR") - elif isinstance(resource, dict): - if "data" not in resource: - resource["data"] = {} - resource["data"].update(json.loads(container_instance.data)) - request.resources[0].name = resource["name"] - logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict") + ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统" + found_resource = found_resources[0] + if isinstance(found_resource, RegularContainer): + logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") + found_resource.state.update(json.loads(container_instance.state)) + elif isinstance(found_resource, dict): + raise ValueError("已不支持 字典 版本的RegularContainer") else: logger.info( - f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}" + f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) - response: ResourceAdd.Response = await rclient.call_async(request) - # 应该先add_resource了 + # noinspection PyUnresolvedReferences + request.command = json.dumps({ + "action": "add", + "data": { + "data": rts.dump(), + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", + "first_add": True, + }, + }) + tree_response: SerialCommand.Response = await client.call_async(request) + uuid_maps = json.loads(tree_response.response) + self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) + self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes") final_response = { - "created_resources": [ROS2MessageInstance(i).get_python_dict() for i in request.resources], + "created_resources": rts.dump(), "liquid_input_resources": [], } res.response = json.dumps(final_response) @@ -458,59 +463,60 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) res.response = get_result_info_str(traceback.format_exc(), False, {}) return res - # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 - if bind_parent_id != self.node_name: - resource = self.resource_tracker.figure_resource( - {"name": bind_parent_id} - ) # 拿到父节点,进行具体assign等操作 - # request.resources = [convert_to_ros_msg(Resource, resources)] - try: - from pylabrobot.resources.resource import Resource as ResourcePLR - from pylabrobot.resources.deck import Deck - from pylabrobot.resources import Coordinate - from pylabrobot.resources import OTDeck - from pylabrobot.resources import Plate - - contain_model = not isinstance(resource, Deck) - if isinstance(resource, ResourcePLR): - # resources.list() - plr_instance = ResourceTreeSet.from_raw_list(resources).to_plr_resources()[0] - # resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources})) - # plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model) - + if len(rts.root_nodes) == 1 and parent_resource is not None: + plr_instance = rts.to_plr_resources()[0] if isinstance(plr_instance, Plate): - empty_liquid_info_in = [(None, 0)] * plr_instance.num_items + empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items + if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1: + ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT) + LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT) + self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个") for liquid_type, liquid_volume, liquid_input_slot in zip( ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ): empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) plr_instance.set_well_liquids(empty_liquid_info_in) - input_wells_ulr = [ - convert_to_ros_msg( - Resource, - resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False), - ) - for r in LIQUID_INPUT_SLOT - ] - final_response["liquid_input_resources"] = [ - ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr - ] + try: + # noinspection PyProtectedMember + keys = list(plr_instance._ordering.keys()) + for ind, r in enumerate(LIQUID_INPUT_SLOT[:]): + if isinstance(r, int): + # noinspection PyTypeChecker + LIQUID_INPUT_SLOT[ind] = keys[r] + input_wells = [plr_instance.get_well(r) for r in LIQUID_INPUT_SLOT] + except AttributeError: + # 按照id回去失败,回退到children + input_wells = [] + for r in LIQUID_INPUT_SLOT: + input_wells.append(plr_instance.children[r]) + final_response["liquid_input_resources"] = ResourceTreeSet.from_plr_resources(input_wells).dump() res.response = json.dumps(final_response) - if isinstance(resource, OTDeck) and "slot" in other_calling_param: + if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param: other_calling_param["slot"] = int(other_calling_param["slot"]) - resource.assign_child_at_slot(plr_instance, **other_calling_param) + parent_resource.assign_child_at_slot(plr_instance, **other_calling_param) else: - _discard_slot = other_calling_param.pop("slot", "-1") - resource.assign_child_resource( + _discard_slot = other_calling_param.pop("slot", -1) + parent_resource.assign_child_resource( plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param, ) - request2.resources = [ - convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)]) - ] - rclient2.call(request2) + # 调整了液体以及Deck之后要重新Assign + # noinspection PyUnresolvedReferences + request.command = json.dumps({ + "action": "add", + "data": { + "data": ResourceTreeSet.from_plr_resources([parent_resource]).dump(), + "mount_uuid": parent_resource.parent.unilabos_uuid if parent_resource.parent is not None else self.uuid, + "first_add": False, + }, + }) + tree_response: SerialCommand.Response = await client.call_async(request) + uuid_maps = json.loads(tree_response.response) + self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) + self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes") + # 这里created_resources不包含parent_resource # 发送给ResourceMeshManager action_client = ActionClient( self, @@ -521,7 +527,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): goal = SendCmd.Goal() goal.command = json.dumps( { - "resources": resources, + "resources": input_resources, "bind_parent_id": bind_parent_id, } ) @@ -614,7 +620,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) ) # type: ignore raw_nodes = json.loads(response.response) - tree_set = ResourceTreeSet.from_raw_list(raw_nodes) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_nodes) self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树") return tree_set @@ -642,7 +648,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): raw_data = json.loads(response.response) # 转换为 PLR 资源 - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) plr_resource = tree_set.to_plr_resources()[0] self.lab_logger().debug(f"获取资源 {resource_id} 成功") return plr_resource @@ -1523,7 +1529,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): raw_data = json.loads(response.response) # 转换为 PLR 资源 - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) plr_resource = tree_set.to_plr_resources()[0] # 通过资源跟踪器获取本地实例 diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index ebc4492..971b6dd 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -45,6 +45,7 @@ from unilabos.ros.nodes.resource_tracker import ( ) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid +from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot @@ -180,7 +181,7 @@ class HostNode(BaseROS2DeviceNode): for plr_resource in ResourceTreeSet([tree]).to_plr_resources(): self._resource_tracker.add_resource(plr_resource) except Exception as ex: - self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!") + warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!") except Exception as ex: logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}") # 初始化Node基类,传递空参数覆盖列表 @@ -455,10 +456,10 @@ class HostNode(BaseROS2DeviceNode): async def create_resource( self, - device_id: str, + device_id: DeviceSlot, res_id: str, class_name: str, - parent: str, + parent: ResourceSlot, bind_locations: Point, liquid_input_slot: list[int] = [], liquid_type: list[str] = [], @@ -805,7 +806,7 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}") if goal_status != GoalStatus.STATUS_CANCELED: - self.lab_logger().debug(f"[Host Node] Result data: {result_data}") + self.lab_logger().trace(f"[Host Node] Result data: {result_data}") # 清理 _goals 中的记录 if job_id in self._goals: diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 7325dda..91cd0ef 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -244,7 +244,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): r ) # type: ignore raw_data = json.loads(response.response) - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) target = tree_set.dump() protocol_kwargs[k] = target[0][0] if v == "unilabos_msgs/Resource" else target except Exception as ex: diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index aaa8079..09e7749 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -523,7 +523,7 @@ class ResourceTreeSet(object): return plr_resources @classmethod - def from_raw_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet": + def from_raw_dict_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet": """ 从原始字典列表创建 ResourceTreeSet,自动建立 parent-children 关系 @@ -573,10 +573,10 @@ class ResourceTreeSet(object): parent_instance.children.append(instance) # 第四步:使用 from_nested_list 创建 ResourceTreeSet - return cls.from_nested_list(instances) + return cls.from_nested_instance_list(instances) @classmethod - def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": + def from_nested_instance_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": """ 从扁平化的资源列表创建ResourceTreeSet,自动按根节点分组 @@ -785,7 +785,7 @@ class ResourceTreeSet(object): """ nested_lists = [] for tree_data in data: - nested_lists.extend(ResourceTreeSet.from_raw_list(tree_data).trees) + nested_lists.extend(ResourceTreeSet.from_raw_dict_list(tree_data).trees) return cls(nested_lists) @@ -965,7 +965,7 @@ class DeviceNodeResourceTracker(object): if current_uuid in self.uuid_to_resources: self.uuid_to_resources.pop(current_uuid) self.uuid_to_resources[new_uuid] = res - logger.debug(f"更新uuid: {current_uuid} -> {new_uuid}") + logger.trace(f"更新uuid: {current_uuid} -> {new_uuid}") replaced = 1 return replaced