Fix startup with remote resource error

Resource dict fully change to "pose" key

Update oss link

Reduce pylabrobot conversion warning & force enable log dump.

更新 logo 图片
This commit is contained in:
ZiWei
2025-11-28 11:35:05 +08:00
committed by Xuwznln
parent c7c14d2332
commit 5ce433e235
12 changed files with 92 additions and 79 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 326 KiB

After

Width:  |  Height:  |  Size: 262 KiB

View File

@@ -218,7 +218,7 @@ def main():
if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if args_dict["addr"] == "test":
print_status("使用测试环境地址", "info")

View File

@@ -34,14 +34,14 @@ def _get_oss_token(
client = http_client
# 构造scene参数: driver_name-exp_type
scene = f"{driver_name}-{exp_type}"
sub_path = f"{driver_name}-{exp_type}"
# 构造请求URL使用client的remote_addr已包含/api/v1/
url = f"{client.remote_addr}/applications/token"
params = {"scene": scene, "filename": filename}
params = {"sub_path": sub_path, "filename": filename, "scene": "job"}
try:
logger.info(f"[OSS] 请求预签名URL: scene={scene}, filename={filename}")
logger.info(f"[OSS] 请求预签名URL: sub_path={sub_path}, filename={filename}")
response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10)
if response.status_code == 200:

View File

@@ -45,10 +45,13 @@ def canonicalize_nodes_data(
print_status(f"{len(nodes)} Resources loaded:", "info")
# 第一步基本预处理处理graphml的label字段
for node in nodes:
outer_host_node_id = None
for idx, node in enumerate(nodes):
if node.get("label") is not None:
node_id = node.pop("label")
node["id"] = node["name"] = node_id
if node["id"] == "host_node":
outer_host_node_id = idx
if not isinstance(node.get("config"), dict):
node["config"] = {}
if not node.get("type"):
@@ -58,25 +61,26 @@ def canonicalize_nodes_data(
node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
if not isinstance(node.get("position"), dict):
node["position"] = {"position": {}}
node["pose"] = {"position": {}}
x = node.pop("x", None)
if x is not None:
node["position"]["position"]["x"] = x
node["pose"]["position"]["x"] = x
y = node.pop("y", None)
if y is not None:
node["position"]["position"]["y"] = y
node["pose"]["position"]["y"] = y
z = node.pop("z", None)
if z is not None:
node["position"]["position"]["z"] = z
node["pose"]["position"]["z"] = z
if "sample_id" in node:
sample_id = node.pop("sample_id")
if sample_id:
logger.error(f"{node}的sample_id参数已弃用sample_id: {sample_id}")
for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children"]:
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose"]:
v = node.pop(k)
node["config"][k] = v
if outer_host_node_id is not None:
nodes.pop(outer_host_node_id)
# 第二步处理parent_relation
id2idx = {node["id"]: idx for idx, node in enumerate(nodes)}
for parent, children in parent_relation.items():
@@ -582,11 +586,15 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None, w
"tip_rack": "tip_rack",
"warehouse": "warehouse",
"container": "container",
"tube": "tube",
"bottle_carrier": "bottle_carrier",
"plate_adapter": "plate_adapter",
}
if source in replace_info:
return replace_info[source]
else:
logger.warning(f"转换pylabrobot的时候出现未知类型: {source}")
if source is not None:
logger.warning(f"转换pylabrobot的时候出现未知类型: {source}")
return source
def resource_plr_to_ulab_inner(d: dict, all_states: dict, child=True) -> dict:

View File

@@ -5,6 +5,7 @@ from unilabos.ros.msgs.message_converter import (
get_action_type,
)
from unilabos.ros.nodes.base_device_node import init_wrapper, ROS2DeviceNode
from unilabos.ros.nodes.resource_tracker import ResourceDictInstance
# 定义泛型类型变量
T = TypeVar("T")
@@ -18,12 +19,11 @@ class ROS2DeviceNodeWrapper(ROS2DeviceNode):
def ros2_device_node(
cls: Type[T],
device_config: Optional[Dict[str, Any]] = None,
device_config: Optional[ResourceDictInstance] = None,
status_types: Optional[Dict[str, Any]] = None,
action_value_mappings: Optional[Dict[str, Any]] = None,
hardware_interface: Optional[Dict[str, Any]] = None,
print_publish: bool = False,
children: Optional[Dict[str, Any]] = None,
) -> Type[ROS2DeviceNodeWrapper]:
"""Create a ROS2 Node class for a device class with properties and actions.
@@ -45,7 +45,7 @@ def ros2_device_node(
if status_types is None:
status_types = {}
if device_config is None:
device_config = {}
raise ValueError("device_config cannot be None")
if action_value_mappings is None:
action_value_mappings = {}
if hardware_interface is None:
@@ -82,7 +82,6 @@ def ros2_device_node(
action_value_mappings=action_value_mappings,
hardware_interface=hardware_interface,
print_publish=print_publish,
children=children,
*args,
**kwargs,
),

View File

@@ -4,13 +4,14 @@ from typing import Optional
from unilabos.registry.registry import lab_registry
from unilabos.ros.device_node_wrapper import ros2_device_node
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
from unilabos.ros.nodes.resource_tracker import ResourceDictInstance
from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.import_manager import default_manager
def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2DeviceNode]:
def initialize_device_from_dict(device_id, device_config: ResourceDictInstance) -> Optional[ROS2DeviceNode]:
"""Initializes a device based on its configuration.
This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration.
@@ -24,15 +25,14 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device
None
"""
d = None
original_device_config = copy.deepcopy(device_config)
device_class_config = device_config["class"]
uid = device_config["uuid"]
device_class_config = device_config.res_content.klass
uid = device_config.res_content.uuid
if isinstance(device_class_config, str): # 如果是字符串则直接去lab_registry中查找获取class
if len(device_class_config) == 0:
raise DeviceClassInvalid(f"Device [{device_id}] class cannot be an empty string. {device_config}")
if device_class_config not in lab_registry.device_type_registry:
raise DeviceClassInvalid(f"Device [{device_id}] class {device_class_config} not found. {device_config}")
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"]
device_class_config = lab_registry.device_type_registry[device_class_config]["class"]
elif isinstance(device_class_config, dict):
raise DeviceClassInvalid(f"Device [{device_id}] class config should be type 'str' but 'dict' got. {device_config}")
if isinstance(device_class_config, dict):
@@ -41,17 +41,16 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device
DEVICE = ros2_device_node(
DEVICE,
status_types=device_class_config.get("status_types", {}),
device_config=original_device_config,
device_config=device_config,
action_value_mappings=device_class_config.get("action_value_mappings", {}),
hardware_interface=device_class_config.get(
"hardware_interface",
{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []},
),
children=device_config.get("children", {})
)
)
try:
d = DEVICE(
device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {})
device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.res_content.config
)
except DeviceInitError as ex:
return d

View File

@@ -192,7 +192,7 @@ def slave(
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type == "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
d = initialize_device_from_dict(device_id, device_config)
if d is not None:
devices_instances[device_id] = d
logger.info(f"Device {device_id} initialized.")

View File

@@ -48,7 +48,7 @@ from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import (
DeviceNodeResourceTracker,
ResourceTreeSet,
ResourceTreeInstance,
ResourceTreeInstance, ResourceDictInstance,
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
@@ -133,12 +133,11 @@ def init_wrapper(
device_id: str,
device_uuid: str,
driver_class: type[T],
device_config: Dict[str, Any],
device_config: ResourceTreeInstance,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any],
print_publish: bool,
children: Optional[list] = None,
driver_params: Optional[Dict[str, Any]] = None,
driver_is_ros: bool = False,
*args,
@@ -147,8 +146,6 @@ def init_wrapper(
"""初始化设备节点的包装函数和ROS2DeviceNode初始化保持一致"""
if driver_params is None:
driver_params = kwargs.copy()
if children is None:
children = []
kwargs["device_id"] = device_id
kwargs["device_uuid"] = device_uuid
kwargs["driver_class"] = driver_class
@@ -157,7 +154,6 @@ def init_wrapper(
kwargs["status_types"] = status_types
kwargs["action_value_mappings"] = action_value_mappings
kwargs["hardware_interface"] = hardware_interface
kwargs["children"] = children
kwargs["print_publish"] = print_publish
kwargs["driver_is_ros"] = driver_is_ros
super(type(self), self).__init__(*args, **kwargs)
@@ -1582,12 +1578,11 @@ class ROS2DeviceNode:
device_id: str,
device_uuid: str,
driver_class: Type[T],
device_config: Dict[str, Any],
device_config: ResourceDictInstance,
driver_params: Dict[str, Any],
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any],
children: Dict[str, Any],
print_publish: bool = True,
driver_is_ros: bool = False,
):
@@ -1598,7 +1593,7 @@ class ROS2DeviceNode:
device_id: 设备标识符
device_uuid: 设备uuid
driver_class: 设备类
device_config: 原始初始化的json
device_config: 原始初始化的ResourceDictInstance
driver_params: driver初始化的参数
status_types: 状态类型映射
action_value_mappings: 动作值映射
@@ -1612,6 +1607,7 @@ class ROS2DeviceNode:
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
self._driver_class = driver_class
self.device_config = device_config
children: List[ResourceDictInstance] = device_config.children
self.driver_is_ros = driver_is_ros
self.driver_is_workstation = False
self.resource_tracker = DeviceNodeResourceTracker()

View File

@@ -532,7 +532,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node] Initializing device: {device_id}")
try:
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
d = initialize_device_from_dict(device_id, device_config)
except DeviceClassInvalid as e:
self.lab_logger().error(f"[Host Node] Device class invalid: {e}")
d = None

View File

@@ -24,7 +24,7 @@ from unilabos.ros.msgs.message_converter import (
convert_from_ros_msg_with_mapping,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDictInstance
from unilabos.utils.type_check import get_result_info_str
if TYPE_CHECKING:
@@ -47,7 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
def __init__(
self,
protocol_type: List[str],
children: Dict[str, Any],
children: List[ResourceDictInstance],
*,
driver_instance: "WorkstationBase",
device_id: str,
@@ -81,10 +81,11 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
# 初始化子设备
self.communication_node_id_to_instance = {}
for device_id, device_config in self.children.items():
if device_config.get("type", "device") != "device":
for device_config in self.children:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
self.lab_logger().debug(
f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping."
f"[Protocol Node] Skipping type {device_config.res_content.type} {device_id} already existed, skipping."
)
continue
try:
@@ -101,8 +102,9 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
self.communication_node_id_to_instance[device_id] = d
continue
for device_id, device_config in self.children.items():
if device_config.get("type", "device") != "device":
for device_config in self.children:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
continue
# 设置硬件接口代理
if device_id not in self.sub_devices:

View File

@@ -1,9 +1,11 @@
import inspect
import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from unilabos.resources.plr_additional_res_reg import register
from unilabos.utils.log import logger
if TYPE_CHECKING:
@@ -62,7 +64,6 @@ class ResourceDict(BaseModel):
parent: Optional["ResourceDict"] = Field(description="Parent resource object", default=None, exclude=True)
type: Union[Literal["device"], str] = Field(description="Resource type")
klass: str = Field(alias="class", description="Resource class name")
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
config: Dict[str, Any] = Field(description="Resource configuration")
data: Dict[str, Any] = Field(description="Resource data")
@@ -146,15 +147,16 @@ class ResourceDictInstance(object):
if not content.get("extra"): # MagicCode
content["extra"] = {}
if "pose" not in content:
content["pose"] = content.get("position", {})
content["pose"] = content.pop("position", {})
return ResourceDictInstance(ResourceDict.model_validate(content))
def get_nested_dict(self) -> Dict[str, Any]:
def get_plr_nested_dict(self) -> Dict[str, Any]:
"""获取资源实例的嵌套字典表示"""
res_dict = self.res_content.model_dump(by_alias=True)
res_dict["children"] = {child.res_content.id: child.get_nested_dict() for child in self.children}
res_dict["children"] = {child.res_content.id: child.get_plr_nested_dict() for child in self.children}
res_dict["parent"] = self.res_content.parent_instance_name
res_dict["position"] = self.res_content.position.position.model_dump()
del res_dict["pose"]
return res_dict
@@ -429,9 +431,9 @@ class ResourceTreeSet(object):
Returns:
List[PLRResource]: PLR 资源实例列表
"""
register()
from pylabrobot.resources import Resource as PLRResource
from pylabrobot.utils.object_parsing import find_subclass
import inspect
# 类型映射
TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer"}
@@ -459,9 +461,9 @@ class ResourceTreeSet(object):
"size_y": res.config.get("size_y", 0),
"size_z": res.config.get("size_z", 0),
"location": {
"x": res.position.position.x,
"y": res.position.position.y,
"z": res.position.position.z,
"x": res.pose.position.x,
"y": res.pose.position.y,
"z": res.pose.position.z,
"type": "Coordinate",
},
"rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"},

View File

@@ -9,10 +9,11 @@ import asyncio
import inspect
import traceback
from abc import abstractmethod
from typing import Type, Any, Dict, Optional, TypeVar, Generic
from typing import Type, Any, Dict, Optional, TypeVar, Generic, List
from unilabos.resources.graphio import nested_dict_to_list, resource_ulab_to_plr
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDictInstance, \
ResourceTreeInstance
from unilabos.utils import logger, import_manager
from unilabos.utils.cls_creator import create_instance_from_config
@@ -33,7 +34,7 @@ class DeviceClassCreator(Generic[T]):
这个类提供了从任意类创建实例的通用方法。
"""
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker):
def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
"""
初始化设备类创建器
@@ -50,9 +51,9 @@ class DeviceClassCreator(Generic[T]):
附加资源到设备类实例
"""
if self.device_instance is not None:
for c in self.children.values():
if c["type"] != "device":
self.resource_tracker.add_resource(c)
for c in self.children:
if c.res_content.type != "device":
self.resource_tracker.add_resource(c.get_plr_nested_dict())
def create_instance(self, data: Dict[str, Any]) -> T:
"""
@@ -94,7 +95,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
这个类提供了针对PyLabRobot设备类的实例创建方法特别处理deserialize方法。
"""
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker):
def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
"""
初始化PyLabRobot设备类创建器
@@ -111,12 +112,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
def attach_resource(self):
pass # 只能增加实例化物料,原来默认物料仅为字典查询
def _process_resource_mapping(self, resource, source_type):
if source_type == dict:
from pylabrobot.resources.resource import Resource
return nested_dict_to_list(resource), Resource
return resource, source_type
# def _process_resource_mapping(self, resource, source_type):
# if source_type == dict:
# from pylabrobot.resources.resource import Resource
#
# return nested_dict_to_list(resource), Resource
# return resource, source_type
def _process_resource_references(
self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None
@@ -142,15 +143,21 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
if isinstance(data, dict):
if "_resource_child_name" in data:
child_name = data["_resource_child_name"]
if child_name in self.children:
resource = self.children[child_name]
resource: Optional[ResourceDictInstance] = None
for child in self.children:
if child.res_content.name == child_name:
resource = child
if resource is not None:
if "_resource_type" in data:
type_path = data["_resource_type"]
try:
target_type = import_manager.get_class(type_path)
contain_model = not issubclass(target_type, Deck)
resource, target_type = self._process_resource_mapping(resource, target_type)
resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state
# target_type = import_manager.get_class(type_path)
# contain_model = not issubclass(target_type, Deck)
# resource, target_type = self._process_resource_mapping(resource, target_type)
res_tree = ResourceTreeInstance(resource)
res_tree_set = ResourceTreeSet([res_tree])
resource_instance: Resource = res_tree_set.to_plr_resources()[0]
# resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state
states[prefix_path] = resource_instance.serialize_all_state()
# 使用 prefix_path 作为 key 存储资源状态
if to_dict:
@@ -202,12 +209,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
stack = None
# 递归遍历 children 构建 name_to_uuid 映射
def collect_name_to_uuid(children_dict: Dict[str, Any], result: Dict[str, str]):
def collect_name_to_uuid(children_list: List[ResourceDictInstance], result: Dict[str, str]):
"""递归遍历嵌套的 children 字典,收集 name 到 uuid 的映射"""
for child in children_dict.values():
if isinstance(child, dict):
result[child["name"]] = child["uuid"]
collect_name_to_uuid(child["children"], result)
for child in children_list:
if isinstance(child, ResourceDictInstance):
result[child.res_content.name] = child.res_content.uuid
collect_name_to_uuid(child.children, result)
name_to_uuid = {}
collect_name_to_uuid(self.children, name_to_uuid)
@@ -313,7 +320,7 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
这个类提供了针对WorkstationNode设备类的实例创建方法处理children参数。
"""
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker):
def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
"""
初始化WorkstationNode设备类创建器
@@ -336,9 +343,9 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children
for material_id, child in self.children.items():
if child["type"] != "device":
self.resource_tracker.add_resource(self.children[material_id])
for child in self.children:
if child.res_content.type != "device":
self.resource_tracker.add_resource(child.get_plr_nested_dict())
deck_dict = data.get("deck")
if deck_dict:
from pylabrobot.resources import Deck, Resource