新增test_resource动作

This commit is contained in:
Xuwznln
2025-10-11 02:53:18 +08:00
parent 0c42d60cf2
commit 704e13f030
2 changed files with 99 additions and 29 deletions

View File

@@ -7,16 +7,17 @@ import importlib
from pathlib import Path
from typing import Any, Dict, List, Union, Tuple
import msgcenterpy
import yaml
from rosidl_parser.definition import UnboundedSequence
from unilabos_msgs.action import LiquidHandlerTransfer
from unilabos_msgs.msg import Resource
from unilabos.config.config import BasicConfig
from unilabos.resources.graphio import resource_plr_to_ulab, tree_to_list
from unilabos.ros.msgs.message_converter import msg_converter_manager, ros_action_to_json_schema, String, \
ros_message_to_json_schema
from unilabos.ros.msgs.message_converter import (
msg_converter_manager,
ros_action_to_json_schema,
String,
ros_message_to_json_schema,
)
from unilabos.utils import logger
from unilabos.utils.decorator import singleton
from unilabos.utils.import_manager import get_enhanced_class_info, get_class
@@ -24,6 +25,7 @@ from unilabos.utils.type_check import NoAliasDumper
DEFAULT_PATHS = [Path(__file__).absolute().parent]
class ROSMsgNotFound(Exception):
pass
@@ -139,13 +141,57 @@ class Registry:
"type": self.EmptyIn,
"goal": {},
"feedback": {},
"result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"},
"result": {},
"schema": ros_action_to_json_schema(
self.EmptyIn, "用于测试延迟的动作,返回延迟时间和时间差。"
),
"goal_default": {},
"handles": {},
},
"test_resource": {
"type": "UniLabJsonCommand",
"goal": {},
"feedback": {},
"result": {},
"schema": {
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": {
"properties": ros_message_to_json_schema(Resource, "resource"),
"type": "object"
},
"resources": {
"items": {
"properties": ros_message_to_json_schema(Resource, "resources"),
"title": "mount_resource",
"type": "object"
},
"type": "array"
},
"device": {
"type": "object"
},
"devices": {
"items": {
"type": "object"
},
"type": "array"
},
},
"type": "object"
},
"result": {}
},
"required": ["goal"],
"title": "transfer_resource_to_another参数",
"type": "object",
},
"goal_default": {},
"handles": {},
},
},
},
"version": "1.0.0",
@@ -436,10 +482,12 @@ class Registry:
elif param_type == ("list", "unilabos.registry.placeholder_type:ResourceSlot"):
schema["properties"][param_name] = {
"items": ros_message_to_json_schema(Resource, param_name),
"type": "array"
"type": "array",
}
else:
schema["properties"][param_name] = self._generate_schema_from_info(param_name, param_type, param_default)
schema["properties"][param_name] = self._generate_schema_from_info(
param_name, param_type, param_default
)
if param_required:
schema["required"].append(param_name)
@@ -512,7 +560,9 @@ class Registry:
status_type = "String" # 替换成ROS的String便于显示
device_config["class"]["status_types"][status_name] = status_type
try:
target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}")
target_type = self._replace_type_with_class(
status_type, device_id, f"状态 {status_name}"
)
except ROSMsgNotFound:
continue
if target_type in [
@@ -550,10 +600,22 @@ class Registry:
"goal_default": {i["name"]: i["default"] for i in v["args"]},
"handles": [],
"placeholder_keys": {
i["name"]: "unilabos_resources" if i["type"] == "unilabos.registry.placeholder_type:ResourceSlot" or i["type"] == ("list", "unilabos.registry.placeholder_type:ResourceSlot") else "unilabos_devices"
i["name"]: (
"unilabos_resources"
if i["type"] == "unilabos.registry.placeholder_type:ResourceSlot"
or i["type"]
== ("list", "unilabos.registry.placeholder_type:ResourceSlot")
else "unilabos_devices"
)
for i in v["args"]
if i.get("type", "") in ["unilabos.registry.placeholder_type:ResourceSlot", "unilabos.registry.placeholder_type:DeviceSlot", ("list", "unilabos.registry.placeholder_type:ResourceSlot"), ("list", "unilabos.registry.placeholder_type:DeviceSlot")]
}
if i.get("type", "")
in [
"unilabos.registry.placeholder_type:ResourceSlot",
"unilabos.registry.placeholder_type:DeviceSlot",
("list", "unilabos.registry.placeholder_type:ResourceSlot"),
("list", "unilabos.registry.placeholder_type:DeviceSlot"),
]
},
}
# 不生成已配置action的动作
for k, v in enhanced_info["action_methods"].items()