Feat/samples (#229)

* add sample_material

* adapt to new samples sys
This commit is contained in:
Xuwznln
2026-02-05 00:42:12 +08:00
committed by GitHub
parent 26271bcab8
commit 957fb41a6f
5 changed files with 54 additions and 49 deletions

View File

@@ -54,6 +54,7 @@ class JobAddReq(BaseModel):
action_type: str = Field( action_type: str = Field(
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default="" examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
) )
sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid", default_factory=dict)
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict) action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")

View File

@@ -327,6 +327,7 @@ def job_add(req: JobAddReq) -> JobData:
queue_item, queue_item,
action_type=action_type, action_type=action_type,
action_kwargs=action_args, action_kwargs=action_args,
sample_material=req.sample_material,
server_info=server_info, server_info=server_info,
) )

View File

@@ -545,7 +545,7 @@ class MessageProcessor:
try: try:
message_str = json.dumps(msg, ensure_ascii=False) message_str = json.dumps(msg, ensure_ascii=False)
await self.websocket.send(message_str) await self.websocket.send(message_str)
logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 # logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Failed to send message: {str(e)}") logger.error(f"[MessageProcessor] Failed to send message: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@@ -688,6 +688,7 @@ class MessageProcessor:
queue_item, queue_item,
action_type=req.action_type, action_type=req.action_type,
action_kwargs=req.action_args, action_kwargs=req.action_args,
sample_material=req.sample_material,
server_info=req.server_info, server_info=req.server_info,
) )
@@ -1301,7 +1302,7 @@ class WebSocketClient(BaseCommunicationClient):
}, },
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") # logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
def publish_job_status( def publish_job_status(
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None

View File

@@ -231,14 +231,15 @@ class PropertyPublisher:
def publish_property(self): def publish_property(self):
try: try:
self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") # self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}")
value = self.get_property() value = self.get_property()
if self.print_publish: if self.print_publish:
self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") pass
# self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}")
if value is not None: if value is not None:
msg = convert_to_ros_msg(self.msg_type, value) msg = convert_to_ros_msg(self.msg_type, value)
self.publisher_.publish(msg) self.publisher_.publish(msg)
self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") # self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功")
except Exception as e: except Exception as e:
self.node.lab_logger().error( self.node.lab_logger().error(
f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}" f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}"
@@ -1594,7 +1595,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
function_name = target["function_name"] function_name = target["function_name"]
function_args = target["function_args"] function_args = target["function_args"]
# 获取 unilabos 系统参数 # 获取 unilabos 系统参数
unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) unilabos_param: Dict[str, Any] = target[JSON_UNILABOS_PARAM]
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
function = getattr(self.driver_instance, function_name) function = getattr(self.driver_instance, function_name)
@@ -1612,9 +1613,19 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if arg_name not in function_args: if arg_name not in function_args:
# 处理 sample_uuids 参数注入 # 处理 sample_uuids 参数注入
if arg_name == PARAM_SAMPLE_UUIDS: if arg_name == PARAM_SAMPLE_UUIDS:
function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {})
# 将 material uuid 转换为 resource 实例
# key: sample_uuid, value: material_uuid -> resource 实例
resolved_sample_uuids: Dict[str, Any] = {}
for sample_uuid, material_uuid in raw_sample_uuids.items():
if material_uuid and self.resource_tracker:
resource = self.resource_tracker.uuid_to_resources.get(material_uuid)
resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid
else:
resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug( self.lab_logger().debug(
f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}"
) )
continue continue
@@ -1754,9 +1765,19 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if arg_name not in function_args: if arg_name not in function_args:
# 处理 sample_uuids 参数注入 # 处理 sample_uuids 参数注入
if arg_name == PARAM_SAMPLE_UUIDS: if arg_name == PARAM_SAMPLE_UUIDS:
function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {})
# 将 material uuid 转换为 resource 实例
# key: sample_uuid, value: material_uuid -> resource 实例
resolved_sample_uuids: Dict[str, Any] = {}
for sample_uuid, material_uuid in raw_sample_uuids.items():
if material_uuid and self.resource_tracker:
resource = self.resource_tracker.uuid_to_resources.get(material_uuid)
resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid
else:
resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug( self.lab_logger().debug(
f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}"
) )
continue continue

View File

@@ -1,17 +1,17 @@
import collections import collections
from dataclasses import dataclass, field
import json import json
import threading import threading
import time import time
import traceback import traceback
import uuid import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
from typing_extensions import TypedDict
from action_msgs.msg import GoalStatus from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point from geometry_msgs.msg import Point
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.service import Service from rclpy.service import Service
from typing_extensions import TypedDict
from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import ( from unilabos_msgs.srv import (
ResourceAdd, ResourceAdd,
@@ -23,10 +23,20 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID from unique_identifier_msgs.msg import UUID
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer from unilabos.resources.container import RegularContainer
from unilabos.resources.graphio import initialize_resource from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema from unilabos.resources.registry import add_schema
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS,
)
from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import ( from unilabos.ros.msgs.message_converter import (
get_msg_type, get_msg_type,
@@ -37,20 +47,10 @@ from unilabos.ros.msgs.message_converter import (
) )
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
EXTRA_SAMPLE_UUID,
EXTRA_UNILABOS_SAMPLE_UUID,
RETURN_UNILABOS_SAMPLES,
)
from unilabos.utils import logger from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
if TYPE_CHECKING: if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem from unilabos.app.ws_client import QueueItem
@@ -758,6 +758,7 @@ class HostNode(BaseROS2DeviceNode):
item: "QueueItem", item: "QueueItem",
action_type: str, action_type: str,
action_kwargs: Dict[str, Any], action_kwargs: Dict[str, Any],
sample_material: Dict[str, str],
server_info: Optional[Dict[str, Any]] = None, server_info: Optional[Dict[str, Any]] = None,
) -> None: ) -> None:
""" """
@@ -775,14 +776,14 @@ class HostNode(BaseROS2DeviceNode):
if action_name.startswith("auto-"): if action_name.startswith("auto-"):
action_name = action_name[5:] action_name = action_name[5:]
action_id = f"/devices/{device_id}/_execute_driver_command" action_id = f"/devices/{device_id}/_execute_driver_command"
action_kwargs = { json_command: Dict[str, Any] = {
"string": json.dumps( "function_name": action_name,
{ "function_args": action_kwargs,
"function_name": action_name, JSON_UNILABOS_PARAM: {
"function_args": action_kwargs, PARAM_SAMPLE_UUIDS: sample_material,
} },
)
} }
action_kwargs = {"string": json.dumps(json_command)}
if action_type.startswith("UniLabJsonCommandAsync"): if action_type.startswith("UniLabJsonCommandAsync"):
action_id = f"/devices/{device_id}/_execute_driver_command_async" action_id = f"/devices/{device_id}/_execute_driver_command_async"
else: else:
@@ -793,26 +794,6 @@ class HostNode(BaseROS2DeviceNode):
raise ValueError(f"ActionClient {action_id} not found.") raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id] action_client: ActionClient = self._action_clients[action_id]
# 遍历action_kwargs下的所有子dict将sample_uuid的值赋给sample_id
def assign_sample_id(obj):
if isinstance(obj, dict):
# 处理 EXTRA_SAMPLE_UUID ("sample_uuid")
if EXTRA_SAMPLE_UUID in obj:
obj["sample_id"] = obj[EXTRA_SAMPLE_UUID]
obj.pop(EXTRA_SAMPLE_UUID)
# 处理 EXTRA_UNILABOS_SAMPLE_UUID ("unilabos_sample_uuid")
if EXTRA_UNILABOS_SAMPLE_UUID in obj:
obj["sample_id"] = obj[EXTRA_UNILABOS_SAMPLE_UUID]
obj.pop(EXTRA_UNILABOS_SAMPLE_UUID)
for k, v in obj.items():
if k != "unilabos_extra":
assign_sample_id(v)
elif isinstance(obj, list):
for item in obj:
assign_sample_id(item)
assign_sample_id(action_kwargs)
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
# self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}") # self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")