Merge branch 'main' into pr/39

This commit is contained in:
Junhan Chang
2025-06-07 23:49:33 +08:00
40 changed files with 814 additions and 342 deletions

View File

@@ -348,10 +348,16 @@ def convert_to_ros_msg(ros_msg_type: Union[Type, Any], obj: Any) -> Any:
if isinstance(td, NamespacedType):
target_class = msg_converter_manager.get_class(f"{'.'.join(td.namespaces)}.{td.name}")
setattr(ros_msg, key, [convert_to_ros_msg(target_class, v) for v in value])
elif isinstance(td, UnboundedString):
setattr(ros_msg, key, value)
else:
logger.warning(f"Not Supported type: {td}")
setattr(ros_msg, key, []) # FIXME
elif "array.array" in str(type(attr)):
setattr(ros_msg, key, value)
if attr.typecode == "f":
setattr(ros_msg, key, [float(i) for i in value])
else:
setattr(ros_msg, key, value)
else:
nested_ros_msg = convert_to_ros_msg(type(attr)(), value)
setattr(ros_msg, key, nested_ros_msg)
@@ -574,6 +580,7 @@ basic_type_map = {
'int64': {'type': 'integer'},
'uint64': {'type': 'integer', 'minimum': 0},
'double': {'type': 'number'},
'float': {'type': 'number'},
'float32': {'type': 'number'},
'float64': {'type': 'number'},
'string': {'type': 'string'},

View File

@@ -1,3 +1,5 @@
import copy
import functools
import json
import threading
import time
@@ -19,7 +21,7 @@ from unilabos_msgs.action import SendCmd
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type, resource_ulab_to_plr, \
initialize_resources
initialize_resources, list_to_nested_dict, dict_to_tree, resource_plr_to_ulab, tree_to_list
from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg,
convert_from_ros_msg,
@@ -311,7 +313,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 物料传输到对应的node节点
rclient = self.create_client(ResourceAdd, "/resources/add")
rclient.wait_for_service()
rclient2 = self.create_client(ResourceAdd, "/resources/add")
rclient2.wait_for_service()
request = ResourceAdd.Request()
request2 = ResourceAdd.Request()
command_json = json.loads(req.command)
namespace = command_json["namespace"]
bind_parent_id = command_json["bind_parent_id"]
@@ -320,11 +325,23 @@ class BaseROS2DeviceNode(Node, Generic[T]):
other_calling_param = command_json["other_calling_param"]
resources = command_json["resource"]
initialize_full = other_calling_param.pop("initialize_full", False)
# 用来增加液体
ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", [])
LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", [])
LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", [])
slot = other_calling_param.pop("slot", -1)
if slot >= 0: # slot为负数的时候采用assign方法
other_calling_param["slot"] = slot
# 本地拿到这个物料,可能需要先做初始化?
if isinstance(resources, list):
if initialize_full:
if len(resources) == 1 and isinstance(resources[0], list) and not initialize_full: # 取消,不存在的情况
# 预先initialize过以整组的形式传入
request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]]
elif initialize_full:
resources = initialize_resources(resources)
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources]
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources]
else:
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources]
else:
if initialize_full:
resources = initialize_resources([resources])
@@ -334,20 +351,31 @@ class BaseROS2DeviceNode(Node, Generic[T]):
res.response = "OK"
# 接下来该根据bind_parent_id进行assign了目前只有plr可以进行assign不然没有办法输入到物料系统中
resource = self.resource_tracker.figure_resource({"name": bind_parent_id})
request.resources = [convert_to_ros_msg(Resource, resources)]
# request.resources = [convert_to_ros_msg(Resource, resources)]
try:
from pylabrobot.resources.resource import Resource as ResourcePLR
from pylabrobot.resources.deck import Deck
from pylabrobot.resources import Coordinate
from pylabrobot.resources import OTDeck
from pylabrobot.resources import Plate
contain_model = not isinstance(resource, Deck)
if isinstance(resource, ResourcePLR):
# resources.list()
plr_instance = resource_ulab_to_plr(resources, contain_model)
resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources}))
plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
if isinstance(plr_instance, Plate):
empty_liquid_info_in = [(None, 0)] * plr_instance.num_items
for liquid_type, liquid_volume, liquid_input_slot in zip(ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT):
empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume)
plr_instance.set_well_liquids(empty_liquid_info_in)
if isinstance(resource, OTDeck) and "slot" in other_calling_param:
resource.assign_child_at_slot(plr_instance, **other_calling_param)
resource.assign_child_resource(plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param)
else:
_discard_slot = other_calling_param.pop("slot", -1)
resource.assign_child_resource(plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param)
request2.resources = [convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)])]
rclient2.call(request2)
# 发送给ResourceMeshManager
action_client = ActionClient(
self, SendCmd, "/devices/resource_mesh_manager/add_resource_mesh", callback_group=self.callback_group
@@ -404,6 +432,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 加入全局注册表
registered_devices[self.device_id] = device_info
from unilabos.config.config import BasicConfig
from unilabos.ros.nodes.presets.host_node import HostNode
if not BasicConfig.is_host_mode:
sclient = self.create_client(SerialCommand, "/node_info_update")
# 启动线程执行发送任务
@@ -413,6 +442,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
daemon=True,
name=f"ROSDevice{self.device_id}_send_slave_node_info"
).start()
else:
host_node = HostNode.get_instance(0)
if host_node is not None:
host_node.device_machine_names[self.device_id] = "本地"
def send_slave_node_info(self, sclient):
sclient.wait_for_service()
@@ -481,6 +514,17 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.lab_logger().debug(f"发布动作: {action_name}, 类型: {str_action_type}")
def get_real_function(self, instance, attr_name):
if hasattr(instance.__class__, attr_name):
obj = getattr(instance.__class__, attr_name)
if isinstance(obj, property):
return lambda *args, **kwargs: obj.fset(instance, *args, **kwargs), get_type_hints(obj.fset)
obj = getattr(instance, attr_name)
return obj, get_type_hints(obj)
else:
obj = getattr(instance, attr_name)
return obj, get_type_hints(obj)
def _create_execute_callback(self, action_name, action_value_mapping):
"""创建动作执行回调函数"""
@@ -495,22 +539,21 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for i, action in enumerate(self._action_value_mappings["sequence"]):
if i == 0:
self.lab_logger().info(f"执行序列动作第一步: {action}")
getattr(self.driver_instance, action)(**kwargs)
self.get_real_function(self.driver_instance, action)[0](**kwargs)
else:
self.lab_logger().info(f"执行序列动作后续步骤: {action}")
getattr(self.driver_instance, action)()
self.get_real_function(self.driver_instance, action)[0]()
action_paramtypes = get_type_hints(
getattr(self.driver_instance, self._action_value_mappings["sequence"][0])
)
self.get_real_function(self.driver_instance, self._action_value_mappings["sequence"][0])
)[1]
else:
ACTION = getattr(self.driver_instance, action_name)
action_paramtypes = get_type_hints(ACTION)
ACTION, action_paramtypes = self.get_real_function(self.driver_instance, action_name)
action_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])
self.lab_logger().debug(f"接收到原始目标: {action_kwargs}")
# 向Host查询物料当前状态如果是host本身的增加物料的请求则直接跳过
if action_name != "add_resource_from_outer":
if action_name not in ["create_resource_detailed", "create_resource"]:
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
self.lab_logger().info(f"查询资源状态: Key: {k} Type: {v}")
@@ -609,7 +652,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
del future
# 向Host更新物料当前状态
if action_name != "add_resource_from_outer":
if action_name not in ["create_resource_detailed", "create_resource"]:
for k, v in goal.get_fields_and_field_types().items():
if v not in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
continue
@@ -748,7 +791,7 @@ class ROS2DeviceNode:
self.resource_tracker = DeviceNodeResourceTracker()
# use_pylabrobot_creator 使用 cls的包路径检测
use_pylabrobot_creator = driver_class.__module__.startswith("pylabrobot") or driver_class.__name__ == "DPLiquidHandler"
use_pylabrobot_creator = driver_class.__module__.startswith("pylabrobot") or driver_class.__name__ == "LiquidHandlerAbstract"
# TODO: 要在创建之前预先请求服务器是否有当前id的物料放到resource_tracker中让pylabrobot进行创建
# 创建设备类实例

View File

@@ -12,11 +12,18 @@ from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
SerialCommand # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
ResourceGet,
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand,
) # type: ignore
from unique_identifier_msgs.msg import UUID
from unilabos.registry.registry import lab_registry
from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
@@ -86,6 +93,7 @@ class HostNode(BaseROS2DeviceNode):
self.__class__._instance = self
# 初始化配置
self.server_latest_timestamp = 0.0 #
self.devices_config = devices_config
self.resources_config = resources_config
self.physical_setup_graph = physical_setup_graph
@@ -99,9 +107,32 @@ class HostNode(BaseROS2DeviceNode):
# 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = {} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self.device_machine_names: Dict[str, str] = {
device_id: "本地",
} # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型host的默认写好
"/devices/host_node/create_resource": ActionClient(
self,
lab_registry.ResourceCreateFromOuterEasy,
"/devices/host_node/create_resource",
callback_group=self.callback_group,
),
"/devices/host_node/create_resource_detailed": ActionClient(
self,
lab_registry.ResourceCreateFromOuter,
"/devices/host_node/create_resource_detailed",
callback_group=self.callback_group,
),
"/devices/host_node/test_latency": ActionClient(
self,
lab_registry.EmptyIn,
"/devices/host_node/test_latency",
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -115,8 +146,11 @@ class HostNode(BaseROS2DeviceNode):
self.device_status_timestamps = {} # 用来存储设备状态最后更新时间
from unilabos.app.mq import mqtt_client
for device_config in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_config["id"], device_config)
for device_info in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_info["id"], device_info)
for resource_info in lab_registry.obtain_registry_resource_info():
mqtt_client.publish_registry(resource_info["id"], resource_info)
# 首次发现网络中的设备
self._discover_devices()
@@ -141,12 +175,36 @@ class HostNode(BaseROS2DeviceNode):
].items():
controller_config["update_rate"] = update_rate
self.initialize_controller(controller_id, controller_config)
resources_config.insert(
0,
{
"id": "host_node",
"name": "host_node",
"parent": None,
"type": "device",
"class": "host_node",
"position": {"x": 0, "y": 0, "z": 0},
"config": {},
"data": {},
"children": [],
},
)
resource_with_parent_name = []
resource_ids_to_instance = {i["id"]: i for i in resources_config}
for res in resources_config:
if res.get("parent") and res.get("type") == "device" and res.get("class"):
parent_id = res.get("parent")
parent_res = resource_ids_to_instance[parent_id]
if parent_res.get("type") == "device" and parent_res.get("class"):
resource_with_parent_name.append(copy.deepcopy(res))
resource_with_parent_name[-1]["id"] = f"{parent_res['id']}/{res['id']}"
continue
resource_with_parent_name.append(copy.deepcopy(res))
try:
for bridge in self.bridges:
if hasattr(bridge, "resource_add"):
self.lab_logger().info("[Host Node-Resource] Adding resources to bridge.")
bridge.resource_add(add_schema(resources_config))
resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name))
except Exception as ex:
self.lab_logger().error("[Host Node-Resource] 添加物料出错!")
self.lab_logger().error(traceback.format_exc())
@@ -156,6 +214,10 @@ class HostNode(BaseROS2DeviceNode):
discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup()
)
# 添加ping-pong相关属性
self._ping_responses = {} # 存储ping响应
self._ping_lock = threading.Lock()
self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set()
@@ -191,7 +253,7 @@ class HostNode(BaseROS2DeviceNode):
# 如果是新设备记录并创建ActionClient
if edge_device_id not in self.devices_names:
self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}")
self.lab_logger().info(f"[Host Node] Discovered new device: {edge_device_id}")
self.devices_names[edge_device_id] = namespace
self._create_action_clients_for_device(device_id, namespace)
self._online_devices.add(device_key)
@@ -200,7 +262,7 @@ class HostNode(BaseROS2DeviceNode):
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}",
).start()
elif device_key not in self._online_devices:
# 设备重新上线
@@ -211,7 +273,7 @@ class HostNode(BaseROS2DeviceNode):
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}",
).start()
# 检测离线设备
@@ -255,7 +317,7 @@ class HostNode(BaseROS2DeviceNode):
self, action_type, action_id, callback_group=self.callback_group
)
self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
action_name = action_id[len(namespace) + 1:]
action_name = action_id[len(namespace) + 1 :]
edge_device_id = namespace[9:]
# from unilabos.app.mq import mqtt_client
# info_with_schema = ros_action_to_json_schema(action_type)
@@ -268,30 +330,84 @@ class HostNode(BaseROS2DeviceNode):
except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
def add_resource_from_outer(self, resources: list["Resource"], device_ids: list[str], bind_parent_ids: list[str], bind_locations: list[Point], other_calling_params: list[str]):
for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(resources, device_ids, bind_parent_ids, bind_locations, other_calling_params):
def create_resource_detailed(
self,
resources: list["Resource"],
device_ids: list[str],
bind_parent_ids: list[str],
bind_locations: list[Point],
other_calling_params: list[str],
):
for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(
resources, device_ids, bind_parent_ids, bind_locations, other_calling_params
):
# 这里要求device_id传入必须是edge_device_id
namespace = "/devices/" + device_id
srv_address = f"/srv{namespace}/append_resource"
sclient = self.create_client(SerialCommand, srv_address)
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps({
"resource": resource,
"namespace": namespace,
"edge_device_id": device_id,
"bind_parent_id": bind_parent_id,
"bind_location": {
"x": bind_location.x,
"y": bind_location.y,
"z": bind_location.z,
request.command = json.dumps(
{
"resource": resource, # 单个/单组 可为 list[list[Resource]]
"namespace": namespace,
"edge_device_id": device_id,
"bind_parent_id": bind_parent_id,
"bind_location": {
"x": bind_location.x,
"y": bind_location.y,
"z": bind_location.z,
},
"other_calling_param": json.loads(other_calling_param) if other_calling_param else {},
},
"other_calling_param": json.loads(other_calling_param) if other_calling_param else {},
}, ensure_ascii=False)
ensure_ascii=False,
)
response = sclient.call(request)
pass
pass
def create_resource(
self,
device_id: str,
res_id: str,
class_name: str,
parent: str,
bind_locations: Point,
liquid_input_slot: list[int],
liquid_type: list[str],
liquid_volume: list[int],
slot_on_deck: int,
):
init_new_res = initialize_resource(
{
"name": res_id,
"class": class_name,
"parent": parent,
"position": {
"x": bind_locations.x,
"y": bind_locations.y,
"z": bind_locations.z,
},
}
) # flatten的格式
resources = init_new_res # initialize_resource已经返回list[dict]
device_ids = [device_id]
bind_parent_id = [parent]
bind_location = [bind_locations]
other_calling_param = [
json.dumps(
{
"ADD_LIQUID_TYPE": liquid_type,
"LIQUID_VOLUME": liquid_volume,
"LIQUID_INPUT_SLOT": liquid_input_slot,
"initialize_full": False,
"slot": slot_on_deck,
}
)
]
return self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param)
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
"""
根据配置初始化设备,
@@ -319,7 +435,9 @@ class HostNode(BaseROS2DeviceNode):
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
self.lab_logger().debug(
f"[Host Node] Created ActionClient (Local): {action_id}"
) # 子设备再创建用的是Discover发现的
# from unilabos.app.mq import mqtt_client
# info_with_schema = ros_action_to_json_schema(action_type)
# mqtt_client.publish_actions(action_name, {
@@ -419,7 +537,12 @@ class HostNode(BaseROS2DeviceNode):
)
def send_goal(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any], goal_uuid: Optional[str] = None
self,
device_id: str,
action_name: str,
action_kwargs: Dict[str, Any],
goal_uuid: Optional[str] = None,
server_info: Optional[Dict[str, Any]] = None,
) -> None:
"""
向设备发送目标请求
@@ -431,6 +554,8 @@ class HostNode(BaseROS2DeviceNode):
goal_uuid: 目标UUID如果为None则自动生成
"""
action_id = f"/devices/{device_id}/{action_name}"
if action_name == "test_latency" and server_info is not None:
self.server_latest_timestamp = server_info.get("send_timestamp", 0.0)
if action_id not in self._action_clients:
self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.")
return
@@ -725,3 +850,148 @@ class HostNode(BaseROS2DeviceNode):
# 这里可以实现返回资源列表的逻辑
self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}")
return response
def test_latency(self):
"""
测试网络延迟的action实现
通过5次ping-pong机制校对时间误差并计算实际延迟
"""
import time
import uuid as uuid_module
self.lab_logger().info("=" * 60)
self.lab_logger().info("开始网络延迟测试...")
# 记录任务开始执行的时间
task_start_time = time.time()
# 进行5次ping-pong测试
ping_results = []
for i in range(5):
self.lab_logger().info(f"{i+1}/5次ping-pong测试...")
# 生成唯一的ping ID
ping_id = str(uuid_module.uuid4())
# 记录发送时间
send_timestamp = time.time()
# 发送ping
from unilabos.app.mq import mqtt_client
mqtt_client.send_ping(ping_id, send_timestamp)
# 等待pong响应
timeout = 10.0
start_wait_time = time.time()
while time.time() - start_wait_time < timeout:
with self._ping_lock:
if ping_id in self._ping_responses:
pong_data = self._ping_responses.pop(ping_id)
break
time.sleep(0.001)
else:
self.lab_logger().error(f"❌ 第{i+1}次测试超时")
continue
# 计算本次测试结果
receive_timestamp = time.time()
client_timestamp = pong_data["client_timestamp"]
server_timestamp = pong_data["server_timestamp"]
# 往返时间
rtt_ms = (receive_timestamp - send_timestamp) * 1000
# 客户端与服务端时间差(客户端时间 - 服务端时间)
# 假设网络延迟对称,取中间点的服务端时间
mid_point_time = send_timestamp + (receive_timestamp - send_timestamp) / 2
time_diff_ms = (mid_point_time - server_timestamp) * 1000
ping_results.append({"rtt_ms": rtt_ms, "time_diff_ms": time_diff_ms})
self.lab_logger().info(f"✅ 第{i+1}次: 往返时间={rtt_ms:.2f}ms, 时间差={time_diff_ms:.2f}ms")
time.sleep(0.1)
if not ping_results:
self.lab_logger().error("❌ 所有ping-pong测试都失败了")
return {"status": "all_timeout"}
# 统计分析
rtts = [r["rtt_ms"] for r in ping_results]
time_diffs = [r["time_diff_ms"] for r in ping_results]
avg_rtt_ms = sum(rtts) / len(rtts)
avg_time_diff_ms = sum(time_diffs) / len(time_diffs)
max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs)))
self.lab_logger().info("-" * 50)
self.lab_logger().info("[测试统计]")
self.lab_logger().info(f"有效测试次数: {len(ping_results)}/5")
self.lab_logger().info(f"平均往返时间: {avg_rtt_ms:.2f}ms")
self.lab_logger().info(f"平均时间差: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"时间差范围: {min(time_diffs):.2f}ms ~ {max(time_diffs):.2f}ms")
self.lab_logger().info(f"最大时间误差: ±{max_time_diff_error_ms:.2f}ms")
# 计算任务执行延迟
if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0:
self.lab_logger().info("-" * 50)
self.lab_logger().info("[任务执行延迟分析]")
self.lab_logger().info(f"服务端任务下发时间: {self.server_latest_timestamp:.6f}")
self.lab_logger().info(f"客户端任务开始时间: {task_start_time:.6f}")
# 原始时间差(不考虑时间同步误差)
raw_delay_ms = (task_start_time - self.server_latest_timestamp) * 1000
# 考虑时间同步误差后的延迟(用平均时间差校正)
corrected_delay_ms = raw_delay_ms - avg_time_diff_ms
self.lab_logger().info(f"📊 原始时间差: {raw_delay_ms:.2f}ms")
self.lab_logger().info(f"🔧 时间同步校正: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"⏰ 实际任务延迟: {corrected_delay_ms:.2f}ms")
self.lab_logger().info(f"📏 误差范围: ±{max_time_diff_error_ms:.2f}ms")
# 给出延迟范围
min_delay = corrected_delay_ms - max_time_diff_error_ms
max_delay = corrected_delay_ms + max_time_diff_error_ms
self.lab_logger().info(f"📋 延迟范围: {min_delay:.2f}ms ~ {max_delay:.2f}ms")
else:
self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析")
corrected_delay_ms = -1
self.lab_logger().info("=" * 60)
return {
"avg_rtt_ms": avg_rtt_ms,
"avg_time_diff_ms": avg_time_diff_ms,
"max_time_error_ms": max_time_diff_error_ms,
"task_delay_ms": corrected_delay_ms if corrected_delay_ms > 0 else -1,
"raw_delay_ms": (
raw_delay_ms if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0 else -1
),
"test_count": len(ping_results),
"status": "success",
}
def handle_pong_response(self, pong_data: dict):
"""
处理pong响应
"""
ping_id = pong_data.get("ping_id")
if ping_id:
with self._ping_lock:
self._ping_responses[ping_id] = pong_data
# 详细信息合并为一条日志
client_timestamp = pong_data.get("client_timestamp", 0)
server_timestamp = pong_data.get("server_timestamp", 0)
current_time = time.time()
self.lab_logger().debug(
f"📨 Pong | ID:{ping_id[:8]}.. | C→S→C: {client_timestamp:.3f}{server_timestamp:.3f}{current_time:.3f}"
)
else:
self.lab_logger().warning("⚠️ 收到无效的Pong响应缺少ping_id")