diff --git a/unilabos/app/controler.py b/unilabos/app/controler.py index f58f53a..5d55256 100644 --- a/unilabos/app/controler.py +++ b/unilabos/app/controler.py @@ -31,6 +31,6 @@ def job_add(req: JobAddReq) -> JobData: action_kwargs = {"command": json.dumps(action_kwargs)} elif "command" in action_kwargs: action_kwargs = action_kwargs["command"] - print(f"job_add:{req.device_id} {action_name} {action_kwargs}") - HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id) + # print(f"job_add:{req.device_id} {action_name} {action_kwargs}") + HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id, server_info=req.server_info) return JobData(jobId=req.job_id) diff --git a/unilabos/app/model.py b/unilabos/app/model.py index ee7568f..a5b8c78 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -51,8 +51,9 @@ class Resp(BaseModel): class JobAddReq(BaseModel): device_id: str = Field(examples=["Gripper"], description="device id") data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}]) - job_id: str = Field(examples=["sfsfsfeq"], description="goal uuid") - node_id: str = Field(examples=["sfsfsfeq"], description="node uuid") + job_id: str = Field(examples=["job_id"], description="goal uuid") + node_id: str = Field(examples=["node_id"], description="node uuid") + server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info") class JobStepFinishReq(BaseModel): diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index aaade1b..9f87069 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -12,7 +12,7 @@ import tempfile import os from unilabos.config.config import MQConfig -from unilabos.app.controler import devices, job_add +from unilabos.app.controler import job_add from unilabos.app.model import JobAddReq from unilabos.utils import logger from unilabos.utils.type_check import TypeEncoder @@ -43,9 +43,10 @@ class MQTTClient: def _on_connect(self, client, userdata, flags, rc, properties=None): logger.info("[MQTT] Connected with result code " + str(rc)) client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0) + client.subscribe(f"labs/{MQConfig.lab_id}/pong/", 0) def _on_message(self, client, userdata, msg) -> None: - logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload)) + # logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload)) try: payload_str = msg.payload.decode("utf-8") payload_json = json.loads(payload_str) @@ -59,6 +60,14 @@ class MQTTClient: job_req = JobAddReq.model_validate(payload_json) data = job_add(job_req) return + elif msg.topic == f"labs/{MQConfig.lab_id}/pong/": + # 处理pong响应,通知HostNode + from unilabos.ros.nodes.presets.host_node import HostNode + + host_instance = HostNode.get_instance(0) + if host_instance: + host_instance.handle_pong_response(payload_json) + return except json.JSONDecodeError as e: logger.error(f"[MQTT] JSON 解析错误: {e}") @@ -175,6 +184,28 @@ class MQTTClient: self.client.publish(address, json.dumps(action_info), qos=2) logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}") + def send_ping(self, ping_id: str, timestamp: float): + """发送ping消息到服务端""" + if self.mqtt_disable: + return + address = f"labs/{MQConfig.lab_id}/ping/" + ping_data = {"ping_id": ping_id, "client_timestamp": timestamp, "type": "ping"} + self.client.publish(address, json.dumps(ping_data), qos=2) + + def setup_pong_subscription(self): + """设置pong消息订阅""" + if self.mqtt_disable: + return + pong_topic = f"labs/{MQConfig.lab_id}/pong/" + self.client.subscribe(pong_topic, 0) + logger.debug(f"Subscribed to pong topic: {pong_topic}") + + def handle_pong(self, pong_data: dict): + """处理pong响应(这个方法会在收到pong消息时被调用)""" + logger.debug(f"Pong received: {pong_data}") + # 这里会被HostNode的ping-pong处理逻辑调用 + pass + mqtt_client = MQTTClient() diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index f3be457..c68e0d8 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -25,6 +25,9 @@ class Registry: self.ResourceCreateFromOuterEasy = self._replace_type_with_class( "ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource" ) + self.EmptyIn = self._replace_type_with_class( + "EmptyIn", "host_node", f"" + ) self.device_type_registry = {} self.resource_type_registry = {} self._setup_called = False # 跟踪setup是否已调用 @@ -38,58 +41,69 @@ class Registry: return from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type - self.device_type_registry.update({ - "host_node": { - "description": "UniLabOS主机节点", - "class": { - "module": "unilabos.ros.nodes.presets.host_node", - "type": "python", - "status_types": {}, - "action_value_mappings": { - "create_resource_detailed": { - "type": self.ResourceCreateFromOuter, - "goal": { - "resources": "resources", - "device_ids": "device_ids", - "bind_parent_ids": "bind_parent_ids", - "bind_locations": "bind_locations", - "other_calling_params": "other_calling_params", + + self.device_type_registry.update( + { + "host_node": { + "description": "UniLabOS主机节点", + "class": { + "module": "unilabos.ros.nodes.presets.host_node", + "type": "python", + "status_types": {}, + "action_value_mappings": { + "create_resource_detailed": { + "type": self.ResourceCreateFromOuter, + "goal": { + "resources": "resources", + "device_ids": "device_ids", + "bind_parent_ids": "bind_parent_ids", + "bind_locations": "bind_locations", + "other_calling_params": "other_calling_params", + }, + "feedback": {}, + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal)) + ), }, - "feedback": {}, - "result": {"success": "success"}, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter), - "goal_default": yaml.safe_load( - io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal)) - ) - }, - "create_resource": { - "type": self.ResourceCreateFromOuterEasy, - "goal": { - "res_id": "res_id", - "class_name": "class_name", - "parent": "parent", - "device_id": "device_id", - "bind_locations": "bind_locations", - "liquid_input_slot": "liquid_input_slot[]", - "liquid_type": "liquid_type[]", - "liquid_volume": "liquid_volume[]", - "slot_on_deck": "slot_on_deck", + "create_resource": { + "type": self.ResourceCreateFromOuterEasy, + "goal": { + "res_id": "res_id", + "class_name": "class_name", + "parent": "parent", + "device_id": "device_id", + "bind_locations": "bind_locations", + "liquid_input_slot": "liquid_input_slot[]", + "liquid_type": "liquid_type[]", + "liquid_volume": "liquid_volume[]", + "slot_on_deck": "slot_on_deck", + }, + "feedback": {}, + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) + ), + }, + "test_latency": { + "type": self.EmptyIn, + "goal": {}, + "feedback": {}, + "result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"}, + "schema": ros_action_to_json_schema(self.EmptyIn), + "goal_default": {}, }, - "feedback": {}, - "result": {"success": "success"}, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy), - "goal_default": yaml.safe_load( - io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) - ) }, }, - }, - "icon": "icon_device.webp", - "registry_type": "device", - "schema": {"properties": {}, "additionalProperties": False, "type": "object"}, - "file_path": "/", + "icon": "icon_device.webp", + "registry_type": "device", + "schema": {"properties": {}, "additionalProperties": False, "type": "object"}, + "file_path": "/", + } } - }) + ) logger.debug(f"[UniLab Registry] ----------Setup----------") self.registry_paths = [Path(path).absolute() for path in self.registry_paths] for i, path in enumerate(self.registry_paths): diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 5346a47..732e8bb 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -93,6 +93,7 @@ class HostNode(BaseROS2DeviceNode): self.__class__._instance = self # 初始化配置 + self.server_latest_timestamp = 0.0 # self.devices_config = devices_config self.resources_config = resources_config self.physical_setup_graph = physical_setup_graph @@ -122,6 +123,12 @@ class HostNode(BaseROS2DeviceNode): "/devices/host_node/create_resource_detailed", callback_group=self.callback_group, ), + "/devices/host_node/test_latency": ActionClient( + self, + lab_registry.EmptyIn, + "/devices/host_node/test_latency", + callback_group=self.callback_group, + ), } # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} @@ -207,6 +214,10 @@ class HostNode(BaseROS2DeviceNode): discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() ) + # 添加ping-pong相关属性 + self._ping_responses = {} # 存储ping响应 + self._ping_lock = threading.Lock() + self.lab_logger().info("[Host Node] Host node initialized.") HostNode._ready_event.set() @@ -379,8 +390,8 @@ class HostNode(BaseROS2DeviceNode): }, } ) # flatten的格式 - resources = [init_new_res] - device_id = [device_id] + resources = init_new_res # initialize_resource已经返回list[dict] + device_ids = [device_id] bind_parent_id = [parent] bind_location = [bind_locations] other_calling_param = [ @@ -395,7 +406,7 @@ class HostNode(BaseROS2DeviceNode): ) ] - return self.create_resource_detailed(resources, device_id, bind_parent_id, bind_location, other_calling_param) + return self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param) def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: """ @@ -526,7 +537,12 @@ class HostNode(BaseROS2DeviceNode): ) def send_goal( - self, device_id: str, action_name: str, action_kwargs: Dict[str, Any], goal_uuid: Optional[str] = None + self, + device_id: str, + action_name: str, + action_kwargs: Dict[str, Any], + goal_uuid: Optional[str] = None, + server_info: Optional[Dict[str, Any]] = None, ) -> None: """ 向设备发送目标请求 @@ -538,6 +554,8 @@ class HostNode(BaseROS2DeviceNode): goal_uuid: 目标UUID,如果为None则自动生成 """ action_id = f"/devices/{device_id}/{action_name}" + if action_name == "test_latency" and server_info is not None: + self.server_latest_timestamp = server_info.get("send_timestamp", 0.0) if action_id not in self._action_clients: self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.") return @@ -832,3 +850,148 @@ class HostNode(BaseROS2DeviceNode): # 这里可以实现返回资源列表的逻辑 self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}") return response + + def test_latency(self): + """ + 测试网络延迟的action实现 + 通过5次ping-pong机制校对时间误差并计算实际延迟 + """ + import time + import uuid as uuid_module + + self.lab_logger().info("=" * 60) + self.lab_logger().info("开始网络延迟测试...") + + # 记录任务开始执行的时间 + task_start_time = time.time() + + # 进行5次ping-pong测试 + ping_results = [] + + for i in range(5): + self.lab_logger().info(f"第{i+1}/5次ping-pong测试...") + + # 生成唯一的ping ID + ping_id = str(uuid_module.uuid4()) + + # 记录发送时间 + send_timestamp = time.time() + + # 发送ping + from unilabos.app.mq import mqtt_client + + mqtt_client.send_ping(ping_id, send_timestamp) + + # 等待pong响应 + timeout = 10.0 + start_wait_time = time.time() + + while time.time() - start_wait_time < timeout: + with self._ping_lock: + if ping_id in self._ping_responses: + pong_data = self._ping_responses.pop(ping_id) + break + time.sleep(0.001) + else: + self.lab_logger().error(f"❌ 第{i+1}次测试超时") + continue + + # 计算本次测试结果 + receive_timestamp = time.time() + client_timestamp = pong_data["client_timestamp"] + server_timestamp = pong_data["server_timestamp"] + + # 往返时间 + rtt_ms = (receive_timestamp - send_timestamp) * 1000 + + # 客户端与服务端时间差(客户端时间 - 服务端时间) + # 假设网络延迟对称,取中间点的服务端时间 + mid_point_time = send_timestamp + (receive_timestamp - send_timestamp) / 2 + time_diff_ms = (mid_point_time - server_timestamp) * 1000 + + ping_results.append({"rtt_ms": rtt_ms, "time_diff_ms": time_diff_ms}) + + self.lab_logger().info(f"✅ 第{i+1}次: 往返时间={rtt_ms:.2f}ms, 时间差={time_diff_ms:.2f}ms") + + time.sleep(0.1) + + if not ping_results: + self.lab_logger().error("❌ 所有ping-pong测试都失败了") + return {"status": "all_timeout"} + + # 统计分析 + rtts = [r["rtt_ms"] for r in ping_results] + time_diffs = [r["time_diff_ms"] for r in ping_results] + + avg_rtt_ms = sum(rtts) / len(rtts) + avg_time_diff_ms = sum(time_diffs) / len(time_diffs) + max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs))) + + self.lab_logger().info("-" * 50) + self.lab_logger().info("[测试统计]") + self.lab_logger().info(f"有效测试次数: {len(ping_results)}/5") + self.lab_logger().info(f"平均往返时间: {avg_rtt_ms:.2f}ms") + self.lab_logger().info(f"平均时间差: {avg_time_diff_ms:.2f}ms") + self.lab_logger().info(f"时间差范围: {min(time_diffs):.2f}ms ~ {max(time_diffs):.2f}ms") + self.lab_logger().info(f"最大时间误差: ±{max_time_diff_error_ms:.2f}ms") + + # 计算任务执行延迟 + if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0: + self.lab_logger().info("-" * 50) + self.lab_logger().info("[任务执行延迟分析]") + self.lab_logger().info(f"服务端任务下发时间: {self.server_latest_timestamp:.6f}") + self.lab_logger().info(f"客户端任务开始时间: {task_start_time:.6f}") + + # 原始时间差(不考虑时间同步误差) + raw_delay_ms = (task_start_time - self.server_latest_timestamp) * 1000 + + # 考虑时间同步误差后的延迟(用平均时间差校正) + corrected_delay_ms = raw_delay_ms - avg_time_diff_ms + + self.lab_logger().info(f"📊 原始时间差: {raw_delay_ms:.2f}ms") + self.lab_logger().info(f"🔧 时间同步校正: {avg_time_diff_ms:.2f}ms") + self.lab_logger().info(f"⏰ 实际任务延迟: {corrected_delay_ms:.2f}ms") + self.lab_logger().info(f"📏 误差范围: ±{max_time_diff_error_ms:.2f}ms") + + # 给出延迟范围 + min_delay = corrected_delay_ms - max_time_diff_error_ms + max_delay = corrected_delay_ms + max_time_diff_error_ms + self.lab_logger().info(f"📋 延迟范围: {min_delay:.2f}ms ~ {max_delay:.2f}ms") + + else: + self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析") + corrected_delay_ms = -1 + + self.lab_logger().info("=" * 60) + + return { + "avg_rtt_ms": avg_rtt_ms, + "avg_time_diff_ms": avg_time_diff_ms, + "max_time_error_ms": max_time_diff_error_ms, + "task_delay_ms": corrected_delay_ms if corrected_delay_ms > 0 else -1, + "raw_delay_ms": ( + raw_delay_ms if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0 else -1 + ), + "test_count": len(ping_results), + "status": "success", + } + + def handle_pong_response(self, pong_data: dict): + """ + 处理pong响应 + """ + ping_id = pong_data.get("ping_id") + if ping_id: + with self._ping_lock: + self._ping_responses[ping_id] = pong_data + + # 详细信息合并为一条日志 + client_timestamp = pong_data.get("client_timestamp", 0) + server_timestamp = pong_data.get("server_timestamp", 0) + current_time = time.time() + + self.lab_logger().debug( + f"📨 Pong | ID:{ping_id[:8]}.. | C→S→C: {client_timestamp:.3f}→{server_timestamp:.3f}→{current_time:.3f}" + ) + else: + self.lab_logger().warning("⚠️ 收到无效的Pong响应(缺少ping_id)")