use same callback group

This commit is contained in:
Xuwznln
2025-10-23 01:52:33 +08:00
parent 2006406a24
commit 7f0b33b3e3
5 changed files with 23 additions and 21 deletions

View File

@@ -6,6 +6,8 @@ HTTP客户端模块
import json import json
import os import os
import time
from threading import Thread
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
import requests import requests
@@ -84,14 +86,14 @@ class HTTPClient:
f"{self.remote_addr}/edge/material", f"{self.remote_addr}/edge/material",
json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid},
headers={"Authorization": f"Lab {self.auth}"}, headers={"Authorization": f"Lab {self.auth}"},
timeout=100, timeout=60,
) )
else: else:
response = requests.put( response = requests.put(
f"{self.remote_addr}/edge/material", f"{self.remote_addr}/edge/material",
json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid},
headers={"Authorization": f"Lab {self.auth}"}, headers={"Authorization": f"Lab {self.auth}"},
timeout=100, timeout=10,
) )
with open(os.path.join(BasicConfig.working_dir, "res_resource_tree_add.json"), "w", encoding="utf-8") as f: with open(os.path.join(BasicConfig.working_dir, "res_resource_tree_add.json"), "w", encoding="utf-8") as f:

View File

@@ -10,7 +10,7 @@ from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
from unilabos_msgs.srv import SerialCommand # type: ignore from unilabos_msgs.srv import SerialCommand # type: ignore
from rclpy.executors import MultiThreadedExecutor from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node from rclpy.node import Node
from rclpy.timer import Timer from rclpy.timer import Timer

View File

@@ -338,12 +338,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 创建资源管理客户端 # 创建资源管理客户端
self._resource_clients: Dict[str, Client] = { self._resource_clients: Dict[str, Client] = {
"resource_add": self.create_client(ResourceAdd, "/resources/add"), "resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group),
"resource_get": self.create_client(SerialCommand, "/resources/get"), "resource_get": self.create_client(SerialCommand, "/resources/get", callback_group=self.callback_group),
"resource_delete": self.create_client(ResourceDelete, "/resources/delete"), "resource_delete": self.create_client(ResourceDelete, "/resources/delete", callback_group=self.callback_group),
"resource_update": self.create_client(ResourceUpdate, "/resources/update"), "resource_update": self.create_client(ResourceUpdate, "/resources/update", callback_group=self.callback_group),
"resource_list": self.create_client(ResourceList, "/resources/list"), "resource_list": self.create_client(ResourceList, "/resources/list", callback_group=self.callback_group),
"c2s_update_resource_tree": self.create_client(SerialCommand, "/c2s_update_resource_tree"), "c2s_update_resource_tree": self.create_client(SerialCommand, "/c2s_update_resource_tree", callback_group=self.callback_group),
} }
def re_register_device(req, res): def re_register_device(req, res):
@@ -884,7 +884,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
action_type, action_type,
action_name, action_name,
execute_callback=self._create_execute_callback(action_name, action_value_mapping), execute_callback=self._create_execute_callback(action_name, action_value_mapping),
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
) )
self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}")
@@ -1505,7 +1505,7 @@ class ROS2DeviceNode:
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.run_forever() loop.run_forever()
ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode") ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNodeLoop")
ROS2DeviceNode._loop_thread.start() ROS2DeviceNode._loop_thread.start()
logger.info(f"循环线程已启动") logger.info(f"循环线程已启动")

View File

@@ -285,7 +285,7 @@ class HostNode(BaseROS2DeviceNode):
# 创建定时器,定期发现设备 # 创建定时器,定期发现设备
self._discovery_timer = self.create_timer( self._discovery_timer = self.create_timer(
discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() discovery_interval, self._discovery_devices_callback, callback_group=self.callback_group
) )
# 添加ping-pong相关属性 # 添加ping-pong相关属性
@@ -618,7 +618,7 @@ class HostNode(BaseROS2DeviceNode):
topic, topic,
lambda msg, d=device_id, p=property_name: self.property_callback(msg, d, p), lambda msg, d=device_id, p=property_name: self.property_callback(msg, d, p),
1, 1,
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
) )
# 标记为已订阅 # 标记为已订阅
self._subscribed_topics.add(topic) self._subscribed_topics.add(topic)
@@ -829,37 +829,37 @@ class HostNode(BaseROS2DeviceNode):
def _init_host_service(self): def _init_host_service(self):
self._resource_services: Dict[str, Service] = { self._resource_services: Dict[str, Service] = {
"resource_add": self.create_service( "resource_add": self.create_service(
ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=ReentrantCallbackGroup() ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=self.callback_group
), ),
"resource_get": self.create_service( "resource_get": self.create_service(
SerialCommand, "/resources/get", self._resource_get_callback, callback_group=ReentrantCallbackGroup() SerialCommand, "/resources/get", self._resource_get_callback, callback_group=self.callback_group
), ),
"resource_delete": self.create_service( "resource_delete": self.create_service(
ResourceDelete, ResourceDelete,
"/resources/delete", "/resources/delete",
self._resource_delete_callback, self._resource_delete_callback,
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
), ),
"resource_update": self.create_service( "resource_update": self.create_service(
ResourceUpdate, ResourceUpdate,
"/resources/update", "/resources/update",
self._resource_update_callback, self._resource_update_callback,
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
), ),
"resource_list": self.create_service( "resource_list": self.create_service(
ResourceList, "/resources/list", self._resource_list_callback, callback_group=ReentrantCallbackGroup() ResourceList, "/resources/list", self._resource_list_callback, callback_group=self.callback_group
), ),
"node_info_update": self.create_service( "node_info_update": self.create_service(
SerialCommand, SerialCommand,
"/node_info_update", "/node_info_update",
self._node_info_update_callback, self._node_info_update_callback,
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
), ),
"c2s_update_resource_tree": self.create_service( "c2s_update_resource_tree": self.create_service(
SerialCommand, SerialCommand,
"/c2s_update_resource_tree", "/c2s_update_resource_tree",
self._resource_tree_update_callback, self._resource_tree_update_callback,
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
), ),
} }

View File

@@ -194,7 +194,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
action_type, action_type,
action_name, action_name,
execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator), execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator),
callback_group=ReentrantCallbackGroup(), callback_group=self.callback_group,
) )
self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}")
return return