修正物料上传时间

改用biomek_test
增加ResultInfoEncoder
支持返回结果上传
This commit is contained in:
Xuwznln
2025-06-08 14:43:07 +08:00
parent c0b7f2decd
commit ab0c4b708b
9 changed files with 403 additions and 267 deletions

View File

@@ -1,5 +1,4 @@
import copy
import functools
import json
import threading
import time
@@ -20,16 +19,29 @@ from rclpy.service import Service
from unilabos_msgs.action import SendCmd
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type, resource_ulab_to_plr, \
initialize_resources, list_to_nested_dict, dict_to_tree, resource_plr_to_ulab, tree_to_list
from unilabos.resources.graphio import (
convert_resources_to_type,
convert_resources_from_type,
resource_ulab_to_plr,
initialize_resources,
dict_to_tree,
resource_plr_to_ulab,
tree_to_list,
)
from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg,
convert_from_ros_msg,
convert_from_ros_msg_with_mapping,
convert_to_ros_msg_with_mapping, ros_action_to_json_schema,
convert_to_ros_msg_with_mapping,
)
from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
SerialCommand # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
ResourceGet,
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand,
) # type: ignore
from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
@@ -37,7 +49,7 @@ from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func
from unilabos.utils.log import info, debug, warning, error, critical, logger
from unilabos.utils.type_check import get_type_class, TypeEncoder
from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info
T = TypeVar("T")
@@ -292,7 +304,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.create_ros_action_server(action_name, action_value_mapping)
# 创建线程池执行器
self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}")
self._executor = ThreadPoolExecutor(
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
)
# 创建资源管理客户端
self._resource_clients: Dict[str, Client] = {
@@ -334,7 +348,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
other_calling_param["slot"] = slot
# 本地拿到这个物料,可能需要先做初始化?
if isinstance(resources, list):
if len(resources) == 1 and isinstance(resources[0], list) and not initialize_full: # 取消,不存在的情况
if (
len(resources) == 1 and isinstance(resources[0], list) and not initialize_full
): # 取消,不存在的情况
# 预先initialize过以整组的形式传入
request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]]
elif initialize_full:
@@ -373,6 +389,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
from pylabrobot.resources import Coordinate
from pylabrobot.resources import OTDeck
from pylabrobot.resources import Plate
contain_model = not isinstance(resource, Deck)
if isinstance(resource, ResourcePLR):
# resources.list()
@@ -380,25 +397,38 @@ class BaseROS2DeviceNode(Node, Generic[T]):
plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
if isinstance(plr_instance, Plate):
empty_liquid_info_in = [(None, 0)] * plr_instance.num_items
for liquid_type, liquid_volume, liquid_input_slot in zip(ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT):
for liquid_type, liquid_volume, liquid_input_slot in zip(
ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
):
empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume)
plr_instance.set_well_liquids(empty_liquid_info_in)
if isinstance(resource, OTDeck) and "slot" in other_calling_param:
resource.assign_child_at_slot(plr_instance, **other_calling_param)
else:
_discard_slot = other_calling_param.pop("slot", -1)
resource.assign_child_resource(plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param)
request2.resources = [convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)])]
resource.assign_child_resource(
plr_instance,
Coordinate(location["x"], location["y"], location["z"]),
**other_calling_param,
)
request2.resources = [
convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)])
]
rclient2.call(request2)
# 发送给ResourceMeshManager
action_client = ActionClient(
self, SendCmd, "/devices/resource_mesh_manager/add_resource_mesh", callback_group=self.callback_group
self,
SendCmd,
"/devices/resource_mesh_manager/add_resource_mesh",
callback_group=self.callback_group,
)
goal = SendCmd.Goal()
goal.command = json.dumps({
"resources": resources,
"bind_parent_id": bind_parent_id,
})
goal.command = json.dumps(
{
"resources": resources,
"bind_parent_id": bind_parent_id,
}
)
future = action_client.send_goal_async(goal, goal_uuid=uuid.uuid4())
def done_cb(*args):
@@ -415,10 +445,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# noinspection PyTypeChecker
self._service_server: Dict[str, Service] = {
"query_host_name": self.create_service(
SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group
SerialCommand,
f"/srv{self.namespace}/query_host_name",
query_host_name_cb,
callback_group=self.callback_group,
),
"append_resource": self.create_service(
SerialCommand, f"/srv{self.namespace}/append_resource", append_resource, callback_group=self.callback_group
SerialCommand,
f"/srv{self.namespace}/append_resource",
append_resource,
callback_group=self.callback_group,
),
}
@@ -447,6 +483,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
registered_devices[self.device_id] = device_info
from unilabos.config.config import BasicConfig
from unilabos.ros.nodes.presets.host_node import HostNode
if not BasicConfig.is_host_mode:
sclient = self.create_client(SerialCommand, "/node_info_update")
# 启动线程执行发送任务
@@ -454,7 +491,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
target=self.send_slave_node_info,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_send_slave_node_info"
name=f"ROSDevice{self.device_id}_send_slave_node_info",
).start()
else:
host_node = HostNode.get_instance(0)
@@ -465,12 +502,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
sclient.wait_for_service()
request = SerialCommand.Request()
from unilabos.config.config import BasicConfig
request.command = json.dumps({
"SYNC_SLAVE_NODE_INFO": {
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id
}}, ensure_ascii=False, cls=TypeEncoder)
request.command = json.dumps(
{
"SYNC_SLAVE_NODE_INFO": {
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id,
}
},
ensure_ascii=False,
cls=TypeEncoder,
)
# 发送异步请求并等待结果
future = sclient.call_async(request)
@@ -543,6 +586,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"""创建动作执行回调函数"""
async def execute_callback(goal_handle: ServerGoalHandle):
# 初始化结果信息变量
execution_error = ""
execution_success = False
action_return_value = None
self.lab_logger().info(f"执行动作: {action_name}")
goal = goal_handle.request
@@ -582,7 +630,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
current_resources.extend(response.resources)
else:
r = ResourceGet.Request()
r.id = action_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else action_kwargs[k][0]["id"]
r.id = (
action_kwargs[k]["id"]
if v == "unilabos_msgs/Resource"
else action_kwargs[k][0]["id"]
)
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
current_resources.extend(response.resources)
@@ -605,7 +657,17 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if asyncio.iscoroutinefunction(ACTION):
try:
self.lab_logger().info(f"异步执行动作 {ACTION}")
future = ROS2DeviceNode.run_async_func(ACTION, **action_kwargs)
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
def _handle_future_exception(fut):
try:
fut.result()
except Exception as e:
execution_error = traceback.format_exc()
error(f"异步任务 {ACTION.__name__} 报错了")
error(traceback.format_exc())
future.add_done_callback(_handle_future_exception)
except Exception as e:
self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}")
raise e
@@ -617,6 +679,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
try:
fut.result()
except Exception as e:
execution_error = traceback.format_exc()
error(f"同步任务 {ACTION.__name__} 报错了")
error(traceback.format_exc())
@@ -693,7 +756,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
goal_handle.succeed()
self.lab_logger().info(f"设置动作成功: {action_name}")
result_values = {}
result_values = {
"return_value": serialize_result_info(execution_error, execution_success, action_return_value)
}
for msg_name, attr_name in action_value_mapping["result"].items():
if hasattr(self.driver_instance, f"get_{attr_name}"):
result_values[msg_name] = getattr(self.driver_instance, f"get_{attr_name}")()
@@ -752,8 +817,8 @@ class ROS2DeviceNode:
return cls._loop
@classmethod
def run_async_func(cls, func, **kwargs):
return run_async_func(func, loop=cls._loop, **kwargs)
def run_async_func(cls, func, trace_error=True, **kwargs):
return run_async_func(func, loop=cls._loop, trace_error=trace_error, **kwargs)
@property
def driver_instance(self):
@@ -805,9 +870,11 @@ class ROS2DeviceNode:
self.resource_tracker = DeviceNodeResourceTracker()
# use_pylabrobot_creator 使用 cls的包路径检测
use_pylabrobot_creator = (driver_class.__module__.startswith("pylabrobot")
or driver_class.__name__ == "LiquidHandlerAbstract"
or driver_class.__name__ == "LiquidHandlerBiomek")
use_pylabrobot_creator = (
driver_class.__module__.startswith("pylabrobot")
or driver_class.__name__ == "LiquidHandlerAbstract"
or driver_class.__name__ == "LiquidHandlerBiomek"
)
# TODO: 要在创建之前预先请求服务器是否有当前id的物料放到resource_tracker中让pylabrobot进行创建
# 创建设备类实例

View File

@@ -151,7 +151,7 @@ class HostNode(BaseROS2DeviceNode):
mqtt_client.publish_registry(device_info["id"], device_info)
for resource_info in lab_registry.obtain_registry_resource_info():
mqtt_client.publish_registry(resource_info["id"], resource_info)
time.sleep(1) # 等待MQTT连接稳定
# 首次发现网络中的设备
self._discover_devices()
@@ -204,9 +204,11 @@ class HostNode(BaseROS2DeviceNode):
for bridge in self.bridges:
if hasattr(bridge, "resource_add"):
resource_start_time = time.time()
resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name))
resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name), True)
resource_end_time = time.time()
self.lab_logger().info(f"[Host Node-Resource] Adding resources to bridge. {round(resource_start_time - resource_end_time, 5)} seconds")
self.lab_logger().info(
f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
except Exception as ex:
self.lab_logger().error("[Host Node-Resource] 添加物料出错!")
self.lab_logger().error(traceback.format_exc())
@@ -610,6 +612,8 @@ class HostNode(BaseROS2DeviceNode):
def get_result_callback(self, action_id: str, uuid_str: Optional[str], future) -> None:
"""获取结果回调"""
import json
result_msg = future.result().result
result_data = convert_from_ros_msg(result_msg)
self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): success")
@@ -618,7 +622,7 @@ class HostNode(BaseROS2DeviceNode):
if uuid_str:
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(result_data, uuid_str, "success")
bridge.publish_job_status(result_data, uuid_str, "success", result_data)
def cancel_goal(self, goal_uuid: str) -> None:
"""取消目标"""