新增延迟统计

This commit is contained in:
Xuwznln
2025-05-29 00:46:15 +08:00
parent 8a50081c1e
commit 87099523c2
5 changed files with 266 additions and 57 deletions

View File

@@ -31,6 +31,6 @@ def job_add(req: JobAddReq) -> JobData:
action_kwargs = {"command": json.dumps(action_kwargs)}
elif "command" in action_kwargs:
action_kwargs = action_kwargs["command"]
print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id)
# print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id, server_info=req.server_info)
return JobData(jobId=req.job_id)

View File

@@ -51,8 +51,9 @@ class Resp(BaseModel):
class JobAddReq(BaseModel):
device_id: str = Field(examples=["Gripper"], description="device id")
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}])
job_id: str = Field(examples=["sfsfsfeq"], description="goal uuid")
node_id: str = Field(examples=["sfsfsfeq"], description="node uuid")
job_id: str = Field(examples=["job_id"], description="goal uuid")
node_id: str = Field(examples=["node_id"], description="node uuid")
server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info")
class JobStepFinishReq(BaseModel):

View File

@@ -12,7 +12,7 @@ import tempfile
import os
from unilabos.config.config import MQConfig
from unilabos.app.controler import devices, job_add
from unilabos.app.controler import job_add
from unilabos.app.model import JobAddReq
from unilabos.utils import logger
from unilabos.utils.type_check import TypeEncoder
@@ -43,9 +43,10 @@ class MQTTClient:
def _on_connect(self, client, userdata, flags, rc, properties=None):
logger.info("[MQTT] Connected with result code " + str(rc))
client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0)
client.subscribe(f"labs/{MQConfig.lab_id}/pong/", 0)
def _on_message(self, client, userdata, msg) -> None:
logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload))
# logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload))
try:
payload_str = msg.payload.decode("utf-8")
payload_json = json.loads(payload_str)
@@ -59,6 +60,14 @@ class MQTTClient:
job_req = JobAddReq.model_validate(payload_json)
data = job_add(job_req)
return
elif msg.topic == f"labs/{MQConfig.lab_id}/pong/":
# 处理pong响应通知HostNode
from unilabos.ros.nodes.presets.host_node import HostNode
host_instance = HostNode.get_instance(0)
if host_instance:
host_instance.handle_pong_response(payload_json)
return
except json.JSONDecodeError as e:
logger.error(f"[MQTT] JSON 解析错误: {e}")
@@ -175,6 +184,28 @@ class MQTTClient:
self.client.publish(address, json.dumps(action_info), qos=2)
logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}")
def send_ping(self, ping_id: str, timestamp: float):
"""发送ping消息到服务端"""
if self.mqtt_disable:
return
address = f"labs/{MQConfig.lab_id}/ping/"
ping_data = {"ping_id": ping_id, "client_timestamp": timestamp, "type": "ping"}
self.client.publish(address, json.dumps(ping_data), qos=2)
def setup_pong_subscription(self):
"""设置pong消息订阅"""
if self.mqtt_disable:
return
pong_topic = f"labs/{MQConfig.lab_id}/pong/"
self.client.subscribe(pong_topic, 0)
logger.debug(f"Subscribed to pong topic: {pong_topic}")
def handle_pong(self, pong_data: dict):
"""处理pong响应这个方法会在收到pong消息时被调用"""
logger.debug(f"Pong received: {pong_data}")
# 这里会被HostNode的ping-pong处理逻辑调用
pass
mqtt_client = MQTTClient()

View File

@@ -25,6 +25,9 @@ class Registry:
self.ResourceCreateFromOuterEasy = self._replace_type_with_class(
"ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource"
)
self.EmptyIn = self._replace_type_with_class(
"EmptyIn", "host_node", f""
)
self.device_type_registry = {}
self.resource_type_registry = {}
self._setup_called = False # 跟踪setup是否已调用
@@ -38,58 +41,69 @@ class Registry:
return
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type
self.device_type_registry.update({
"host_node": {
"description": "UniLabOS主机节点",
"class": {
"module": "unilabos.ros.nodes.presets.host_node",
"type": "python",
"status_types": {},
"action_value_mappings": {
"create_resource_detailed": {
"type": self.ResourceCreateFromOuter,
"goal": {
"resources": "resources",
"device_ids": "device_ids",
"bind_parent_ids": "bind_parent_ids",
"bind_locations": "bind_locations",
"other_calling_params": "other_calling_params",
self.device_type_registry.update(
{
"host_node": {
"description": "UniLabOS主机节点",
"class": {
"module": "unilabos.ros.nodes.presets.host_node",
"type": "python",
"status_types": {},
"action_value_mappings": {
"create_resource_detailed": {
"type": self.ResourceCreateFromOuter,
"goal": {
"resources": "resources",
"device_ids": "device_ids",
"bind_parent_ids": "bind_parent_ids",
"bind_locations": "bind_locations",
"other_calling_params": "other_calling_params",
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal))
),
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal))
)
},
"create_resource": {
"type": self.ResourceCreateFromOuterEasy,
"goal": {
"res_id": "res_id",
"class_name": "class_name",
"parent": "parent",
"device_id": "device_id",
"bind_locations": "bind_locations",
"liquid_input_slot": "liquid_input_slot[]",
"liquid_type": "liquid_type[]",
"liquid_volume": "liquid_volume[]",
"slot_on_deck": "slot_on_deck",
"create_resource": {
"type": self.ResourceCreateFromOuterEasy,
"goal": {
"res_id": "res_id",
"class_name": "class_name",
"parent": "parent",
"device_id": "device_id",
"bind_locations": "bind_locations",
"liquid_input_slot": "liquid_input_slot[]",
"liquid_type": "liquid_type[]",
"liquid_volume": "liquid_volume[]",
"slot_on_deck": "slot_on_deck",
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
),
},
"test_latency": {
"type": self.EmptyIn,
"goal": {},
"feedback": {},
"result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"},
"schema": ros_action_to_json_schema(self.EmptyIn),
"goal_default": {},
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
)
},
},
},
"icon": "icon_device.webp",
"registry_type": "device",
"schema": {"properties": {}, "additionalProperties": False, "type": "object"},
"file_path": "/",
"icon": "icon_device.webp",
"registry_type": "device",
"schema": {"properties": {}, "additionalProperties": False, "type": "object"},
"file_path": "/",
}
}
})
)
logger.debug(f"[UniLab Registry] ----------Setup----------")
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):

View File

@@ -93,6 +93,7 @@ class HostNode(BaseROS2DeviceNode):
self.__class__._instance = self
# 初始化配置
self.server_latest_timestamp = 0.0 #
self.devices_config = devices_config
self.resources_config = resources_config
self.physical_setup_graph = physical_setup_graph
@@ -122,6 +123,12 @@ class HostNode(BaseROS2DeviceNode):
"/devices/host_node/create_resource_detailed",
callback_group=self.callback_group,
),
"/devices/host_node/test_latency": ActionClient(
self,
lab_registry.EmptyIn,
"/devices/host_node/test_latency",
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
@@ -207,6 +214,10 @@ class HostNode(BaseROS2DeviceNode):
discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup()
)
# 添加ping-pong相关属性
self._ping_responses = {} # 存储ping响应
self._ping_lock = threading.Lock()
self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set()
@@ -379,8 +390,8 @@ class HostNode(BaseROS2DeviceNode):
},
}
) # flatten的格式
resources = [init_new_res]
device_id = [device_id]
resources = init_new_res # initialize_resource已经返回list[dict]
device_ids = [device_id]
bind_parent_id = [parent]
bind_location = [bind_locations]
other_calling_param = [
@@ -395,7 +406,7 @@ class HostNode(BaseROS2DeviceNode):
)
]
return self.create_resource_detailed(resources, device_id, bind_parent_id, bind_location, other_calling_param)
return self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param)
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
"""
@@ -526,7 +537,12 @@ class HostNode(BaseROS2DeviceNode):
)
def send_goal(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any], goal_uuid: Optional[str] = None
self,
device_id: str,
action_name: str,
action_kwargs: Dict[str, Any],
goal_uuid: Optional[str] = None,
server_info: Optional[Dict[str, Any]] = None,
) -> None:
"""
向设备发送目标请求
@@ -538,6 +554,8 @@ class HostNode(BaseROS2DeviceNode):
goal_uuid: 目标UUID如果为None则自动生成
"""
action_id = f"/devices/{device_id}/{action_name}"
if action_name == "test_latency" and server_info is not None:
self.server_latest_timestamp = server_info.get("send_timestamp", 0.0)
if action_id not in self._action_clients:
self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.")
return
@@ -832,3 +850,148 @@ class HostNode(BaseROS2DeviceNode):
# 这里可以实现返回资源列表的逻辑
self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}")
return response
def test_latency(self):
"""
测试网络延迟的action实现
通过5次ping-pong机制校对时间误差并计算实际延迟
"""
import time
import uuid as uuid_module
self.lab_logger().info("=" * 60)
self.lab_logger().info("开始网络延迟测试...")
# 记录任务开始执行的时间
task_start_time = time.time()
# 进行5次ping-pong测试
ping_results = []
for i in range(5):
self.lab_logger().info(f"{i+1}/5次ping-pong测试...")
# 生成唯一的ping ID
ping_id = str(uuid_module.uuid4())
# 记录发送时间
send_timestamp = time.time()
# 发送ping
from unilabos.app.mq import mqtt_client
mqtt_client.send_ping(ping_id, send_timestamp)
# 等待pong响应
timeout = 10.0
start_wait_time = time.time()
while time.time() - start_wait_time < timeout:
with self._ping_lock:
if ping_id in self._ping_responses:
pong_data = self._ping_responses.pop(ping_id)
break
time.sleep(0.001)
else:
self.lab_logger().error(f"❌ 第{i+1}次测试超时")
continue
# 计算本次测试结果
receive_timestamp = time.time()
client_timestamp = pong_data["client_timestamp"]
server_timestamp = pong_data["server_timestamp"]
# 往返时间
rtt_ms = (receive_timestamp - send_timestamp) * 1000
# 客户端与服务端时间差(客户端时间 - 服务端时间)
# 假设网络延迟对称,取中间点的服务端时间
mid_point_time = send_timestamp + (receive_timestamp - send_timestamp) / 2
time_diff_ms = (mid_point_time - server_timestamp) * 1000
ping_results.append({"rtt_ms": rtt_ms, "time_diff_ms": time_diff_ms})
self.lab_logger().info(f"✅ 第{i+1}次: 往返时间={rtt_ms:.2f}ms, 时间差={time_diff_ms:.2f}ms")
time.sleep(0.1)
if not ping_results:
self.lab_logger().error("❌ 所有ping-pong测试都失败了")
return {"status": "all_timeout"}
# 统计分析
rtts = [r["rtt_ms"] for r in ping_results]
time_diffs = [r["time_diff_ms"] for r in ping_results]
avg_rtt_ms = sum(rtts) / len(rtts)
avg_time_diff_ms = sum(time_diffs) / len(time_diffs)
max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs)))
self.lab_logger().info("-" * 50)
self.lab_logger().info("[测试统计]")
self.lab_logger().info(f"有效测试次数: {len(ping_results)}/5")
self.lab_logger().info(f"平均往返时间: {avg_rtt_ms:.2f}ms")
self.lab_logger().info(f"平均时间差: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"时间差范围: {min(time_diffs):.2f}ms ~ {max(time_diffs):.2f}ms")
self.lab_logger().info(f"最大时间误差: ±{max_time_diff_error_ms:.2f}ms")
# 计算任务执行延迟
if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0:
self.lab_logger().info("-" * 50)
self.lab_logger().info("[任务执行延迟分析]")
self.lab_logger().info(f"服务端任务下发时间: {self.server_latest_timestamp:.6f}")
self.lab_logger().info(f"客户端任务开始时间: {task_start_time:.6f}")
# 原始时间差(不考虑时间同步误差)
raw_delay_ms = (task_start_time - self.server_latest_timestamp) * 1000
# 考虑时间同步误差后的延迟(用平均时间差校正)
corrected_delay_ms = raw_delay_ms - avg_time_diff_ms
self.lab_logger().info(f"📊 原始时间差: {raw_delay_ms:.2f}ms")
self.lab_logger().info(f"🔧 时间同步校正: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"⏰ 实际任务延迟: {corrected_delay_ms:.2f}ms")
self.lab_logger().info(f"📏 误差范围: ±{max_time_diff_error_ms:.2f}ms")
# 给出延迟范围
min_delay = corrected_delay_ms - max_time_diff_error_ms
max_delay = corrected_delay_ms + max_time_diff_error_ms
self.lab_logger().info(f"📋 延迟范围: {min_delay:.2f}ms ~ {max_delay:.2f}ms")
else:
self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析")
corrected_delay_ms = -1
self.lab_logger().info("=" * 60)
return {
"avg_rtt_ms": avg_rtt_ms,
"avg_time_diff_ms": avg_time_diff_ms,
"max_time_error_ms": max_time_diff_error_ms,
"task_delay_ms": corrected_delay_ms if corrected_delay_ms > 0 else -1,
"raw_delay_ms": (
raw_delay_ms if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0 else -1
),
"test_count": len(ping_results),
"status": "success",
}
def handle_pong_response(self, pong_data: dict):
"""
处理pong响应
"""
ping_id = pong_data.get("ping_id")
if ping_id:
with self._ping_lock:
self._ping_responses[ping_id] = pong_data
# 详细信息合并为一条日志
client_timestamp = pong_data.get("client_timestamp", 0)
server_timestamp = pong_data.get("server_timestamp", 0)
current_time = time.time()
self.lab_logger().debug(
f"📨 Pong | ID:{ping_id[:8]}.. | C→S→C: {client_timestamp:.3f}{server_timestamp:.3f}{current_time:.3f}"
)
else:
self.lab_logger().warning("⚠️ 收到无效的Pong响应缺少ping_id")