Merge branch 'dev' into fork/KCFeng425/fix-protocol-parameter

# Conflicts:
#	unilabos/app/mq.py
#	unilabos/registry/devices/virtual_device.yaml
#	unilabos/registry/devices/work_station.yaml
This commit is contained in:
Xuwznln
2025-07-16 11:10:05 +08:00
33 changed files with 2970 additions and 4331 deletions

View File

@@ -132,7 +132,11 @@ _msg_converter: Dict[Type, Any] = {
Bool: lambda x: Bool(data=bool(x)),
str: str,
String: lambda x: String(data=str(x)),
Point: lambda x: Point(x=x.x, y=x.y, z=x.z) if not isinstance(x, dict) else Point(x=x.get("x", 0.0), y=x.get("y", 0.0), z=x.get("z", 0.0)),
Point: lambda x: (
Point(x=x.x, y=x.y, z=x.z)
if not isinstance(x, dict)
else Point(x=float(x.get("x", 0.0)), y=float(x.get("y", 0.0)), z=float(x.get("z", 0.0)))
),
Resource: lambda x: Resource(
id=x.get("id", ""),
name=x.get("name", ""),
@@ -142,7 +146,13 @@ _msg_converter: Dict[Type, Any] = {
type=x.get("type", ""),
category=x.get("class", "") or x.get("type", ""),
pose=(
Pose(position=Point(x=float(x.get("position", {}).get("x", 0.0)), y=float(x.get("position", {}).get("y", 0.0)), z=float(x.get("position", {}).get("z", 0.0))))
Pose(
position=Point(
x=float(x.get("position", {}).get("x", 0.0)),
y=float(x.get("position", {}).get("y", 0.0)),
z=float(x.get("position", {}).get("z", 0.0)),
)
)
if x.get("position", None) is not None
else Pose()
),
@@ -151,6 +161,7 @@ _msg_converter: Dict[Type, Any] = {
),
}
def json_or_yaml_loads(data: str) -> Any:
try:
return json.loads(data)
@@ -161,6 +172,7 @@ def json_or_yaml_loads(data: str) -> Any:
pass
raise e
# ROS消息到Python转换器
_msg_converter_back: Dict[Type, Any] = {
float: float,
@@ -571,30 +583,30 @@ from unilabos.utils.import_manager import ImportManager
from unilabos.config.config import ROSConfig
basic_type_map = {
'bool': {'type': 'boolean'},
'int8': {'type': 'integer', 'minimum': -128, 'maximum': 127},
'uint8': {'type': 'integer', 'minimum': 0, 'maximum': 255},
'int16': {'type': 'integer', 'minimum': -32768, 'maximum': 32767},
'uint16': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
'int32': {'type': 'integer', 'minimum': -2147483648, 'maximum': 2147483647},
'uint32': {'type': 'integer', 'minimum': 0, 'maximum': 4294967295},
'int64': {'type': 'integer'},
'uint64': {'type': 'integer', 'minimum': 0},
'double': {'type': 'number'},
'float': {'type': 'number'},
'float32': {'type': 'number'},
'float64': {'type': 'number'},
'string': {'type': 'string'},
'boolean': {'type': 'boolean'},
'char': {'type': 'string', 'maxLength': 1},
'byte': {'type': 'integer', 'minimum': 0, 'maximum': 255},
"bool": {"type": "boolean"},
"int8": {"type": "integer", "minimum": -128, "maximum": 127},
"uint8": {"type": "integer", "minimum": 0, "maximum": 255},
"int16": {"type": "integer", "minimum": -32768, "maximum": 32767},
"uint16": {"type": "integer", "minimum": 0, "maximum": 65535},
"int32": {"type": "integer", "minimum": -2147483648, "maximum": 2147483647},
"uint32": {"type": "integer", "minimum": 0, "maximum": 4294967295},
"int64": {"type": "integer"},
"uint64": {"type": "integer", "minimum": 0},
"double": {"type": "number"},
"float": {"type": "number"},
"float32": {"type": "number"},
"float64": {"type": "number"},
"string": {"type": "string"},
"boolean": {"type": "boolean"},
"char": {"type": "string", "maxLength": 1},
"byte": {"type": "integer", "minimum": 0, "maximum": 255},
}
def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str=None) -> Dict[str, Any]:
def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str = None) -> Dict[str, Any]:
"""
将 ROS 字段类型转换为 JSON Schema 类型定义
Args:
type_info: ROS 类型
slot_type: ROS 类型
@@ -603,10 +615,7 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str=None) ->
对应的 JSON Schema 类型定义
"""
if isinstance(type_info, UnboundedSequence):
return {
'type': 'array',
'items': ros_field_type_to_json_schema(type_info.value_type)
}
return {"type": "array", "items": ros_field_type_to_json_schema(type_info.value_type)}
if isinstance(type_info, NamespacedType):
cls_name = ".".join(type_info.namespaces) + ":" + type_info.name
type_class = msg_converter_manager.get_class(cls_name)
@@ -614,20 +623,20 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str=None) ->
elif isinstance(type_info, BasicType):
return ros_field_type_to_json_schema(type_info.typename)
elif isinstance(type_info, UnboundedString):
return basic_type_map['string']
return basic_type_map["string"]
elif isinstance(type_info, str):
if type_info in basic_type_map:
return basic_type_map[type_info]
# 处理时间和持续时间类型
if type_info in ('time', 'duration', 'builtin_interfaces/Time', 'builtin_interfaces/Duration'):
if type_info in ("time", "duration", "builtin_interfaces/Time", "builtin_interfaces/Duration"):
return {
'type': 'object',
'properties': {
'sec': {'type': 'integer', 'description': ''},
'nanosec': {'type': 'integer', 'description': '纳秒'}
"type": "object",
"properties": {
"sec": {"type": "integer", "description": ""},
"nanosec": {"type": "integer", "description": "纳秒"},
},
'required': ['sec', 'nanosec']
"required": ["sec", "nanosec"],
}
else:
return ros_message_to_json_schema(type_info)
@@ -638,9 +647,7 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str=None) ->
# 'type': 'array',
# 'items': ros_field_type_to_json_schema(item_type)
# }
# # 处理复杂类型(尝试加载并处理)
# try:
# # 如果它是一个完整的消息类型规范 (包名/msg/类型名)
@@ -655,34 +662,31 @@ def ros_field_type_to_json_schema(type_info: Type | str, slot_type: str=None) ->
# logger.debug(f"无法解析类型 {field_type}: {str(e)}")
# return {'type': 'object', 'description': f'未知类型: {field_type}'}
def ros_message_to_json_schema(msg_class: Any) -> Dict[str, Any]:
"""
将 ROS 消息类转换为 JSON Schema
Args:
msg_class: ROS 消息类
Returns:
对应的 JSON Schema 定义
"""
schema = {
'type': 'object',
'properties': {},
'required': []
}
schema = {"type": "object", "properties": {}, "required": []}
# 获取类名作为标题
if hasattr(msg_class, '__name__'):
schema['title'] = msg_class.__name__
if hasattr(msg_class, "__name__"):
schema["title"] = msg_class.__name__
# 获取消息的字段和字段类型
try:
for ind, slot_info in enumerate(msg_class._fields_and_field_types.items()):
slot_name, slot_type = slot_info
type_info = msg_class.SLOT_TYPES[ind]
field_schema = ros_field_type_to_json_schema(type_info, slot_type)
schema['properties'][slot_name] = field_schema
schema['required'].append(slot_name)
schema["properties"][slot_name] = field_schema
schema["required"].append(slot_name)
# if hasattr(msg_class, 'get_fields_and_field_types'):
# fields_and_types = msg_class.get_fields_and_field_types()
#
@@ -707,61 +711,66 @@ def ros_message_to_json_schema(msg_class: Any) -> Dict[str, Any]:
# schema['required'].append(clean_name)
except Exception as e:
# 如果获取字段类型失败,添加错误信息
schema['description'] = f"解析消息字段时出错: {str(e)}"
schema["description"] = f"解析消息字段时出错: {str(e)}"
logger.error(f"解析 {msg_class.__name__} 消息字段失败: {str(e)}")
return schema
def ros_action_to_json_schema(action_class: Any) -> Dict[str, Any]:
def ros_action_to_json_schema(action_class: Any, description="") -> Dict[str, Any]:
"""
将 ROS Action 类转换为 JSON Schema
Args:
action_class: ROS Action 类
description: 描述
Returns:
完整的 JSON Schema 定义
"""
if not hasattr(action_class, 'Goal') or not hasattr(action_class, 'Feedback') or not hasattr(action_class, 'Result'):
if (
not hasattr(action_class, "Goal")
or not hasattr(action_class, "Feedback")
or not hasattr(action_class, "Result")
):
raise ValueError(f"{action_class.__name__} 不是有效的 ROS Action 类")
# 创建基础 schema
schema = {
'title': action_class.__name__,
'description': f"ROS Action {action_class.__name__} 的 JSON Schema",
'type': 'object',
'properties': {
'goal': {
'description': 'Action 目标 - 从客户端发送到服务器',
"title": action_class.__name__,
"description": description,
"type": "object",
"properties": {
"goal": {
# 'description': 'Action 目标 - 从客户端发送到服务器',
**ros_message_to_json_schema(action_class.Goal)
},
'feedback': {
'description': 'Action 反馈 - 执行过程中从服务器发送到客户端',
"feedback": {
# 'description': 'Action 反馈 - 执行过程中从服务器发送到客户端',
**ros_message_to_json_schema(action_class.Feedback)
},
'result': {
'description': 'Action 结果 - 完成后从服务器发送到客户端',
"result": {
# 'description': 'Action 结果 - 完成后从服务器发送到客户端',
**ros_message_to_json_schema(action_class.Result)
}
},
},
'required': ['goal']
"required": ["goal"],
}
return schema
def convert_ros_action_to_jsonschema(
action_name_or_type: Union[str, Type],
output_file: Optional[str] = None,
format: str = 'json'
action_name_or_type: Union[str, Type], output_file: Optional[str] = None, format: str = "json"
) -> Dict[str, Any]:
"""
将 ROS Action 类型转换为 JSON Schema并可选地保存到文件
Args:
action_name_or_type: ROS Action 类型名称或类
output_file: 可选,输出 JSON Schema 的文件路径
format: 输出格式,'json''yaml'
Returns:
JSON Schema 定义(字典)
"""
@@ -771,21 +780,21 @@ def convert_ros_action_to_jsonschema(
action_type = get_ros_type_by_msgname(action_name_or_type)
else:
action_type = action_name_or_type
# 生成 JSON Schema
schema = ros_action_to_json_schema(action_type)
# 如果指定了输出文件,将 Schema 保存到文件
if output_file:
if format.lower() == 'json':
with open(output_file, 'w', encoding='utf-8') as f:
if format.lower() == "json":
with open(output_file, "w", encoding="utf-8") as f:
json.dump(schema, f, indent=2, ensure_ascii=False)
elif format.lower() == 'yaml':
with open(output_file, 'w', encoding='utf-8') as f:
elif format.lower() == "yaml":
with open(output_file, "w", encoding="utf-8") as f:
yaml.safe_dump(schema, f, default_flow_style=False, allow_unicode=True)
else:
raise ValueError(f"不支持的格式: {format},请使用 'json''yaml'")
return schema
@@ -794,14 +803,14 @@ if __name__ == "__main__":
# 示例:转换 NavigateToPose action
try:
from nav2_msgs.action import NavigateToPose
# 转换为 JSON Schema 并打印
schema = convert_ros_action_to_jsonschema(NavigateToPose)
print(json.dumps(schema, indent=2, ensure_ascii=False))
# 保存到文件
# convert_ros_action_to_jsonschema(NavigateToPose, "navigate_to_pose_schema.json")
# 或者使用字符串形式的 action 名称
# schema = convert_ros_action_to_jsonschema("nav2_msgs/action/NavigateToPose")
except ImportError:

View File

@@ -307,7 +307,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 创建动作服务
if self.create_action_server:
for action_name, action_value_mapping in self._action_value_mappings.items():
if action_name.startswith("auto-"):
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"):
continue
self.create_ros_action_server(action_name, action_value_mapping)
@@ -923,11 +923,18 @@ class ROS2DeviceNode:
driver_class.__module__.startswith("pylabrobot")
or driver_class.__name__ == "LiquidHandlerAbstract"
or driver_class.__name__ == "LiquidHandlerBiomek"
or driver_class.__name__ == "PRCXI9300Handler"
)
# TODO: 要在创建之前预先请求服务器是否有当前id的物料放到resource_tracker中让pylabrobot进行创建
# 创建设备类实例
if use_pylabrobot_creator:
# 先对pylabrobot的子资源进行加载不然subclass无法认出
# 在下方对于加载Deck等Resource要手动import
# noinspection PyUnresolvedReferences
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck
# noinspection PyUnresolvedReferences
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Container
self._driver_creator = PyLabRobotCreator(
driver_class, children=children, resource_tracker=self.resource_tracker
)

View File

@@ -459,7 +459,7 @@ class HostNode(BaseROS2DeviceNode):
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-"):
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"):
continue
action_id = f"/devices/{device_id}/{action_name}"
if action_id not in self._action_clients:
@@ -603,8 +603,7 @@ class HostNode(BaseROS2DeviceNode):
if action_name == "test_latency" and server_info is not None:
self.server_latest_timestamp = server_info.get("send_timestamp", 0.0)
if action_id not in self._action_clients:
self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.")
return
raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id]

View File

@@ -134,7 +134,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
if d is not None and hasattr(d, "ros_node_instance"):
node = d.ros_node_instance
for action_name, action_mapping in node._action_value_mappings.items():
if action_name.startswith("auto-"):
if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith("UniLabJsonCommand"):
continue
action_id = f"/devices/{device_id_abs}/{action_name}"
if action_id not in self._action_clients:

View File

@@ -148,7 +148,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
contain_model = not issubclass(target_type, Deck)
resource, target_type = self._process_resource_mapping(resource, target_type)
resource_instance: Resource = resource_ulab_to_plr(resource, contain_model)
states[prefix_path] = resource_instance.serialize_all_state()
# 使用 prefix_path 作为 key 存储资源状态
if to_dict:
serialized = resource_instance.serialize()
@@ -199,7 +199,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
spect = inspect.signature(deserialize_method)
spec_args = spect.parameters
for param_name, param_value in data.copy().items():
if "_resource_child_name" in param_value and "_resource_type" not in param_value:
if isinstance(param_value, dict) and "_resource_child_name" in param_value and "_resource_type" not in param_value:
arg_value = spec_args[param_name].annotation
data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")
@@ -230,7 +230,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
spect = inspect.signature(self.device_cls.__init__)
spec_args = spect.parameters
for param_name, param_value in data.copy().items():
if "_resource_child_name" in param_value and "_resource_type" not in param_value:
if isinstance(param_value, dict) and "_resource_child_name" in param_value and "_resource_type" not in param_value:
arg_value = spec_args[param_name].annotation
data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")