mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
更新schema的title字段
This commit is contained in:
@@ -508,7 +508,7 @@ def convert_from_ros_msg_with_mapping(ros_msg: Any, value_mapping: Dict[str, str
|
||||
Python字典
|
||||
"""
|
||||
data: Dict[str, Any] = {}
|
||||
|
||||
|
||||
# # 🔧 添加调试信息
|
||||
# print(f"🔍 convert_from_ros_msg_with_mapping 开始")
|
||||
# print(f"🔍 ros_msg 类型: {type(ros_msg)}")
|
||||
@@ -517,14 +517,14 @@ def convert_from_ros_msg_with_mapping(ros_msg: Any, value_mapping: Dict[str, str
|
||||
# print("-" * 60)
|
||||
|
||||
for msg_name, attr_name in value_mapping.items():
|
||||
# print(f"🔍 处理映射: {msg_name} -> {attr_name}")
|
||||
|
||||
# print(f"🔍 处理映射: {msg_name} -> {attr_name}")
|
||||
|
||||
msg_path = msg_name.split(".")
|
||||
current = ros_msg
|
||||
|
||||
|
||||
# print(f"🔍 msg_path: {msg_path}")
|
||||
# print(f"🔍 current 初始值: {current} (类型: {type(current)})")
|
||||
|
||||
|
||||
try:
|
||||
if not attr_name.endswith("[]"):
|
||||
# 处理单值映射
|
||||
@@ -537,7 +537,7 @@ def convert_from_ros_msg_with_mapping(ros_msg: Any, value_mapping: Dict[str, str
|
||||
else:
|
||||
# print(f"❌ 属性 '{name}' 不存在于 {type(current)}")
|
||||
break
|
||||
|
||||
|
||||
converted_value = convert_from_ros_msg(current)
|
||||
# print(f"🔍 转换后的值: {converted_value} (类型: {type(converted_value)})")
|
||||
data[attr_name] = converted_value
|
||||
@@ -585,13 +585,13 @@ def convert_from_ros_msg_with_mapping(ros_msg: Any, value_mapping: Dict[str, str
|
||||
# print(f"❌ 映射转换错误 {msg_name} -> {attr_name}: {e}")
|
||||
logger.debug(f"Mapping conversion error for {msg_name} -> {attr_name}")
|
||||
continue
|
||||
|
||||
|
||||
# print(f"🔍 当前 data 状态: {data}")
|
||||
# print("-" * 40)
|
||||
|
||||
#print(f"🔍 convert_from_ros_msg_with_mapping 结束")
|
||||
#print(f"🔍 最终 data: {data}")
|
||||
#print("=" * 60)
|
||||
# print(f"🔍 convert_from_ros_msg_with_mapping 结束")
|
||||
# print(f"🔍 最终 data: {data}")
|
||||
# print("=" * 60)
|
||||
return data
|
||||
|
||||
|
||||
@@ -646,25 +646,28 @@ basic_type_map = {
|
||||
}
|
||||
|
||||
|
||||
def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str = None) -> Dict[str, Any]:
|
||||
def ros_field_type_to_json_schema(
|
||||
type_info: Type | str, field_name: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
将 ROS 字段类型转换为 JSON Schema 类型定义
|
||||
|
||||
Args:
|
||||
type_info: ROS 类型
|
||||
slot_type: ROS 类型
|
||||
field_name: 字段名,用于设置复杂类型的title
|
||||
|
||||
Returns:
|
||||
对应的 JSON Schema 类型定义
|
||||
"""
|
||||
if isinstance(type_info, UnboundedSequence):
|
||||
return {"type": "array", "items": ros_field_type_to_json_schema(type_info.value_type)}
|
||||
return {"type": "array", "items": ros_field_type_to_json_schema(type_info.value_type, field_name)} # type: ignore
|
||||
if isinstance(type_info, NamespacedType):
|
||||
cls_name = ".".join(type_info.namespaces) + ":" + type_info.name
|
||||
type_class = msg_converter_manager.get_class(cls_name)
|
||||
return ros_field_type_to_json_schema(type_class)
|
||||
return ros_field_type_to_json_schema(type_class, field_name)
|
||||
elif isinstance(type_info, BasicType):
|
||||
return ros_field_type_to_json_schema(type_info.typename)
|
||||
return ros_field_type_to_json_schema(type_info.typename, field_name)
|
||||
elif isinstance(type_info, UnboundedString):
|
||||
return basic_type_map["string"]
|
||||
elif isinstance(type_info, str):
|
||||
@@ -681,8 +684,9 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str = None)
|
||||
},
|
||||
"required": ["sec", "nanosec"],
|
||||
}
|
||||
return {}
|
||||
else:
|
||||
return ros_message_to_json_schema(type_info)
|
||||
return ros_message_to_json_schema(type_info, field_name)
|
||||
# # 处理数组类型
|
||||
# if field_type.endswith('[]'):
|
||||
# item_type = field_type[:-2]
|
||||
@@ -706,28 +710,28 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str = None)
|
||||
# return {'type': 'object', 'description': f'未知类型: {field_type}'}
|
||||
|
||||
|
||||
def ros_message_to_json_schema(msg_class: Any) -> Dict[str, Any]:
|
||||
def ros_message_to_json_schema(msg_class: Any, field_name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
将 ROS 消息类转换为 JSON Schema
|
||||
|
||||
Args:
|
||||
msg_class: ROS 消息类
|
||||
field_name: 字段名,用于设置schema的title,如果为None则使用类名
|
||||
|
||||
Returns:
|
||||
对应的 JSON Schema 定义
|
||||
"""
|
||||
schema = {"type": "object", "properties": {}, "required": []}
|
||||
|
||||
# 获取类名作为标题
|
||||
if hasattr(msg_class, "__name__"):
|
||||
schema["title"] = msg_class.__name__
|
||||
# 优先使用字段名作为标题,否则使用类名
|
||||
schema["title"] = field_name
|
||||
|
||||
# 获取消息的字段和字段类型
|
||||
try:
|
||||
for ind, slot_info in enumerate(msg_class._fields_and_field_types.items()):
|
||||
slot_name, slot_type = slot_info
|
||||
type_info = msg_class.SLOT_TYPES[ind]
|
||||
field_schema = ros_field_type_to_json_schema(type_info, slot_type)
|
||||
field_schema = ros_field_type_to_json_schema(type_info, slot_name)
|
||||
schema["properties"][slot_name] = field_schema
|
||||
schema["required"].append(slot_name)
|
||||
# if hasattr(msg_class, 'get_fields_and_field_types'):
|
||||
@@ -786,15 +790,15 @@ def ros_action_to_json_schema(action_class: Any, description="") -> Dict[str, An
|
||||
"properties": {
|
||||
"goal": {
|
||||
# 'description': 'Action 目标 - 从客户端发送到服务器',
|
||||
**ros_message_to_json_schema(action_class.Goal)
|
||||
**ros_message_to_json_schema(action_class.Goal, action_class.Goal.__name__)
|
||||
},
|
||||
"feedback": {
|
||||
# 'description': 'Action 反馈 - 执行过程中从服务器发送到客户端',
|
||||
**ros_message_to_json_schema(action_class.Feedback)
|
||||
**ros_message_to_json_schema(action_class.Feedback, action_class.Feedback.__name__)
|
||||
},
|
||||
"result": {
|
||||
# 'description': 'Action 结果 - 完成后从服务器发送到客户端',
|
||||
**ros_message_to_json_schema(action_class.Result)
|
||||
**ros_message_to_json_schema(action_class.Result, action_class.Result.__name__)
|
||||
},
|
||||
},
|
||||
"required": ["goal"],
|
||||
|
||||
Reference in New Issue
Block a user