Merge remote-tracking branch 'upstream/dev' into device_visualization

This commit is contained in:
zhangshixiang
2025-07-19 16:56:30 +08:00
133 changed files with 240551 additions and 7286 deletions

View File

@@ -0,0 +1,61 @@
import rclpy
from rclpy.node import Node
import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
class VideoPublisher(BaseROS2DeviceNode):
def __init__(self, device_id='video_publisher', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
# 初始化BaseROS2DeviceNode使用自身作为driver_instance
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
device_id=device_id,
status_types={},
action_value_mappings={},
hardware_interface="camera",
print_publish=False,
resource_tracker=resource_tracker,
)
# 创建一个发布者,发布到 /video 话题,消息类型为 sensor_msgs/Image队列长度设为 10
self.publisher_ = self.create_publisher(Image, f'/{device_id}/video', 10)
# 初始化摄像头(默认设备索引为 0
self.cap = cv2.VideoCapture(camera_index)
if not self.cap.isOpened():
self.get_logger().error("无法打开摄像头")
# 用于将 OpenCV 的图像转换为 ROS 图像消息
self.bridge = CvBridge()
# 设置定时器10 Hz 发布一次
timer_period = period # 单位:秒
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
ret, frame = self.cap.read()
if not ret:
self.get_logger().error("读取视频帧失败")
return
# 将 OpenCV 图像转换为 ROS Image 消息,注意图像编码需与摄像头数据匹配,这里使用 bgr8
img_msg = self.bridge.cv2_to_imgmsg(frame, encoding="bgr8")
self.publisher_.publish(img_msg)
# self.get_logger().info("已发布视频帧")
def destroy_node(self):
# 释放摄像头资源
if self.cap.isOpened():
self.cap.release()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VideoPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -459,6 +459,8 @@ class HostNode(BaseROS2DeviceNode):
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"):
continue
action_id = f"/devices/{device_id}/{action_name}"
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
@@ -561,12 +563,13 @@ class HostNode(BaseROS2DeviceNode):
if hasattr(bridge, "publish_device_status"):
bridge.publish_device_status(self.device_status, device_id, property_name)
self.lab_logger().debug(
f"[Host Node] Status updated: {device_id}.{property_name} = {msg.data}"
f"[Host Node] Status updated: {device_id}.{property_name} = {msg.data}"
)
def send_goal(
self,
device_id: str,
action_type: str,
action_name: str,
action_kwargs: Dict[str, Any],
goal_uuid: Optional[str] = None,
@@ -577,16 +580,30 @@ class HostNode(BaseROS2DeviceNode):
Args:
device_id: 设备ID
action_type: 动作类型
action_name: 动作名称
action_kwargs: 动作参数
goal_uuid: 目标UUID如果为None则自动生成
server_info: 服务器发送信息,包含发送时间戳等
"""
action_id = f"/devices/{device_id}/{action_name}"
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
action_id = f"/devices/{device_id}/_execute_driver_command"
action_kwargs = {
"string": json.dumps({
"function_name": action_name,
"function_args": action_kwargs,
})
}
if action_type.startswith("UniLabJsonCommandAsync"):
action_id = f"/devices/{device_id}/_execute_driver_command_async"
else:
action_id = f"/devices/{device_id}/{action_name}"
if action_name == "test_latency" and server_info is not None:
self.server_latest_timestamp = server_info.get("send_timestamp", 0.0)
if action_id not in self._action_clients:
self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.")
return
raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id]

View File

@@ -1,7 +1,5 @@
import time
import asyncio
import traceback
from types import MethodType
from typing import Union
import rclpy
@@ -19,9 +17,11 @@ from unilabos.ros.msgs.message_converter import (
get_action_type,
convert_to_ros_msg,
convert_from_ros_msg,
convert_from_ros_msg_with_mapping,
convert_from_ros_msg_with_mapping, String,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode
from unilabos.utils.log import error
from unilabos.utils.type_check import serialize_result_info
class ROS2ProtocolNode(BaseROS2DeviceNode):
@@ -33,7 +33,15 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
# create_action_server = False # Action Server要自己创建
def __init__(self, device_id: str, children: dict, protocol_type: Union[str, list[str]], resource_tracker: DeviceNodeResourceTracker, *args, **kwargs):
def __init__(
self,
device_id: str,
children: dict,
protocol_type: Union[str, list[str]],
resource_tracker: DeviceNodeResourceTracker,
*args,
**kwargs,
):
self._setup_protocol_names(protocol_type)
# 初始化其它属性
@@ -60,12 +68,14 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
for device_id, device_config in self.children.items():
if device_config.get("type", "device") != "device":
self.lab_logger().debug(f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping.")
self.lab_logger().debug(
f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping."
)
continue
try:
d = self.initialize_device(device_id, device_config)
except Exception as ex:
self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}")
self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}")
d = None
if d is None:
continue
@@ -76,22 +86,27 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
# 设置硬件接口代理
if d:
hardware_interface = d.ros_node_instance._hardware_interface
if (
hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["name"])
and hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["write"])
and (d.ros_node_instance._hardware_interface["read"] is None or hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["read"]))
hasattr(d.driver_instance, hardware_interface["name"])
and hasattr(d.driver_instance, hardware_interface["write"])
and (hardware_interface["read"] is None or hasattr(d.driver_instance, hardware_interface["read"]))
):
name = getattr(d.driver_instance, d.ros_node_instance._hardware_interface["name"])
read = d.ros_node_instance._hardware_interface.get("read", None)
write = d.ros_node_instance._hardware_interface.get("write", None)
name = getattr(d.driver_instance, hardware_interface["name"])
read = hardware_interface.get("read", None)
write = hardware_interface.get("write", None)
# 如果硬件接口是字符串,通过通信设备提供
if isinstance(name, str) and name in self.sub_devices:
communicate_device = self.sub_devices[name]
communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface
self._setup_hardware_proxy(d, self.sub_devices[name], read, write)
self.lab_logger().info(f"\n通信代理:为子设备{device_id}\n 添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n 添加了{write}方法(来源:{name} {communicate_hardware_info['read']})")
self.lab_logger().info(
f"\n通信代理:为子设备{device_id}\n "
f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n "
f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})"
)
def _setup_protocol_names(self, protocol_type):
# 处理协议类型
@@ -119,11 +134,17 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
if d is not None and hasattr(d, "ros_node_instance"):
node = d.ros_node_instance
for action_name, action_mapping in node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith("UniLabJsonCommand"):
continue
action_id = f"/devices/{device_id_abs}/{action_name}"
if action_id not in self._action_clients:
self._action_clients[action_id] = ActionClient(
self, action_mapping["type"], action_id, callback_group=self.callback_group
)
try:
self._action_clients[action_id] = ActionClient(
self, action_mapping["type"], action_id, callback_group=self.callback_group
)
except Exception as ex:
self.lab_logger().error(f"创建动作客户端失败: {action_id}, 错误: {ex}")
continue
self.lab_logger().debug(f"为子设备 {device_id} 创建动作客户端: {action_name}")
return d
@@ -149,63 +170,135 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
def _create_protocol_execute_callback(self, protocol_name, protocol_steps_generator):
async def execute_protocol(goal_handle: ServerGoalHandle):
"""执行完整的工作流"""
self.get_logger().info(f'Executing {protocol_name} action...')
# 初始化结果信息变量
execution_error = ""
execution_success = False
protocol_return_value = None
self.get_logger().info(f"Executing {protocol_name} action...")
action_value_mapping = self._action_value_mappings[protocol_name]
print('+'*30)
print(protocol_steps_generator)
# 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤
goal = goal_handle.request
protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])
try:
print("+" * 30)
print(protocol_steps_generator)
# 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤
goal = goal_handle.request
protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])
# # 🔧 添加调试信息
# print(f"🔍 转换后的 protocol_kwargs: {protocol_kwargs}")
# print(f"🔍 vessel 在转换后: {protocol_kwargs.get('vessel', 'NOT_FOUND')}")
# 向Host查询物料当前状态
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
r = ResourceGet.Request()
r.id = protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"]
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
protocol_kwargs[k] = list_to_nested_dict([convert_from_ros_msg(rs) for rs in response.resources])
# # 🔧 完全禁用Host查询直接使用转换后的数据
# print(f"🔧 跳过Host查询直接使用转换后的数据")
# 向Host查询物料当前状态
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
r = ResourceGet.Request()
resource_id = (
protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"]
)
r.id = resource_id
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
protocol_kwargs[k] = list_to_nested_dict(
[convert_from_ros_msg(rs) for rs in response.resources]
)
from unilabos.resources.graphio import physical_setup_graph
self.get_logger().info(f'Working on physical setup: {physical_setup_graph}')
protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs)
self.lab_logger().info(f"🔍 最终传递给协议的 protocol_kwargs: {protocol_kwargs}")
self.lab_logger().info(f"🔍 最终的 vessel: {protocol_kwargs.get('vessel', 'NOT_FOUND')}")
self.get_logger().info(f'Goal received: {protocol_kwargs}, running steps: \n{protocol_steps}')
from unilabos.resources.graphio import physical_setup_graph
time_start = time.time()
time_overall = 100
self._busy = True
self.lab_logger().info(f"Working on physical setup: {physical_setup_graph}")
protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs)
self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: \n{protocol_steps}")
# 逐步执行工作流
for i, action in enumerate(protocol_steps):
self.get_logger().info(f'Running step {i+1}: {action}')
if type(action) == dict:
# 如果是单个动作,直接执行
if action["action_name"] == "wait":
time.sleep(action["action_kwargs"]["time"])
else:
result = await self.execute_single_action(**action)
elif type(action) == list:
# 如果是并行动作,同时执行
actions = action
futures = [rclpy.get_global_executor().create_task(self.execute_single_action(**a)) for a in actions]
results = [await f for f in futures]
time_start = time.time()
time_overall = 100
self._busy = True
# 向Host更新物料当前状态
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
r = ResourceUpdate.Request()
r.resources = [
convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k])
]
response = await self._resource_clients["resource_update"].call_async(r)
# 逐步执行工作流
step_results = []
for i, action in enumerate(protocol_steps):
# self.get_logger().info(f"Running step {i + 1}: {action}")
if isinstance(action, dict):
# 如果是单个动作,直接执行
if action["action_name"] == "wait":
time.sleep(action["action_kwargs"]["time"])
step_results.append({"step": i + 1, "action": "wait", "result": "completed"})
else:
result = await self.execute_single_action(**action)
step_results.append({"step": i + 1, "action": action["action_name"], "result": result})
elif isinstance(action, list):
# 如果是并行动作,同时执行
actions = action
futures = [
rclpy.get_global_executor().create_task(self.execute_single_action(**a)) for a in actions
]
results = [await f for f in futures]
step_results.append(
{
"step": i + 1,
"parallel_actions": [a["action_name"] for a in actions],
"results": results,
}
)
goal_handle.succeed()
# 向Host更新物料当前状态
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
r = ResourceUpdate.Request()
r.resources = [
convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k])
]
response = await self._resource_clients["resource_update"].call_async(r)
# 设置成功状态和返回值
execution_success = True
protocol_return_value = {
"protocol_name": protocol_name,
"steps_executed": len(protocol_steps),
"step_results": step_results,
"total_time": time.time() - time_start,
}
goal_handle.succeed()
except Exception as e:
# 捕获并记录错误信息
execution_error = traceback.format_exc()
execution_success = False
error(f"协议 {protocol_name} 执行失败")
error(traceback.format_exc())
self.lab_logger().error(f"协议执行出错: {str(e)}")
# 设置动作失败
goal_handle.abort()
finally:
self._busy = False
# 创建结果消息
result = action_value_mapping["type"].Result()
result.success = True
result.success = execution_success
self._busy = False
# 获取结果消息类型信息检查是否有return_info字段
result_msg_types = action_value_mapping["type"].Result.get_fields_and_field_types()
# 设置return_info字段如果存在
for attr_name in result_msg_types.keys():
if attr_name in ["success", "reached_goal"]:
setattr(result, attr_name, execution_success)
elif attr_name == "return_info":
setattr(
result,
attr_name,
serialize_result_info(execution_error, execution_success, protocol_return_value),
)
self.lab_logger().info(f"🤩🤩🤩🤩🤩🤩协议 {protocol_name} 完成并返回结果😎😎😎😎😎😎")
return result
return execute_protocol
async def execute_single_action(self, device_id, action_name, action_kwargs):
@@ -225,7 +318,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
action_client = self._action_clients[action_id]
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
self.lab_logger().info(f"发送动作请求到: {action_id}")
##### self.lab_logger().info(f"发送动作请求到: {action_id}")
action_client.wait_for_server()
# 等待动作完成
@@ -237,18 +330,23 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
return None
result_future = await handle.get_result_async()
self.lab_logger().info(f"动作完成: {action_name}")
##### self.lab_logger().info(f"动作完成: {action_name}")
return result_future.result
"""还没有改过的部分"""
def _setup_hardware_proxy(self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method):
def _setup_hardware_proxy(
self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method
):
"""为设备设置硬件接口代理"""
# extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])]
write_func = getattr(communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"])
read_func = getattr(communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"])
write_func = getattr(
communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"]
)
read_func = getattr(
communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"]
)
def _read(*args, **kwargs):
return read_func(*args, **kwargs)
@@ -264,7 +362,6 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
# bound_write = MethodType(_write, device.driver_instance)
setattr(device.driver_instance, write_method, _write)
async def _update_resources(self, goal, protocol_kwargs):
"""更新资源状态"""
for k, v in goal.get_fields_and_field_types().items():