mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-04 13:25:13 +00:00
Compare commits
3 Commits
e8d1263488
...
feat/sampl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
337789e270 | ||
|
|
26271bcab8 | ||
|
|
84a8223173 |
@@ -54,6 +54,7 @@ class JobAddReq(BaseModel):
|
||||
action_type: str = Field(
|
||||
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
|
||||
)
|
||||
sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid", default_factory=dict)
|
||||
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
|
||||
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
|
||||
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")
|
||||
|
||||
@@ -688,6 +688,7 @@ class MessageProcessor:
|
||||
queue_item,
|
||||
action_type=req.action_type,
|
||||
action_kwargs=req.action_args,
|
||||
sample_material=req.sample_material,
|
||||
server_info=req.server_info,
|
||||
)
|
||||
|
||||
|
||||
@@ -27,7 +27,12 @@ from typing_extensions import TypedDict
|
||||
|
||||
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
|
||||
from unilabos.registry.placeholder_type import ResourceSlot
|
||||
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
|
||||
from unilabos.resources.resource_tracker import (
|
||||
ResourceTreeSet,
|
||||
ResourceDict,
|
||||
EXTRA_SAMPLE_UUID,
|
||||
EXTRA_UNILABOS_SAMPLE_UUID,
|
||||
)
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
|
||||
|
||||
|
||||
@@ -231,12 +236,11 @@ class LiquidHandlerMiddleware(LiquidHandler):
|
||||
res_samples = []
|
||||
res_volumes = []
|
||||
for resource, volume, channel in zip(resources, vols, use_channels):
|
||||
res_samples.append(
|
||||
{"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}
|
||||
)
|
||||
sample_uuid_value = resource.unilabos_extra.get(EXTRA_SAMPLE_UUID, None)
|
||||
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: sample_uuid_value})
|
||||
res_volumes.append(volume)
|
||||
self.pending_liquids_dict[channel] = {
|
||||
"sample_uuid": resource.unilabos_extra.get("sample_uuid", None),
|
||||
EXTRA_SAMPLE_UUID: sample_uuid_value,
|
||||
"volume": volume,
|
||||
}
|
||||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||||
@@ -278,10 +282,10 @@ class LiquidHandlerMiddleware(LiquidHandler):
|
||||
res_samples = []
|
||||
res_volumes = []
|
||||
for resource, volume, channel in zip(resources, vols, use_channels):
|
||||
res_uuid = self.pending_liquids_dict[channel]["sample_uuid"]
|
||||
res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID]
|
||||
self.pending_liquids_dict[channel]["volume"] -= volume
|
||||
resource.unilabos_extra["sample_uuid"] = res_uuid
|
||||
res_samples.append({"name": resource.name, "sample_uuid": res_uuid})
|
||||
resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid
|
||||
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid})
|
||||
res_volumes.append(volume)
|
||||
|
||||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||||
|
||||
@@ -151,12 +151,40 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
|
||||
"""
|
||||
# 构建 id 到 uuid 的映射
|
||||
id_to_uuid: Dict[str, str] = {}
|
||||
uuid_to_id: Dict[str, str] = {}
|
||||
for node in resource_tree_set.all_nodes:
|
||||
id_to_uuid[node.res_content.id] = node.res_content.uuid
|
||||
uuid_to_id[node.res_content.uuid] = node.res_content.id
|
||||
|
||||
# 第三遍处理:为每个 link 添加 source_uuid 和 target_uuid
|
||||
for link in links:
|
||||
source_id = link.get("source")
|
||||
target_id = link.get("target")
|
||||
|
||||
# 添加 source_uuid
|
||||
if source_id and source_id in id_to_uuid:
|
||||
link["source_uuid"] = id_to_uuid[source_id]
|
||||
|
||||
# 添加 target_uuid
|
||||
if target_id and target_id in id_to_uuid:
|
||||
link["target_uuid"] = id_to_uuid[target_id]
|
||||
|
||||
source_uuid = link.get("source_uuid")
|
||||
target_uuid = link.get("target_uuid")
|
||||
|
||||
# 添加 source_uuid
|
||||
if source_uuid and source_uuid in uuid_to_id:
|
||||
link["source"] = uuid_to_id[source_uuid]
|
||||
|
||||
# 添加 target_uuid
|
||||
if target_uuid and target_uuid in uuid_to_id:
|
||||
link["target"] = uuid_to_id[target_uuid]
|
||||
|
||||
# 第一遍处理:将字符串类型的port转换为字典格式
|
||||
for link in links:
|
||||
port = link.get("port")
|
||||
if port is None:
|
||||
continue
|
||||
if link.get("type", "physical") == "physical":
|
||||
link["type"] = "fluid"
|
||||
if isinstance(port, int):
|
||||
@@ -179,13 +207,15 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
|
||||
link["port"] = {link["source"]: None, link["target"]: None}
|
||||
|
||||
# 构建边字典,键为(source节点, target节点),值为对应的port信息
|
||||
edges = {(link["source"], link["target"]): link["port"] for link in links}
|
||||
edges = {(link["source"], link["target"]): link["port"] for link in links if link.get("port")}
|
||||
|
||||
# 第二遍处理:填充反向边的dest信息
|
||||
delete_reverses = []
|
||||
for i, link in enumerate(links):
|
||||
s, t = link["source"], link["target"]
|
||||
current_port = link["port"]
|
||||
current_port = link.get("port")
|
||||
if current_port is None:
|
||||
continue
|
||||
if current_port.get(t) is None:
|
||||
reverse_key = (t, s)
|
||||
reverse_port = edges.get(reverse_key)
|
||||
@@ -200,20 +230,6 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
|
||||
current_port[t] = current_port[s]
|
||||
# 删除已被使用反向端口信息的反向边
|
||||
standardized_links = [link for i, link in enumerate(links) if i not in delete_reverses]
|
||||
|
||||
# 第三遍处理:为每个 link 添加 source_uuid 和 target_uuid
|
||||
for link in standardized_links:
|
||||
source_id = link.get("source")
|
||||
target_id = link.get("target")
|
||||
|
||||
# 添加 source_uuid
|
||||
if source_id and source_id in id_to_uuid:
|
||||
link["source_uuid"] = id_to_uuid[source_id]
|
||||
|
||||
# 添加 target_uuid
|
||||
if target_id and target_id in id_to_uuid:
|
||||
link["target_uuid"] = id_to_uuid[target_id]
|
||||
|
||||
return standardized_links
|
||||
|
||||
|
||||
@@ -284,6 +300,8 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]
|
||||
edge["sourceHandle"] = port[source]
|
||||
elif "source_port" in edge:
|
||||
edge["sourceHandle"] = edge.pop("source_port")
|
||||
elif "source_handle" in edge:
|
||||
edge["sourceHandle"] = edge.pop("source_handle")
|
||||
else:
|
||||
typ = edge.get("type")
|
||||
if typ == "communication":
|
||||
@@ -292,6 +310,8 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]
|
||||
edge["targetHandle"] = port[target]
|
||||
elif "target_port" in edge:
|
||||
edge["targetHandle"] = edge.pop("target_port")
|
||||
elif "target_handle" in edge:
|
||||
edge["targetHandle"] = edge.pop("target_handle")
|
||||
else:
|
||||
typ = edge.get("type")
|
||||
if typ == "communication":
|
||||
|
||||
@@ -14,6 +14,20 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
EXTRA_CLASS = "unilabos_resource_class"
|
||||
EXTRA_SAMPLE_UUID = "sample_uuid"
|
||||
EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid"
|
||||
|
||||
# 函数参数名常量 - 用于自动注入 sample_uuids 列表
|
||||
PARAM_SAMPLE_UUIDS = "sample_uuids"
|
||||
|
||||
# JSON Command 中的系统参数字段名
|
||||
JSON_UNILABOS_PARAM = "unilabos_param"
|
||||
|
||||
# 返回值中的 samples 字段名
|
||||
RETURN_UNILABOS_SAMPLES = "unilabos_samples"
|
||||
|
||||
# sample_uuids 参数类型 (用于 virtual bench 等设备添加 sample_uuids 参数)
|
||||
SampleUUIDsType = Dict[str, Optional["PLRResource"]]
|
||||
|
||||
|
||||
class ResourceDictPositionSize(BaseModel):
|
||||
@@ -529,6 +543,7 @@ class ResourceTreeSet(object):
|
||||
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
|
||||
from pylabrobot.resources import Coordinate
|
||||
from pylabrobot.serializer import deserialize
|
||||
|
||||
location = cast(Coordinate, deserialize(plr_dict["location"]))
|
||||
plr_resource.location = location
|
||||
plr_resource.load_all_state(all_states)
|
||||
|
||||
@@ -4,8 +4,20 @@ import json
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \
|
||||
Tuple
|
||||
from typing import (
|
||||
get_type_hints,
|
||||
TypeVar,
|
||||
Generic,
|
||||
Dict,
|
||||
Any,
|
||||
Type,
|
||||
TypedDict,
|
||||
Optional,
|
||||
List,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import asyncio
|
||||
@@ -48,6 +60,9 @@ from unilabos.resources.resource_tracker import (
|
||||
ResourceTreeSet,
|
||||
ResourceTreeInstance,
|
||||
ResourceDictInstance,
|
||||
EXTRA_SAMPLE_UUID,
|
||||
PARAM_SAMPLE_UUIDS,
|
||||
JSON_UNILABOS_PARAM,
|
||||
)
|
||||
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
|
||||
from rclpy.task import Task, Future
|
||||
@@ -361,6 +376,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
from pylabrobot.resources.deck import Deck
|
||||
from pylabrobot.resources import Coordinate
|
||||
from pylabrobot.resources import Plate
|
||||
|
||||
# 物料传输到对应的node节点
|
||||
client = self._resource_clients["c2s_update_resource_tree"]
|
||||
request = SerialCommand.Request()
|
||||
@@ -388,9 +404,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources)
|
||||
parent_resource = None
|
||||
if bind_parent_id != self.node_name:
|
||||
parent_resource = self.resource_tracker.figure_resource(
|
||||
{"name": bind_parent_id}
|
||||
)
|
||||
parent_resource = self.resource_tracker.figure_resource({"name": bind_parent_id})
|
||||
for r in rts.root_nodes:
|
||||
# noinspection PyUnresolvedReferences
|
||||
r.res_content.parent_uuid = parent_resource.unilabos_uuid
|
||||
@@ -398,19 +412,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
for r in rts.root_nodes:
|
||||
r.res_content.parent_uuid = self.uuid
|
||||
|
||||
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
|
||||
if (
|
||||
len(LIQUID_INPUT_SLOT)
|
||||
and LIQUID_INPUT_SLOT[0] == -1
|
||||
and len(rts.root_nodes) == 1
|
||||
and isinstance(rts.root_nodes[0], RegularContainer)
|
||||
):
|
||||
# noinspection PyTypeChecker
|
||||
container_instance: RegularContainer = rts.root_nodes[0]
|
||||
found_resources = self.resource_tracker.figure_resource(
|
||||
{"id": container_instance.name}, try_mode=True
|
||||
)
|
||||
found_resources = self.resource_tracker.figure_resource({"id": container_instance.name}, try_mode=True)
|
||||
if not len(found_resources):
|
||||
self.resource_tracker.add_resource(container_instance)
|
||||
logger.info(f"添加物料{container_instance.name}到资源跟踪器")
|
||||
else:
|
||||
assert (
|
||||
len(found_resources) == 1
|
||||
), f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
|
||||
assert len(found_resources) == 1, f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
|
||||
found_resource = found_resources[0]
|
||||
if isinstance(found_resource, RegularContainer):
|
||||
logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
|
||||
@@ -422,14 +437,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
|
||||
)
|
||||
# noinspection PyUnresolvedReferences
|
||||
request.command = json.dumps({
|
||||
"action": "add",
|
||||
"data": {
|
||||
"data": rts.dump(),
|
||||
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
|
||||
"first_add": False,
|
||||
},
|
||||
})
|
||||
request.command = json.dumps(
|
||||
{
|
||||
"action": "add",
|
||||
"data": {
|
||||
"data": rts.dump(),
|
||||
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
|
||||
"first_add": False,
|
||||
},
|
||||
}
|
||||
)
|
||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||
uuid_maps = json.loads(tree_response.response)
|
||||
plr_instances = rts.to_plr_resources()
|
||||
@@ -471,7 +488,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
|
||||
ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT)
|
||||
LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT)
|
||||
self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个")
|
||||
self.lab_logger().warning(
|
||||
f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个"
|
||||
)
|
||||
for liquid_type, liquid_volume, liquid_input_slot in zip(
|
||||
ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
|
||||
):
|
||||
@@ -490,9 +509,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
input_wells = []
|
||||
for r in LIQUID_INPUT_SLOT:
|
||||
input_wells.append(plr_instance.children[r])
|
||||
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
|
||||
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(
|
||||
input_wells
|
||||
).dump()
|
||||
res.response = json.dumps(final_response)
|
||||
if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
|
||||
if (
|
||||
issubclass(parent_resource.__class__, Deck)
|
||||
and hasattr(parent_resource, "assign_child_at_slot")
|
||||
and "slot" in other_calling_param
|
||||
):
|
||||
other_calling_param["slot"] = int(other_calling_param["slot"])
|
||||
parent_resource.assign_child_at_slot(plr_instance, **other_calling_param)
|
||||
else:
|
||||
@@ -507,14 +532,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource])
|
||||
if rts_with_parent.root_nodes[0].res_content.uuid_parent is None:
|
||||
rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid
|
||||
request.command = json.dumps({
|
||||
"action": "add",
|
||||
"data": {
|
||||
"data": rts_with_parent.dump(),
|
||||
"mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent,
|
||||
"first_add": False,
|
||||
},
|
||||
})
|
||||
request.command = json.dumps(
|
||||
{
|
||||
"action": "add",
|
||||
"data": {
|
||||
"data": rts_with_parent.dump(),
|
||||
"mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent,
|
||||
"first_add": False,
|
||||
},
|
||||
}
|
||||
)
|
||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||
uuid_maps = json.loads(tree_response.response)
|
||||
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
|
||||
@@ -811,7 +838,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
}
|
||||
|
||||
def _handle_update(
|
||||
plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
||||
plr_resources: List[Union[ResourcePLR, ResourceDictInstance]],
|
||||
tree_set: ResourceTreeSet,
|
||||
additional_add_params: Dict[str, Any],
|
||||
) -> Tuple[Dict[str, Any], List[ResourcePLR]]:
|
||||
"""
|
||||
处理资源更新操作的内部函数
|
||||
@@ -836,7 +865,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
original_parent_resource = original_instance.parent
|
||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||
not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None
|
||||
not_same_parent = (
|
||||
original_parent_resource_uuid != target_parent_resource_uuid
|
||||
and original_parent_resource is not None
|
||||
)
|
||||
old_name = original_instance.name
|
||||
new_name = plr_resource.name
|
||||
parent_appended = False
|
||||
@@ -872,8 +904,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
else:
|
||||
# 判断是否变更了resource_site,重新登记
|
||||
target_site = original_instance.unilabos_extra.get("update_resource_site")
|
||||
sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None
|
||||
site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else []
|
||||
sites = (
|
||||
original_instance.parent.sites
|
||||
if original_instance.parent is not None and hasattr(original_instance.parent, "sites")
|
||||
else None
|
||||
)
|
||||
site_names = (
|
||||
list(original_instance.parent._ordering.keys())
|
||||
if original_instance.parent is not None and hasattr(original_instance.parent, "sites")
|
||||
else []
|
||||
)
|
||||
if target_site is not None and sites is not None and site_names is not None:
|
||||
site_index = sites.index(original_instance)
|
||||
site_name = site_names[site_index]
|
||||
@@ -910,9 +950,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
action = i.get("action") # remove, add, update
|
||||
resources_uuid: List[str] = i.get("data") # 资源数据
|
||||
additional_add_params = i.get("additional_add_params", {}) # 额外参数
|
||||
self.lab_logger().trace(
|
||||
f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}"
|
||||
)
|
||||
self.lab_logger().trace(f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}")
|
||||
tree_set = None
|
||||
if action in ["add", "update"]:
|
||||
tree_set = await self.get_resource(
|
||||
@@ -939,9 +977,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
tree.root_node.res_content.parent_uuid = self.uuid
|
||||
r = SerialCommand.Request()
|
||||
r.command = json.dumps(
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}
|
||||
) # 和Update Resource一致
|
||||
response: SerialCommand_Response = await self._resource_clients[
|
||||
"c2s_update_resource_tree"].call_async(r) # type: ignore
|
||||
"c2s_update_resource_tree"
|
||||
].call_async(
|
||||
r
|
||||
) # type: ignore
|
||||
self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}")
|
||||
results.append(result)
|
||||
elif action == "update":
|
||||
@@ -961,9 +1003,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
tree.root_node.res_content.parent_uuid = self.uuid
|
||||
r = SerialCommand.Request()
|
||||
r.command = json.dumps(
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}
|
||||
) # 和Update Resource一致
|
||||
response: SerialCommand_Response = await self._resource_clients[
|
||||
"c2s_update_resource_tree"].call_async(r) # type: ignore
|
||||
"c2s_update_resource_tree"
|
||||
].call_async(
|
||||
r
|
||||
) # type: ignore
|
||||
self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}")
|
||||
results.append(result)
|
||||
elif action == "remove":
|
||||
@@ -1333,7 +1379,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
resource_id=resource_data["id"], with_children=True
|
||||
)
|
||||
if "sample_id" in resource_data:
|
||||
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
|
||||
plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"]
|
||||
queried_resources[idx] = plr_resource
|
||||
else:
|
||||
uuid_indices.append((idx, unilabos_uuid, resource_data))
|
||||
@@ -1346,7 +1392,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
for i, (idx, _, resource_data) in enumerate(uuid_indices):
|
||||
plr_resource = plr_resources[i]
|
||||
if "sample_id" in resource_data:
|
||||
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
|
||||
plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"]
|
||||
queried_resources[idx] = plr_resource
|
||||
|
||||
self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源")
|
||||
@@ -1354,7 +1400,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
# 通过资源跟踪器获取本地实例
|
||||
final_resources = queried_resources if is_sequence else queried_resources[0]
|
||||
if not is_sequence:
|
||||
plr = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
|
||||
plr = self.resource_tracker.figure_resource(
|
||||
{"name": final_resources.name}, try_mode=False
|
||||
)
|
||||
# 保留unilabos_extra
|
||||
if hasattr(final_resources, "unilabos_extra") and hasattr(plr, "unilabos_extra"):
|
||||
plr.unilabos_extra = getattr(final_resources, "unilabos_extra", {}).copy()
|
||||
@@ -1393,8 +1441,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
execution_success = True
|
||||
except Exception as _:
|
||||
execution_error = traceback.format_exc()
|
||||
error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
|
||||
trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
|
||||
error(
|
||||
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}"
|
||||
)
|
||||
trace(
|
||||
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
|
||||
)
|
||||
|
||||
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
|
||||
future.add_done_callback(_handle_future_exception)
|
||||
@@ -1414,9 +1466,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
except Exception as _:
|
||||
execution_error = traceback.format_exc()
|
||||
error(
|
||||
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
|
||||
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}"
|
||||
)
|
||||
trace(
|
||||
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
|
||||
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
|
||||
)
|
||||
|
||||
future.add_done_callback(_handle_future_exception)
|
||||
|
||||
@@ -1539,20 +1593,29 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
# 获取 unilabos 系统参数
|
||||
unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {})
|
||||
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
function
|
||||
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
|
||||
|
||||
# 处理 ResourceSlot 类型参数
|
||||
args_list = default_manager._analyze_method_signature(function)["args"]
|
||||
# 处理参数(包含 unilabos 系统参数如 sample_uuids)
|
||||
args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"]
|
||||
for arg in args_list:
|
||||
arg_name = arg["name"]
|
||||
arg_type = arg["type"]
|
||||
|
||||
# 跳过不在 function_args 中的参数
|
||||
if arg_name not in function_args:
|
||||
# 处理 sample_uuids 参数注入
|
||||
if arg_name == PARAM_SAMPLE_UUIDS:
|
||||
function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, [])
|
||||
self.lab_logger().debug(
|
||||
f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}"
|
||||
)
|
||||
continue
|
||||
|
||||
# 处理单个 ResourceSlot
|
||||
@@ -1581,6 +1644,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
|
||||
|
||||
# todo: 默认反报送
|
||||
return function(**function_args)
|
||||
except KeyError as ex:
|
||||
@@ -1601,14 +1665,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
raise ValueError("至少需要提供一个 UUID")
|
||||
|
||||
uuids_list = list(uuids)
|
||||
future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request(
|
||||
command=json.dumps(
|
||||
{
|
||||
"data": {"data": uuids_list, "with_children": True},
|
||||
"action": "get",
|
||||
}
|
||||
future = self._resource_clients["c2s_update_resource_tree"].call_async(
|
||||
SerialCommand.Request(
|
||||
command=json.dumps(
|
||||
{
|
||||
"data": {"data": uuids_list, "with_children": True},
|
||||
"action": "get",
|
||||
}
|
||||
)
|
||||
)
|
||||
))
|
||||
)
|
||||
|
||||
# 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒)
|
||||
timeout = 30.0
|
||||
@@ -1666,6 +1732,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
# 获取 unilabos 系统参数
|
||||
unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {})
|
||||
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
@@ -1675,14 +1744,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
function
|
||||
), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}"
|
||||
|
||||
# 处理 ResourceSlot 类型参数
|
||||
args_list = default_manager._analyze_method_signature(function)["args"]
|
||||
# 处理参数(包含 unilabos 系统参数如 sample_uuids)
|
||||
args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"]
|
||||
for arg in args_list:
|
||||
arg_name = arg["name"]
|
||||
arg_type = arg["type"]
|
||||
|
||||
# 跳过不在 function_args 中的参数
|
||||
if arg_name not in function_args:
|
||||
# 处理 sample_uuids 参数注入
|
||||
if arg_name == PARAM_SAMPLE_UUIDS:
|
||||
function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, [])
|
||||
self.lab_logger().debug(
|
||||
f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}"
|
||||
)
|
||||
continue
|
||||
|
||||
# 处理单个 ResourceSlot
|
||||
@@ -1960,7 +2035,9 @@ class ROS2DeviceNode:
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
|
||||
ROS2DeviceNode._asyncio_loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode")
|
||||
ROS2DeviceNode._asyncio_loop_thread = threading.Thread(
|
||||
target=run_event_loop, daemon=True, name="ROS2DeviceNode"
|
||||
)
|
||||
ROS2DeviceNode._asyncio_loop_thread.start()
|
||||
logger.info(f"循环线程已启动")
|
||||
|
||||
|
||||
@@ -42,6 +42,9 @@ from unilabos.resources.resource_tracker import (
|
||||
ResourceDictInstance,
|
||||
ResourceTreeSet,
|
||||
ResourceTreeInstance,
|
||||
EXTRA_SAMPLE_UUID,
|
||||
EXTRA_UNILABOS_SAMPLE_UUID,
|
||||
RETURN_UNILABOS_SAMPLES,
|
||||
)
|
||||
from unilabos.utils import logger
|
||||
from unilabos.utils.exception import DeviceClassInvalid
|
||||
@@ -755,6 +758,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
item: "QueueItem",
|
||||
action_type: str,
|
||||
action_kwargs: Dict[str, Any],
|
||||
sample_material: Dict[str, str],
|
||||
server_info: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
@@ -791,12 +795,17 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
action_client: ActionClient = self._action_clients[action_id]
|
||||
|
||||
# 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id"
|
||||
# 遍历action_kwargs下的所有子dict,将sample_uuid的值赋给sample_id
|
||||
def assign_sample_id(obj):
|
||||
if isinstance(obj, dict):
|
||||
if "sample_uuid" in obj:
|
||||
obj["sample_id"] = obj["sample_uuid"]
|
||||
obj.pop("sample_uuid")
|
||||
# 处理 EXTRA_SAMPLE_UUID ("sample_uuid")
|
||||
if EXTRA_SAMPLE_UUID in obj:
|
||||
obj["sample_id"] = obj[EXTRA_SAMPLE_UUID]
|
||||
obj.pop(EXTRA_SAMPLE_UUID)
|
||||
# 处理 EXTRA_UNILABOS_SAMPLE_UUID ("unilabos_sample_uuid")
|
||||
if EXTRA_UNILABOS_SAMPLE_UUID in obj:
|
||||
obj["sample_id"] = obj[EXTRA_UNILABOS_SAMPLE_UUID]
|
||||
obj.pop(EXTRA_UNILABOS_SAMPLE_UUID)
|
||||
for k, v in obj.items():
|
||||
if k != "unilabos_extra":
|
||||
assign_sample_id(v)
|
||||
@@ -867,14 +876,14 @@ class HostNode(BaseROS2DeviceNode):
|
||||
# 适配后端的一些额外处理
|
||||
return_value = return_info.get("return_value")
|
||||
if isinstance(return_value, dict):
|
||||
unilabos_samples = return_value.pop("unilabos_samples", None)
|
||||
unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None)
|
||||
if isinstance(unilabos_samples, list) and unilabos_samples:
|
||||
self.lab_logger().info(
|
||||
f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): "
|
||||
f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}"
|
||||
f"{'...' if len(unilabos_samples) > 5 else ''}"
|
||||
)
|
||||
return_info["unilabos_samples"] = unilabos_samples
|
||||
return_info[RETURN_UNILABOS_SAMPLES] = unilabos_samples
|
||||
suc = return_info.get("suc", False)
|
||||
if not suc:
|
||||
status = "failed"
|
||||
|
||||
@@ -27,6 +27,7 @@ __all__ = [
|
||||
|
||||
from ast import Constant
|
||||
|
||||
from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS
|
||||
from unilabos.utils import logger
|
||||
from unilabos.utils.decorator import is_not_action
|
||||
|
||||
@@ -341,13 +342,18 @@ class ImportManager:
|
||||
result["action_methods"][method_name] = method_info
|
||||
return result
|
||||
|
||||
def _analyze_method_signature(self, method) -> Dict[str, Any]:
|
||||
def _analyze_method_signature(self, method, skip_unilabos_params: bool = True) -> Dict[str, Any]:
|
||||
"""
|
||||
分析方法签名,提取具体的命名参数信息
|
||||
|
||||
注意:此方法会跳过*args和**kwargs,只提取具体的命名参数
|
||||
这样可以确保通过**dict方式传参时的准确性
|
||||
|
||||
Args:
|
||||
method: 要分析的方法
|
||||
skip_unilabos_params: 是否跳过 unilabos 系统参数(如 sample_uuids),
|
||||
registry 补全时为 True,JsonCommand 执行时为 False
|
||||
|
||||
示例用法:
|
||||
method_info = self._analyze_method_signature(some_method)
|
||||
params = {"param1": "value1", "param2": "value2"}
|
||||
@@ -368,6 +374,10 @@ class ImportManager:
|
||||
if param.kind == param.VAR_KEYWORD: # **kwargs
|
||||
continue
|
||||
|
||||
# 跳过 sample_uuids 参数(由系统自动注入,registry 补全时跳过)
|
||||
if skip_unilabos_params and param_name == PARAM_SAMPLE_UUIDS:
|
||||
continue
|
||||
|
||||
is_required = param.default == inspect.Parameter.empty
|
||||
if is_required:
|
||||
num_required += 1
|
||||
@@ -563,6 +573,9 @@ class ImportManager:
|
||||
for i, arg in enumerate(node.args.args):
|
||||
if arg.arg == "self":
|
||||
continue
|
||||
# 跳过 sample_uuids 参数(由系统自动注入)
|
||||
if arg.arg == PARAM_SAMPLE_UUIDS:
|
||||
continue
|
||||
arg_info = {
|
||||
"name": arg.arg,
|
||||
"type": None,
|
||||
|
||||
@@ -60,7 +60,11 @@
|
||||
==================== 连接关系图 ====================
|
||||
|
||||
控制流 (ready 端口串联):
|
||||
create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ...
|
||||
- create_resource 之间: 无 ready 连接
|
||||
- set_liquid_from_plate 之间: 无 ready 连接
|
||||
- create_resource 与 set_liquid_from_plate 之间: 无 ready 连接
|
||||
- transfer_liquid 之间: 通过 ready 端口串联
|
||||
transfer_liquid_1 -> transfer_liquid_2 -> transfer_liquid_3 -> ...
|
||||
|
||||
物料流:
|
||||
[create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid]
|
||||
@@ -402,7 +406,6 @@ def build_protocol_graph(
|
||||
|
||||
# 为每个唯一的 slot 创建 create_resource 节点
|
||||
res_index = 0
|
||||
last_create_resource_id = None
|
||||
for slot, info in slots_info.items():
|
||||
node_id = str(uuid.uuid4())
|
||||
res_id = info["res_id"]
|
||||
@@ -431,10 +434,7 @@ def build_protocol_graph(
|
||||
)
|
||||
slot_to_create_resource[slot] = node_id
|
||||
|
||||
# create_resource 之间通过 ready 串联
|
||||
if last_create_resource_id is not None:
|
||||
G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready")
|
||||
last_create_resource_id = node_id
|
||||
# create_resource 之间不需要 ready 连接
|
||||
|
||||
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
|
||||
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点
|
||||
@@ -453,7 +453,6 @@ def build_protocol_graph(
|
||||
)
|
||||
|
||||
set_liquid_index = 0
|
||||
last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后
|
||||
|
||||
for labware_id, item in labware_info.items():
|
||||
# 跳过 Tip/Rack 类型
|
||||
@@ -494,10 +493,7 @@ def build_protocol_graph(
|
||||
},
|
||||
)
|
||||
|
||||
# ready 连接:上一个节点 -> set_liquid_from_plate
|
||||
if last_set_liquid_id is not None:
|
||||
G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready")
|
||||
last_set_liquid_id = node_id
|
||||
# set_liquid_from_plate 之间不需要 ready 连接
|
||||
|
||||
# 物料流:create_resource 的 labware -> set_liquid_from_plate 的 input_plate
|
||||
create_res_node_id = slot_to_create_resource.get(slot)
|
||||
@@ -507,7 +503,8 @@ def build_protocol_graph(
|
||||
# set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid
|
||||
resource_last_writer[labware_id] = f"{node_id}:output_wells"
|
||||
|
||||
last_control_node_id = last_set_liquid_id
|
||||
# transfer_liquid 之间通过 ready 串联,从 None 开始
|
||||
last_control_node_id = None
|
||||
|
||||
# 端口名称映射:JSON 字段名 -> 实际 handle key
|
||||
INPUT_PORT_MAPPING = {
|
||||
|
||||
Reference in New Issue
Block a user