diff --git a/test/experiments/biomek.json b/test/experiments/biomek.json new file mode 100644 index 00000000..604adccc --- /dev/null +++ b/test/experiments/biomek.json @@ -0,0 +1,22 @@ +{ + "nodes": [ + { + "id": "BIOMEK", + "name": "BIOMEK", + "parent": null, + "type": "device", + "class": "liquid_handler.biomek", + "position": { + "x": 620.6111111111111, + "y": 171, + "z": 0 + }, + "config": { + }, + "data": {}, + "children": [ + ] + } + ], + "links": [] +} \ No newline at end of file diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 9f870691..36bc0337 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -1,6 +1,7 @@ import json import time import traceback +from typing import Optional import uuid import paho.mqtt.client as mqtt @@ -163,10 +164,12 @@ class MQTTClient: self.client.publish(address, json.dumps(status), qos=2) logger.critical(f"Device status published: address: {address}, {status}") - def publish_job_status(self, feedback_data: dict, job_id: str, status: str): + def publish_job_status(self, feedback_data: dict, job_id: str, status: str, return_info: Optional[dict] = None): if self.mqtt_disable: return - jobdata = {"job_id": job_id, "data": feedback_data, "status": status} + if return_info is None: + return_info = {} + jobdata = {"job_id": job_id, "data": feedback_data, "status": status, "return_info": return_info} self.client.publish(f"labs/{MQConfig.lab_id}/job/list/", json.dumps(jobdata), qos=2) def publish_registry(self, device_id: str, device_info: dict): diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index da5d0696..648859f6 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -30,18 +30,18 @@ class HTTPClient: self.auth = MQConfig.lab_id info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}") - def resource_add(self, resources: List[Dict[str, Any]]) -> requests.Response: + def resource_add(self, resources: List[Dict[str, Any]], database_process_later:bool) -> requests.Response: """ 添加资源 Args: resources: 要添加的资源列表 - + database_process_later: 后台处理资源 Returns: Response: API响应对象 """ response = requests.post( - f"{self.remote_addr}/lab/resource/", + f"{self.remote_addr}/lab/resource/?database_process_later={1 if database_process_later else 0}", json=resources, headers={"Authorization": f"lab {self.auth}"}, timeout=5, diff --git a/unilabos/devices/liquid_handling/biomek.py b/unilabos/devices/liquid_handling/biomek.py index 61b2d936..ea7a9e9c 100644 --- a/unilabos/devices/liquid_handling/biomek.py +++ b/unilabos/devices/liquid_handling/biomek.py @@ -693,224 +693,220 @@ if __name__ == "__main__": } ''' - - -labware_with_liquid = ''' - [ - { - "id": "Tip Rack BC230 on TL1", - "parent": "deck", - "slot_on_deck": "TL1", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on TL2", - "parent": "deck", - "slot_on_deck": "TL2", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on TL3", - "parent": "deck", - "slot_on_deck": "TL3", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on TL4", - "parent": "deck", - "slot_on_deck": "TL4", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on TL5", - "parent": "deck", - "slot_on_deck": "TL5", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on P5", - "parent": "deck", - "slot_on_deck": "P5", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on P6", - "parent": "deck", - "slot_on_deck": "P6", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on P15", - "parent": "deck", - "slot_on_deck": "P15", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "Tip Rack BC230 on P16", - "parent": "deck", - "slot_on_deck": "P16", - "class_name": "BC230", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "stock plate on P1", - "parent": "deck", - "slot_on_deck": "P1", - "class_name": "nest_12_reservoir_15ml", - "liquid_type": [ - "master_mix" - ], - "liquid_volume": [10000], - "liquid_input_wells": [ - "A1" - ] - }, - { - "id": "stock plate on P2", - "parent": "deck", - "slot_on_deck": "P2", - "class_name": "nest_12_reservoir_15ml", - "liquid_type": [ - "bind beads" - ], - "liquid_volume": [10000], - "liquid_input_wells": [ - "A1" - ] - }, - { - "id": "stock plate on P3", - "parent": "deck", - "slot_on_deck": "P3", - "class_name": "nest_12_reservoir_15ml", - "liquid_type": [ - "ethyl alcohol" - ], - "liquid_volume": [10000], - "liquid_input_wells": [ - "A1" - ] - }, - { - "id": "elution buffer on P4", - "parent": "deck", - "slot_on_deck": "P4", - "class_name": "nest_12_reservoir_15ml", - "liquid_type": [ - "elution buffer" - ], - "liquid_volume": [5000], - "liquid_input_wells": [ - "A1" - ] - }, - { - "id": "oscillation", - "parent": "deck", - "slot_on_deck": "Orbital1", - "class_name": "Orbital", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "working plate on P11", - "parent": "deck", - "slot_on_deck": "P11", - "class_name": "NEST 2ml Deep Well Plate", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "magnetics module on P12", - "parent": "deck", - "slot_on_deck": "P12", - "class_name": "magnetics module", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "working plate on P13", - "parent": "deck", - "slot_on_deck": "P13", - "class_name": "NEST 2ml Deep Well Plate", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - }, - { - "id": "waste on P22", - "parent": "deck", - "slot_on_deck": "P22", - "class_name": "nest_1_reservoir_195ml", - "liquid_type": [], - "liquid_volume": [], - "liquid_input_wells": [] - } - ] - ''' - - - -handler = LiquidHandlerBiomek() - -handler.temp_protocol = { - "meta": {}, - "labwares": [], - "steps": [] -} - -input_steps = json.loads(steps_info) -labwares = json.loads(labware_with_liquid) - -for step in input_steps['steps']: - operation = step['operation'] - parameters = step['parameters'] - - if operation == 'transfer': - handler.transfer_biomek(source=parameters['source'], - target=parameters['target'], - volume=parameters['volume'], - tip_rack=parameters['tip_rack'], - aspirate_techniques='MC P300 high', - dispense_techniques='MC P300 high') - elif operation == 'move_labware': - handler.move_biomek(source=parameters['source'], - target=parameters['target']) - elif operation == 'oscillation': - handler.oscillation_biomek(rpm=parameters['rpm'], - time=parameters['time']) - elif operation == 'incubation': - handler.incubation_biomek(time=parameters['time']) - -print(json.dumps(handler.temp_protocol, indent=4)) + labware_with_liquid = ''' + [ + { + "id": "Tip Rack BC230 on TL1", + "parent": "deck", + "slot_on_deck": "TL1", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on TL2", + "parent": "deck", + "slot_on_deck": "TL2", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on TL3", + "parent": "deck", + "slot_on_deck": "TL3", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on TL4", + "parent": "deck", + "slot_on_deck": "TL4", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on TL5", + "parent": "deck", + "slot_on_deck": "TL5", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on P5", + "parent": "deck", + "slot_on_deck": "P5", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on P6", + "parent": "deck", + "slot_on_deck": "P6", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on P15", + "parent": "deck", + "slot_on_deck": "P15", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "Tip Rack BC230 on P16", + "parent": "deck", + "slot_on_deck": "P16", + "class_name": "BC230", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "stock plate on P1", + "parent": "deck", + "slot_on_deck": "P1", + "class_name": "nest_12_reservoir_15ml", + "liquid_type": [ + "master_mix" + ], + "liquid_volume": [10000], + "liquid_input_wells": [ + "A1" + ] + }, + { + "id": "stock plate on P2", + "parent": "deck", + "slot_on_deck": "P2", + "class_name": "nest_12_reservoir_15ml", + "liquid_type": [ + "bind beads" + ], + "liquid_volume": [10000], + "liquid_input_wells": [ + "A1" + ] + }, + { + "id": "stock plate on P3", + "parent": "deck", + "slot_on_deck": "P3", + "class_name": "nest_12_reservoir_15ml", + "liquid_type": [ + "ethyl alcohol" + ], + "liquid_volume": [10000], + "liquid_input_wells": [ + "A1" + ] + }, + { + "id": "elution buffer on P4", + "parent": "deck", + "slot_on_deck": "P4", + "class_name": "nest_12_reservoir_15ml", + "liquid_type": [ + "elution buffer" + ], + "liquid_volume": [5000], + "liquid_input_wells": [ + "A1" + ] + }, + { + "id": "oscillation", + "parent": "deck", + "slot_on_deck": "Orbital1", + "class_name": "Orbital", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "working plate on P11", + "parent": "deck", + "slot_on_deck": "P11", + "class_name": "NEST 2ml Deep Well Plate", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "magnetics module on P12", + "parent": "deck", + "slot_on_deck": "P12", + "class_name": "magnetics module", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "working plate on P13", + "parent": "deck", + "slot_on_deck": "P13", + "class_name": "NEST 2ml Deep Well Plate", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + }, + { + "id": "waste on P22", + "parent": "deck", + "slot_on_deck": "P22", + "class_name": "nest_1_reservoir_195ml", + "liquid_type": [], + "liquid_volume": [], + "liquid_input_wells": [] + } + ] + ''' + # 创建Biomek液体处理器实例 + handler = LiquidHandlerBiomek() + + handler.temp_protocol = { + "meta": {}, + "labwares": [], + "steps": [] + } + + input_steps = json.loads(steps_info) + labwares = json.loads(labware_with_liquid) + + for step in input_steps['steps']: + operation = step['operation'] + parameters = step['parameters'] + + if operation == 'transfer': + handler.transfer_biomek(source=parameters['source'], + target=parameters['target'], + volume=parameters['volume'], + tip_rack=parameters['tip_rack'], + aspirate_techniques='MC P300 high', + dispense_techniques='MC P300 high') + elif operation == 'move_labware': + handler.move_biomek(source=parameters['source'], + target=parameters['target']) + elif operation == 'oscillation': + handler.oscillation_biomek(rpm=parameters['rpm'], + time=parameters['time']) + elif operation == 'incubation': + handler.incubation_biomek(time=parameters['time']) + + print(json.dumps(handler.temp_protocol, indent=4)) diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index eef04042..6851f285 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -293,7 +293,7 @@ liquid_handler.biomek: description: Biomek液体处理器设备,基于pylabrobot控制 icon: icon_yiyezhan.webp class: - module: unilabos.devices.liquid_handling.biomek:LiquidHandlerBiomek + module: unilabos.devices.liquid_handling.biomek_test:LiquidHandlerBiomek type: python status_types: {} action_value_mappings: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index e7991303..e75413f1 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1,5 +1,4 @@ import copy -import functools import json import threading import time @@ -20,16 +19,29 @@ from rclpy.service import Service from unilabos_msgs.action import SendCmd from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response -from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type, resource_ulab_to_plr, \ - initialize_resources, list_to_nested_dict, dict_to_tree, resource_plr_to_ulab, tree_to_list +from unilabos.resources.graphio import ( + convert_resources_to_type, + convert_resources_from_type, + resource_ulab_to_plr, + initialize_resources, + dict_to_tree, + resource_plr_to_ulab, + tree_to_list, +) from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, convert_from_ros_msg, convert_from_ros_msg_with_mapping, - convert_to_ros_msg_with_mapping, ros_action_to_json_schema, + convert_to_ros_msg_with_mapping, ) -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \ - SerialCommand # type: ignore +from unilabos_msgs.srv import ( + ResourceAdd, + ResourceGet, + ResourceDelete, + ResourceUpdate, + ResourceList, + SerialCommand, +) # type: ignore from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker @@ -37,7 +49,7 @@ from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger -from unilabos.utils.type_check import get_type_class, TypeEncoder +from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info T = TypeVar("T") @@ -292,7 +304,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.create_ros_action_server(action_name, action_value_mapping) # 创建线程池执行器 - self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}") + self._executor = ThreadPoolExecutor( + max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}" + ) # 创建资源管理客户端 self._resource_clients: Dict[str, Client] = { @@ -334,7 +348,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): other_calling_param["slot"] = slot # 本地拿到这个物料,可能需要先做初始化? if isinstance(resources, list): - if len(resources) == 1 and isinstance(resources[0], list) and not initialize_full: # 取消,不存在的情况 + if ( + len(resources) == 1 and isinstance(resources[0], list) and not initialize_full + ): # 取消,不存在的情况 # 预先initialize过,以整组的形式传入 request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]] elif initialize_full: @@ -373,6 +389,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): from pylabrobot.resources import Coordinate from pylabrobot.resources import OTDeck from pylabrobot.resources import Plate + contain_model = not isinstance(resource, Deck) if isinstance(resource, ResourcePLR): # resources.list() @@ -380,25 +397,38 @@ class BaseROS2DeviceNode(Node, Generic[T]): plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model) if isinstance(plr_instance, Plate): empty_liquid_info_in = [(None, 0)] * plr_instance.num_items - for liquid_type, liquid_volume, liquid_input_slot in zip(ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT): + for liquid_type, liquid_volume, liquid_input_slot in zip( + ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT + ): empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) plr_instance.set_well_liquids(empty_liquid_info_in) if isinstance(resource, OTDeck) and "slot" in other_calling_param: resource.assign_child_at_slot(plr_instance, **other_calling_param) else: _discard_slot = other_calling_param.pop("slot", -1) - resource.assign_child_resource(plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param) - request2.resources = [convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)])] + resource.assign_child_resource( + plr_instance, + Coordinate(location["x"], location["y"], location["z"]), + **other_calling_param, + ) + request2.resources = [ + convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)]) + ] rclient2.call(request2) # 发送给ResourceMeshManager action_client = ActionClient( - self, SendCmd, "/devices/resource_mesh_manager/add_resource_mesh", callback_group=self.callback_group + self, + SendCmd, + "/devices/resource_mesh_manager/add_resource_mesh", + callback_group=self.callback_group, ) goal = SendCmd.Goal() - goal.command = json.dumps({ - "resources": resources, - "bind_parent_id": bind_parent_id, - }) + goal.command = json.dumps( + { + "resources": resources, + "bind_parent_id": bind_parent_id, + } + ) future = action_client.send_goal_async(goal, goal_uuid=uuid.uuid4()) def done_cb(*args): @@ -415,10 +445,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): # noinspection PyTypeChecker self._service_server: Dict[str, Service] = { "query_host_name": self.create_service( - SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group + SerialCommand, + f"/srv{self.namespace}/query_host_name", + query_host_name_cb, + callback_group=self.callback_group, ), "append_resource": self.create_service( - SerialCommand, f"/srv{self.namespace}/append_resource", append_resource, callback_group=self.callback_group + SerialCommand, + f"/srv{self.namespace}/append_resource", + append_resource, + callback_group=self.callback_group, ), } @@ -447,6 +483,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): registered_devices[self.device_id] = device_info from unilabos.config.config import BasicConfig from unilabos.ros.nodes.presets.host_node import HostNode + if not BasicConfig.is_host_mode: sclient = self.create_client(SerialCommand, "/node_info_update") # 启动线程执行发送任务 @@ -454,7 +491,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): target=self.send_slave_node_info, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_send_slave_node_info" + name=f"ROSDevice{self.device_id}_send_slave_node_info", ).start() else: host_node = HostNode.get_instance(0) @@ -465,12 +502,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): sclient.wait_for_service() request = SerialCommand.Request() from unilabos.config.config import BasicConfig - request.command = json.dumps({ - "SYNC_SLAVE_NODE_INFO": { - "machine_name": BasicConfig.machine_name, - "type": "slave", - "edge_device_id": self.device_id - }}, ensure_ascii=False, cls=TypeEncoder) + + request.command = json.dumps( + { + "SYNC_SLAVE_NODE_INFO": { + "machine_name": BasicConfig.machine_name, + "type": "slave", + "edge_device_id": self.device_id, + } + }, + ensure_ascii=False, + cls=TypeEncoder, + ) # 发送异步请求并等待结果 future = sclient.call_async(request) @@ -543,6 +586,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): """创建动作执行回调函数""" async def execute_callback(goal_handle: ServerGoalHandle): + # 初始化结果信息变量 + execution_error = "" + execution_success = False + action_return_value = None + self.lab_logger().info(f"执行动作: {action_name}") goal = goal_handle.request @@ -582,7 +630,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): current_resources.extend(response.resources) else: r = ResourceGet.Request() - r.id = action_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else action_kwargs[k][0]["id"] + r.id = ( + action_kwargs[k]["id"] + if v == "unilabos_msgs/Resource" + else action_kwargs[k][0]["id"] + ) r.with_children = True response = await self._resource_clients["resource_get"].call_async(r) current_resources.extend(response.resources) @@ -605,7 +657,17 @@ class BaseROS2DeviceNode(Node, Generic[T]): if asyncio.iscoroutinefunction(ACTION): try: self.lab_logger().info(f"异步执行动作 {ACTION}") - future = ROS2DeviceNode.run_async_func(ACTION, **action_kwargs) + future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) + + def _handle_future_exception(fut): + try: + fut.result() + except Exception as e: + execution_error = traceback.format_exc() + error(f"异步任务 {ACTION.__name__} 报错了") + error(traceback.format_exc()) + + future.add_done_callback(_handle_future_exception) except Exception as e: self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") raise e @@ -617,6 +679,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: fut.result() except Exception as e: + execution_error = traceback.format_exc() error(f"同步任务 {ACTION.__name__} 报错了") error(traceback.format_exc()) @@ -693,7 +756,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): goal_handle.succeed() self.lab_logger().info(f"设置动作成功: {action_name}") - result_values = {} + result_values = { + "return_value": serialize_result_info(execution_error, execution_success, action_return_value) + } for msg_name, attr_name in action_value_mapping["result"].items(): if hasattr(self.driver_instance, f"get_{attr_name}"): result_values[msg_name] = getattr(self.driver_instance, f"get_{attr_name}")() @@ -752,8 +817,8 @@ class ROS2DeviceNode: return cls._loop @classmethod - def run_async_func(cls, func, **kwargs): - return run_async_func(func, loop=cls._loop, **kwargs) + def run_async_func(cls, func, trace_error=True, **kwargs): + return run_async_func(func, loop=cls._loop, trace_error=trace_error, **kwargs) @property def driver_instance(self): @@ -805,9 +870,11 @@ class ROS2DeviceNode: self.resource_tracker = DeviceNodeResourceTracker() # use_pylabrobot_creator 使用 cls的包路径检测 - use_pylabrobot_creator = (driver_class.__module__.startswith("pylabrobot") - or driver_class.__name__ == "LiquidHandlerAbstract" - or driver_class.__name__ == "LiquidHandlerBiomek") + use_pylabrobot_creator = ( + driver_class.__module__.startswith("pylabrobot") + or driver_class.__name__ == "LiquidHandlerAbstract" + or driver_class.__name__ == "LiquidHandlerBiomek" + ) # TODO: 要在创建之前预先请求服务器是否有当前id的物料,放到resource_tracker中,让pylabrobot进行创建 # 创建设备类实例 diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index f4284629..ece98857 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -151,7 +151,7 @@ class HostNode(BaseROS2DeviceNode): mqtt_client.publish_registry(device_info["id"], device_info) for resource_info in lab_registry.obtain_registry_resource_info(): mqtt_client.publish_registry(resource_info["id"], resource_info) - + time.sleep(1) # 等待MQTT连接稳定 # 首次发现网络中的设备 self._discover_devices() @@ -204,9 +204,11 @@ class HostNode(BaseROS2DeviceNode): for bridge in self.bridges: if hasattr(bridge, "resource_add"): resource_start_time = time.time() - resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name)) + resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name), True) resource_end_time = time.time() - self.lab_logger().info(f"[Host Node-Resource] Adding resources to bridge. {round(resource_start_time - resource_end_time, 5)} seconds") + self.lab_logger().info( + f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" + ) except Exception as ex: self.lab_logger().error("[Host Node-Resource] 添加物料出错!") self.lab_logger().error(traceback.format_exc()) @@ -610,6 +612,8 @@ class HostNode(BaseROS2DeviceNode): def get_result_callback(self, action_id: str, uuid_str: Optional[str], future) -> None: """获取结果回调""" + import json + result_msg = future.result().result result_data = convert_from_ros_msg(result_msg) self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): success") @@ -618,7 +622,7 @@ class HostNode(BaseROS2DeviceNode): if uuid_str: for bridge in self.bridges: if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status(result_data, uuid_str, "success") + bridge.publish_job_status(result_data, uuid_str, "success", result_data) def cancel_goal(self, goal_uuid: str) -> None: """取消目标""" diff --git a/unilabos/utils/async_util.py b/unilabos/utils/async_util.py index ce97f5a1..0f50a730 100644 --- a/unilabos/utils/async_util.py +++ b/unilabos/utils/async_util.py @@ -5,7 +5,7 @@ from asyncio import get_event_loop from unilabos.utils.log import error -def run_async_func(func, *, loop=None, **kwargs): +def run_async_func(func, *, loop=None, trace_error=True, **kwargs): if loop is None: loop = get_event_loop() @@ -17,5 +17,6 @@ def run_async_func(func, *, loop=None, **kwargs): error(traceback.format_exc()) future = asyncio.run_coroutine_threadsafe(func(**kwargs), loop) - future.add_done_callback(_handle_future_exception) - return future \ No newline at end of file + if trace_error: + future.add_done_callback(_handle_future_exception) + return future diff --git a/unilabos/utils/type_check.py b/unilabos/utils/type_check.py index 7366652b..578eb93d 100644 --- a/unilabos/utils/type_check.py +++ b/unilabos/utils/type_check.py @@ -1,4 +1,4 @@ -import collections +import collections.abc import json from typing import get_origin, get_args @@ -21,3 +21,46 @@ class TypeEncoder(json.JSONEncoder): return str(obj)[8:-2] return super().default(obj) + +class ResultInfoEncoder(json.JSONEncoder): + """专门用于处理任务执行结果信息的JSON编码器""" + + def default(self, obj): + # 优先处理类型对象 + if isinstance(obj, type): + return str(obj)[8:-2] + + # 对于无法序列化的对象,统一转换为字符串 + try: + # 尝试调用 __dict__ 或者其他序列化方法 + if hasattr(obj, "__dict__"): + return obj.__dict__ + elif hasattr(obj, "_asdict"): # namedtuple + return obj._asdict() + elif hasattr(obj, "to_dict"): + return obj.to_dict() + elif hasattr(obj, "dict"): + return obj.dict() + else: + # 如果都不行,转换为字符串 + return str(obj) + except Exception: + # 如果转换失败,直接返回字符串表示 + return str(obj) + + +def serialize_result_info(error: str, suc: bool, return_value=None) -> str: + """ + 序列化任务执行结果信息 + + Args: + error: 错误信息字符串 + suc: 是否成功的布尔值 + return_value: 返回值,可以是任何类型 + + Returns: + JSON字符串格式的结果信息 + """ + result_info = {"error": error, "suc": suc, "return_value": return_value} + + return json.dumps(result_info, ensure_ascii=False, cls=ResultInfoEncoder)