Dev backward (#228)

* Workbench example, adjust log level, and ci check (#220)

* TestLatency Return Value Example & gitignore update

* Adjust log level & Add workbench virtual example & Add not action decorator & Add check_mode &

* Add CI Check

* CI Check Fix 1

* CI Check Fix 2

* CI Check Fix 3

* CI Check Fix 4

* CI Check Fix 5

* Upgrade to py 3.11.14; ros 0.7; unilabos 0.10.16

* Update to ROS2 Humble 0.7

* Fix Build 1

* Fix Build 2

* Fix Build 3

* Fix Build 4

* Fix Build 5

* Fix Build 6

* Fix Build 7

* ci(deps): bump actions/configure-pages from 4 to 5 (#222)

Bumps [actions/configure-pages](https://github.com/actions/configure-pages) from 4 to 5.
- [Release notes](https://github.com/actions/configure-pages/releases)
- [Commits](https://github.com/actions/configure-pages/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/configure-pages
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* ci(deps): bump actions/upload-artifact from 4 to 6 (#224)

Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 4 to 6.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v4...v6)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* ci(deps): bump actions/upload-pages-artifact from 3 to 4 (#225)

Bumps [actions/upload-pages-artifact](https://github.com/actions/upload-pages-artifact) from 3 to 4.
- [Release notes](https://github.com/actions/upload-pages-artifact/releases)
- [Commits](https://github.com/actions/upload-pages-artifact/compare/v3...v4)

---
updated-dependencies:
- dependency-name: actions/upload-pages-artifact
  dependency-version: '4'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* ci(deps): bump actions/checkout from 4 to 6 (#223)

Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 6.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Fix Build 8

* Fix Build 9

* Fix Build 10

* Fix Build 11

* Fix Build 12

* Fix Build 13

* v0.10.17

(cherry picked from commit 176de521b4)

* CI Check use production mode

* Fix OT2 & ReAdd Virtual Devices

* add msg goal

* transfer liquid handles

* gather query

* add unilabos_class

* Support root node change pos

* save class name when deserialize & protocol execute test

* fix upload workflow json

* workflow upload & set liquid fix & add set liquid with plate

* speed up registry load

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: hanhua@dp.tech <2509856570@qq.com>
This commit is contained in:
Xuwznln
2026-02-02 23:57:13 +08:00
committed by GitHub
parent c4a3be1498
commit e30c01d54e
65 changed files with 4603 additions and 1655 deletions

View File

@@ -770,13 +770,16 @@ def ros_message_to_json_schema(msg_class: Any, field_name: str) -> Dict[str, Any
return schema
def ros_action_to_json_schema(action_class: Any, description="") -> Dict[str, Any]:
def ros_action_to_json_schema(
action_class: Any, description="", previous_schema: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
将 ROS Action 类转换为 JSON Schema
Args:
action_class: ROS Action 类
description: 描述
previous_schema: 之前的 schema用于保留 goal/feedback/result 下一级字段的 description
Returns:
完整的 JSON Schema 定义
@@ -810,9 +813,44 @@ def ros_action_to_json_schema(action_class: Any, description="") -> Dict[str, An
"required": ["goal"],
}
# 保留之前 schema 中 goal/feedback/result 下一级字段的 description
if previous_schema:
_preserve_field_descriptions(schema, previous_schema)
return schema
def _preserve_field_descriptions(
new_schema: Dict[str, Any], previous_schema: Dict[str, Any]
) -> None:
"""
保留之前 schema 中 goal/feedback/result 下一级字段的 description 和 title
Args:
new_schema: 新生成的 schema会被修改
previous_schema: 之前的 schema
"""
for section in ["goal", "feedback", "result"]:
new_section = new_schema.get("properties", {}).get(section, {})
prev_section = previous_schema.get("properties", {}).get(section, {})
if not new_section or not prev_section:
continue
new_props = new_section.get("properties", {})
prev_props = prev_section.get("properties", {})
for field_name, field_schema in new_props.items():
if field_name in prev_props:
prev_field = prev_props[field_name]
# 保留字段的 description
if "description" in prev_field and prev_field["description"]:
field_schema["description"] = prev_field["description"]
# 保留字段的 title用户自定义的中文名
if "title" in prev_field and prev_field["title"]:
field_schema["title"] = prev_field["title"]
def convert_ros_action_to_jsonschema(
action_name_or_type: Union[str, Type], output_file: Optional[str] = None, format: str = "json"
) -> Dict[str, Any]:

View File

@@ -49,7 +49,6 @@ from unilabos.resources.resource_tracker import (
ResourceTreeInstance,
ResourceDictInstance,
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from rclpy.task import Task, Future
from unilabos.utils.import_manager import default_manager
@@ -185,7 +184,7 @@ class PropertyPublisher:
f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}"
)
self.timer = node.create_timer(self.timer_period, self.publish_property)
self.__loop = get_event_loop()
self.__loop = ROS2DeviceNode.get_asyncio_loop()
str_msg_type = str(msg_type)[8:-2]
self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒, QoS: {qos}")
@@ -885,6 +884,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
parent_appended = True
# 加载状态
original_instance.location = plr_resource.location
original_instance.rotation = plr_resource.rotation
original_instance.barcode = plr_resource.barcode
original_instance.load_all_state(states)
child_count = len(original_instance.get_all_children())
self.lab_logger().info(
@@ -1320,19 +1322,32 @@ class BaseROS2DeviceNode(Node, Generic[T]):
resource_inputs = action_kwargs[k] if is_sequence else [action_kwargs[k]]
# 批量查询资源
queried_resources = []
for resource_data in resource_inputs:
queried_resources: list = [None] * len(resource_inputs)
uuid_indices: list[tuple[int, str, dict]] = [] # (index, uuid, resource_data)
# 第一遍处理没有uuid的资源收集有uuid的资源信息
for idx, resource_data in enumerate(resource_inputs):
unilabos_uuid = resource_data.get("data", {}).get("unilabos_uuid")
if unilabos_uuid is None:
plr_resource = await self.get_resource_with_dir(
resource_id=resource_data["id"], with_children=True
)
if "sample_id" in resource_data:
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
queried_resources[idx] = plr_resource
else:
resource_tree = await self.get_resource([unilabos_uuid])
plr_resource = resource_tree.to_plr_resources()[0]
if "sample_id" in resource_data:
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
queried_resources.append(plr_resource)
uuid_indices.append((idx, unilabos_uuid, resource_data))
# 第二遍批量查询有uuid的资源
if uuid_indices:
uuids = [item[1] for item in uuid_indices]
resource_tree = await self.get_resource(uuids)
plr_resources = resource_tree.to_plr_resources()
for i, (idx, _, resource_data) in enumerate(uuid_indices):
plr_resource = plr_resources[i]
if "sample_id" in resource_data:
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
queried_resources[idx] = plr_resource
self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源")
@@ -1757,6 +1772,15 @@ class ROS2DeviceNode:
它不继承设备类,而是通过代理模式访问设备类的属性和方法。
"""
# 类变量,用于循环管理
_asyncio_loop = None
_asyncio_loop_running = False
_asyncio_loop_thread = None
@classmethod
def get_asyncio_loop(cls):
return cls._asyncio_loop
@staticmethod
async def safe_task_wrapper(trace_callback, func, **kwargs):
try:
@@ -1833,6 +1857,11 @@ class ROS2DeviceNode:
print_publish: 是否打印发布信息
driver_is_ros:
"""
# 在初始化时检查循环状态
if ROS2DeviceNode._asyncio_loop_running and ROS2DeviceNode._asyncio_loop_thread is not None:
pass
elif ROS2DeviceNode._asyncio_loop_thread is None:
self._start_loop()
# 保存设备类是否支持异步上下文
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
@@ -1924,6 +1953,17 @@ class ROS2DeviceNode:
except Exception as e:
self._ros_node.lab_logger().error(f"设备后初始化失败: {e}")
def _start_loop(self):
def run_event_loop():
loop = asyncio.new_event_loop()
ROS2DeviceNode._asyncio_loop = loop
asyncio.set_event_loop(loop)
loop.run_forever()
ROS2DeviceNode._asyncio_loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode")
ROS2DeviceNode._asyncio_loop_thread.start()
logger.info(f"循环线程已启动")
class DeviceInfoType(TypedDict):
id: str

View File

@@ -5,7 +5,8 @@ import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, TypedDict, Union
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
from typing_extensions import TypedDict
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
@@ -62,6 +63,18 @@ class TestResourceReturn(TypedDict):
devices: List[DeviceSlot]
class TestLatencyReturn(TypedDict):
"""test_latency方法的返回值类型"""
avg_rtt_ms: float
avg_time_diff_ms: float
max_time_error_ms: float
task_delay_ms: float
raw_delay_ms: float
test_count: int
status: str
class HostNode(BaseROS2DeviceNode):
"""
主机节点类,负责管理设备、资源和控制器
@@ -795,6 +808,7 @@ class HostNode(BaseROS2DeviceNode):
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")
self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {action_kwargs}")
self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
action_client.wait_for_server()
goal_uuid_obj = UUID(uuid=list(u.bytes))
@@ -853,8 +867,13 @@ class HostNode(BaseROS2DeviceNode):
# 适配后端的一些额外处理
return_value = return_info.get("return_value")
if isinstance(return_value, dict):
unilabos_samples = return_info.get("unilabos_samples")
if isinstance(unilabos_samples, list):
unilabos_samples = return_value.pop("unilabos_samples", None)
if isinstance(unilabos_samples, list) and unilabos_samples:
self.lab_logger().info(
f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): "
f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}"
f"{'...' if len(unilabos_samples) > 5 else ''}"
)
return_info["unilabos_samples"] = unilabos_samples
suc = return_info.get("suc", False)
if not suc:
@@ -881,7 +900,7 @@ class HostNode(BaseROS2DeviceNode):
# 清理 _goals 中的记录
if job_id in self._goals:
del self._goals[job_id]
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
self.lab_logger().trace(f"[Host Node] Removed goal {job_id[:8]} from _goals")
# 存储结果供 HTTP API 查询
try:
@@ -1326,10 +1345,20 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}")
return response
def test_latency(self):
def test_latency(self) -> TestLatencyReturn:
"""
测试网络延迟的action实现
通过5次ping-pong机制校对时间误差并计算实际延迟
Returns:
TestLatencyReturn: 包含延迟测试结果的字典,包括:
- avg_rtt_ms: 平均往返时间(毫秒)
- avg_time_diff_ms: 平均时间差(毫秒)
- max_time_error_ms: 最大时间误差(毫秒)
- task_delay_ms: 实际任务延迟(毫秒),-1表示无法计算
- raw_delay_ms: 原始时间差(毫秒),-1表示无法计算
- test_count: 有效测试次数
- status: 测试状态,"success"表示成功,"all_timeout"表示全部超时
"""
import uuid as uuid_module
@@ -1392,7 +1421,15 @@ class HostNode(BaseROS2DeviceNode):
if not ping_results:
self.lab_logger().error("❌ 所有ping-pong测试都失败了")
return {"status": "all_timeout"}
return {
"avg_rtt_ms": -1.0,
"avg_time_diff_ms": -1.0,
"max_time_error_ms": -1.0,
"task_delay_ms": -1.0,
"raw_delay_ms": -1.0,
"test_count": 0,
"status": "all_timeout",
}
# 统计分析
rtts = [r["rtt_ms"] for r in ping_results]
@@ -1400,7 +1437,7 @@ class HostNode(BaseROS2DeviceNode):
avg_rtt_ms = sum(rtts) / len(rtts)
avg_time_diff_ms = sum(time_diffs) / len(time_diffs)
max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs)))
max_time_diff_error_ms: float = max(abs(min(time_diffs)), abs(max(time_diffs)))
self.lab_logger().info("-" * 50)
self.lab_logger().info("[测试统计]")
@@ -1440,7 +1477,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info("=" * 60)
return {
res: TestLatencyReturn = {
"avg_rtt_ms": avg_rtt_ms,
"avg_time_diff_ms": avg_time_diff_ms,
"max_time_error_ms": max_time_diff_error_ms,
@@ -1451,9 +1488,14 @@ class HostNode(BaseROS2DeviceNode):
"test_count": len(ping_results),
"status": "success",
}
return res
def test_resource(
self, resource: ResourceSlot = None, resources: List[ResourceSlot] = None, device: DeviceSlot = None, devices: List[DeviceSlot] = None
self,
resource: ResourceSlot = None,
resources: List[ResourceSlot] = None,
device: DeviceSlot = None,
devices: List[DeviceSlot] = None,
) -> TestResourceReturn:
if resources is None:
resources = []
@@ -1514,7 +1556,9 @@ class HostNode(BaseROS2DeviceNode):
# 构建服务地址
srv_address = f"/srv{namespace}/s2c_resource_tree"
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------")
self.lab_logger().trace(
f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------"
)
# 创建服务客户端
sclient = self.create_client(SerialCommand, srv_address)
@@ -1549,7 +1593,9 @@ class HostNode(BaseROS2DeviceNode):
time.sleep(0.05)
response = future.result()
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------")
self.lab_logger().trace(
f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------"
)
return True
except Exception as e:

View File

@@ -6,8 +6,6 @@ from typing import List, Dict, Any, Optional, TYPE_CHECKING
import rclpy
from rosidl_runtime_py import message_to_ordereddict
from unilabos_msgs.msg import Resource
from unilabos_msgs.srv import ResourceUpdate
from unilabos.messages import * # type: ignore # protocol names
from rclpy.action import ActionServer, ActionClient
@@ -15,7 +13,6 @@ from rclpy.action.server import ServerGoalHandle
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos.compile import action_protocol_generators
from unilabos.resources.graphio import nested_dict_to_list
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
get_action_type,
@@ -231,15 +228,15 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
try:
# 统一处理单个或多个资源
resource_id = (
protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"]
protocol_kwargs[k]["id"]
if v == "unilabos_msgs/Resource"
else protocol_kwargs[k][0]["id"]
)
resource_uuid = protocol_kwargs[k].get("uuid", None)
r = SerialCommand_Request()
r.command = json.dumps({"id": resource_id, "uuid": resource_uuid, "with_children": True})
# 发送请求并等待响应
response: SerialCommand_Response = await self._resource_clients[
"resource_get"
].call_async(
response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(
r
) # type: ignore
raw_data = json.loads(response.response)
@@ -307,12 +304,52 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
# 向Host更新物料当前状态
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
r = ResourceUpdate.Request()
r.resources = [
convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k])
]
response = await self._resource_clients["resource_update"].call_async(r)
if v not in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
continue
self.lab_logger().info(f"更新资源状态: {k}")
try:
# 去重:使用 seen 集合获取唯一的资源对象
seen = set()
unique_resources = []
# 获取资源数据,统一转换为列表
resource_data = protocol_kwargs[k]
is_sequence = v != "unilabos_msgs/Resource"
if not is_sequence:
resource_list = [resource_data] if isinstance(resource_data, dict) else resource_data
else:
# 处理序列类型,可能是嵌套列表
resource_list = []
if isinstance(resource_data, list):
for item in resource_data:
if isinstance(item, list):
resource_list.extend(item)
else:
resource_list.append(item)
else:
resource_list = [resource_data]
for res_data in resource_list:
if not isinstance(res_data, dict):
continue
res_name = res_data.get("id") or res_data.get("name")
if not res_name:
continue
# 使用 resource_tracker 获取本地 PLR 实例
plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False)
# 获取父资源
res = self.resource_tracker.parent_resource(plr)
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
# 使用新的资源树接口更新
if unique_resources:
await self.update_resource(unique_resources)
except Exception as e:
self.lab_logger().error(f"资源更新失败: {e}")
self.lab_logger().error(traceback.format_exc())
# 设置成功状态和返回值
execution_success = True

View File

@@ -1,182 +0,0 @@
import asyncio
from asyncio import events
import threading
import rclpy
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.executors import await_or_execute, Executor
from rclpy.action import ActionClient, ActionServer
from rclpy.action.server import ServerGoalHandle, GoalResponse, GoalInfo, GoalStatus
from std_msgs.msg import String
from action_tutorials_interfaces.action import Fibonacci
loop = None
def get_event_loop():
global loop
return loop
async def default_handle_accepted_callback_async(goal_handle):
"""Execute the goal."""
await goal_handle.execute()
class ServerGoalHandleX(ServerGoalHandle):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def execute(self, execute_callback=None):
# It's possible that there has been a request to cancel the goal prior to executing.
# In this case we want to avoid the illegal state transition to EXECUTING
# but still call the users execute callback to let them handle canceling the goal.
if not self.is_cancel_requested:
self._update_state(_rclpy.GoalEvent.EXECUTE)
await self._action_server.notify_execute_async(self, execute_callback)
class ActionServerX(ActionServer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.register_handle_accepted_callback(default_handle_accepted_callback_async)
async def _execute_goal_request(self, request_header_and_message):
request_header, goal_request = request_header_and_message
goal_uuid = goal_request.goal_id
goal_info = GoalInfo()
goal_info.goal_id = goal_uuid
self._node.get_logger().debug('New goal request with ID: {0}'.format(goal_uuid.uuid))
# Check if goal ID is already being tracked by this action server
with self._lock:
goal_id_exists = self._handle.goal_exists(goal_info)
accepted = False
if not goal_id_exists:
# Call user goal callback
response = await await_or_execute(self._goal_callback, goal_request.goal)
if not isinstance(response, GoalResponse):
self._node.get_logger().warning(
'Goal request callback did not return a GoalResponse type. Rejecting goal.')
else:
accepted = GoalResponse.ACCEPT == response
if accepted:
# Stamp time of acceptance
goal_info.stamp = self._node.get_clock().now().to_msg()
# Create a goal handle
try:
with self._lock:
goal_handle = ServerGoalHandleX(self, goal_info, goal_request.goal)
except RuntimeError as e:
self._node.get_logger().error(
'Failed to accept new goal with ID {0}: {1}'.format(goal_uuid.uuid, e))
accepted = False
else:
self._goal_handles[bytes(goal_uuid.uuid)] = goal_handle
# Send response
response_msg = self._action_type.Impl.SendGoalService.Response()
response_msg.accepted = accepted
response_msg.stamp = goal_info.stamp
self._handle.send_goal_response(request_header, response_msg)
if not accepted:
self._node.get_logger().debug('New goal rejected: {0}'.format(goal_uuid.uuid))
return
self._node.get_logger().debug('New goal accepted: {0}'.format(goal_uuid.uuid))
# Provide the user a reference to the goal handle
# await await_or_execute(self._handle_accepted_callback, goal_handle)
asyncio.create_task(self._handle_accepted_callback(goal_handle))
async def notify_execute_async(self, goal_handle, execute_callback):
# Use provided callback, defaulting to a previously registered callback
if execute_callback is None:
if self._execute_callback is None:
return
execute_callback = self._execute_callback
# Schedule user callback for execution
self._node.get_logger().info(f"{events.get_running_loop()}")
asyncio.create_task(self._execute_goal(execute_callback, goal_handle))
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# task = loop.create_task(self._execute_goal(execute_callback, goal_handle))
# await task
class ActionClientX(ActionClient):
feedback_queue = asyncio.Queue()
async def feedback_cb(self, msg):
await self.feedback_queue.put(msg)
async def send_goal_async(self, goal_msg):
goal_future = super().send_goal_async(
goal_msg,
feedback_callback=self.feedback_cb
)
client_goal_handle = await asyncio.ensure_future(goal_future)
if not client_goal_handle.accepted:
raise Exception("Goal rejected.")
result_future = client_goal_handle.get_result_async()
while True:
feedback_future = asyncio.ensure_future(self.feedback_queue.get())
tasks = [result_future, feedback_future]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if result_future.done():
result = result_future.result().result
yield (None, result)
break
else:
feedback = feedback_future.result().feedback
yield (feedback, None)
async def main(node):
print('Node started.')
action_client = ActionClientX(node, Fibonacci, 'fibonacci')
goal_msg = Fibonacci.Goal()
goal_msg.order = 10
async for (feedback, result) in action_client.send_goal_async(goal_msg):
if feedback:
print(f'Feedback: {feedback}')
else:
print(f'Result: {result}')
print('Finished.')
async def ros_loop_node(node):
while rclpy.ok():
rclpy.spin_once(node, timeout_sec=0)
await asyncio.sleep(1e-4)
async def ros_loop(executor: Executor):
while rclpy.ok():
executor.spin_once(timeout_sec=0)
await asyncio.sleep(1e-4)
def run_event_loop():
global loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_forever()
def run_event_loop_in_thread():
thread = threading.Thread(target=run_event_loop, args=())
thread.start()
if __name__ == "__main__":
rclpy.init()
node = rclpy.create_node('async_subscriber')
future = asyncio.wait([ros_loop(node), main()])
asyncio.get_event_loop().run_until_complete(future)