mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-17 05:05:12 +00:00
Compare commits
15 Commits
4e92a26057
...
88ae56806c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88ae56806c | ||
|
|
95dd8beb81 | ||
|
|
4ab3fadbec | ||
|
|
229888f834 | ||
|
|
b443b39ebf | ||
|
|
0434bbc15b | ||
|
|
5791b81954 | ||
|
|
bd51c74fab | ||
|
|
ba81cbddf8 | ||
|
|
c2895bb197 | ||
|
|
0423f4f452 | ||
|
|
98bdb4e7e4 | ||
|
|
9d2c93807d | ||
|
|
2d26c3fac6 | ||
|
|
f5753afb7c |
68
test/resources/test_resourcetreeset.py
Normal file
68
test/resources/test_resourcetreeset.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import pytest
|
||||
import json
|
||||
import os
|
||||
|
||||
from pylabrobot.resources import Resource as ResourcePLR
|
||||
from unilabos.resources.graphio import resource_bioyond_to_plr
|
||||
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet
|
||||
from unilabos.registry.registry import lab_registry
|
||||
|
||||
from unilabos.resources.bioyond.decks import BIOYOND_PolymerReactionStation_Deck
|
||||
|
||||
lab_registry.setup()
|
||||
|
||||
|
||||
type_mapping = {
|
||||
"烧杯": ("BIOYOND_PolymerStation_1FlaskCarrier", "3a14196b-24f2-ca49-9081-0cab8021bf1a"),
|
||||
"试剂瓶": ("BIOYOND_PolymerStation_1BottleCarrier", ""),
|
||||
"样品板": ("BIOYOND_PolymerStation_6StockCarrier", "3a14196e-b7a0-a5da-1931-35f3000281e9"),
|
||||
"分装板": ("BIOYOND_PolymerStation_6VialCarrier", "3a14196e-5dfe-6e21-0c79-fe2036d052c4"),
|
||||
"样品瓶": ("BIOYOND_PolymerStation_Solid_Stock", "3a14196a-cf7d-8aea-48d8-b9662c7dba94"),
|
||||
"90%分装小瓶": ("BIOYOND_PolymerStation_Solid_Vial", "3a14196c-cdcf-088d-dc7d-5cf38f0ad9ea"),
|
||||
"10%分装小瓶": ("BIOYOND_PolymerStation_Liquid_Vial", "3a14196c-76be-2279-4e22-7310d69aed68"),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_reaction() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_reaction.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_liquidhandling_1() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_liquidhandling_1.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_liquidhandling_2() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_liquidhandling_2.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.mark.parametrize("materials_fixture", [
|
||||
"bioyond_materials_reaction",
|
||||
"bioyond_materials_liquidhandling_1",
|
||||
])
|
||||
def test_resourcetreeset_from_plr(materials_fixture, request) -> list[dict]:
|
||||
materials = request.getfixturevalue(materials_fixture)
|
||||
deck = BIOYOND_PolymerReactionStation_Deck("test_deck")
|
||||
output = resource_bioyond_to_plr(materials, type_mapping=type_mapping, deck=deck)
|
||||
print(deck.summary())
|
||||
|
||||
r = ResourceTreeSet.from_plr_resources([deck])
|
||||
print(r.dump())
|
||||
# json.dump(deck.serialize(), open("test.json", "w", encoding="utf-8"), indent=4)
|
||||
@@ -545,7 +545,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
# 清空工作流序列和参数,防止下次执行时累积重复
|
||||
self.pending_task_params = []
|
||||
self.clear_workflows()
|
||||
self.clear_workflows() # 清空工作流序列,避免重复累积
|
||||
|
||||
# print(f"\n✅ 任务创建成功: {result}")
|
||||
# print(f"\n✅ 任务创建成功")
|
||||
|
||||
@@ -668,7 +668,7 @@ __all__ = [
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 简单测试HTTP服务
|
||||
class DummyWorkstation:
|
||||
class BioyondWorkstation:
|
||||
device_id = "WS-001"
|
||||
|
||||
def process_step_finish_report(self, report_request):
|
||||
|
||||
@@ -643,15 +643,15 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
|
||||
for detail in material["detail"]:
|
||||
number = (
|
||||
(detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y
|
||||
+ (detail.get("x", 0) - 1) * plr_material.num_items_x
|
||||
+ (detail.get("y", 0) - 1)
|
||||
+ (detail.get("y", 0) - 1) * plr_material.num_items_y
|
||||
+ (detail.get("x", 0) - 1)
|
||||
)
|
||||
bottle = plr_material[number]
|
||||
if detail["name"] in type_mapping:
|
||||
if detail["typeName"] in type_mapping:
|
||||
# plr_material.unassign_child_resource(bottle)
|
||||
plr_material.sites[number] = None
|
||||
plr_material[number] = initialize_resource(
|
||||
{"name": f'{detail["name"]}_{number}', "class": type_mapping[detail["name"]][0]}, resource_type=ResourcePLR
|
||||
{"name": f'{detail["name"]}_{number}', "class": type_mapping[detail["typeName"]][0]}, resource_type=ResourcePLR
|
||||
)
|
||||
else:
|
||||
bottle.tracker.liquids = [
|
||||
|
||||
@@ -74,6 +74,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
num_items_x: int = 0,
|
||||
num_items_y: int = 0,
|
||||
num_items_z: int = 0,
|
||||
layout: str = "x-y",
|
||||
sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
|
||||
category: Optional[str] = "carrier",
|
||||
model: Optional[str] = None,
|
||||
@@ -88,6 +89,8 @@ class ItemizedCarrier(ResourcePLR):
|
||||
)
|
||||
self.num_items = len(sites)
|
||||
self.num_items_x, self.num_items_y, self.num_items_z = num_items_x, num_items_y, num_items_z
|
||||
self.layout = "z-y" if self.num_items_z > 1 and self.num_items_x == 1 else "x-z" if self.num_items_z > 1 and self.num_items_y == 1 else "x-y"
|
||||
|
||||
if isinstance(sites, dict):
|
||||
sites = sites or {}
|
||||
self.sites: List[Optional[ResourcePLR]] = list(sites.values())
|
||||
@@ -150,7 +153,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
def assign_resource_to_site(self, resource: ResourcePLR, spot: int):
|
||||
if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder):
|
||||
raise ValueError(f"spot {spot} already has a resource, {resource}")
|
||||
self.assign_child_resource(resource, location=self.child_locations.get(str(spot)), spot=spot)
|
||||
self.assign_child_resource(resource, location=self.child_locations.get(list(self._ordering.keys())[spot]), spot=spot)
|
||||
|
||||
def unassign_child_resource(self, resource: ResourcePLR):
|
||||
found = False
|
||||
@@ -403,6 +406,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
"num_items_x": self.num_items_x,
|
||||
"num_items_y": self.num_items_y,
|
||||
"num_items_z": self.num_items_z,
|
||||
"layout": self.layout,
|
||||
"sites": [{
|
||||
"label": str(identifier),
|
||||
"visible": True if self[identifier] is not None else False,
|
||||
|
||||
@@ -5,10 +5,13 @@ from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_reso
|
||||
from unilabos.resources.itemized_carrier import ItemizedCarrier, ResourcePLR
|
||||
|
||||
|
||||
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
|
||||
def warehouse_factory(
|
||||
name: str,
|
||||
num_items_x: int = 4,
|
||||
num_items_y: int = 1,
|
||||
num_items_x: int = 1,
|
||||
num_items_y: int = 4,
|
||||
num_items_z: int = 4,
|
||||
dx: float = 137.0,
|
||||
dy: float = 96.0,
|
||||
@@ -33,13 +36,16 @@ def warehouse_factory(
|
||||
locations.append(Coordinate(x, y, z))
|
||||
if removed_positions:
|
||||
locations = [loc for i, loc in enumerate(locations) if i not in removed_positions]
|
||||
sites = create_homogeneous_resources(
|
||||
_sites = create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=locations,
|
||||
resource_size_x=127.0,
|
||||
resource_size_y=86.0,
|
||||
name_prefix=name,
|
||||
)
|
||||
len_x, len_y = (num_items_x, num_items_y) if num_items_z == 1 else (num_items_y, num_items_z) if num_items_x == 1 else (num_items_x, num_items_z)
|
||||
keys = [f"{LETTERS[j]}{i + 1}" for i in range(len_x) for j in range(len_y)]
|
||||
sites = {i: site for i, site in zip(keys, _sites.values())}
|
||||
|
||||
return WareHouse(
|
||||
name=name,
|
||||
|
||||
@@ -32,7 +32,7 @@ class ResourceDictPositionObject(BaseModel):
|
||||
class ResourceDictPosition(BaseModel):
|
||||
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
|
||||
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
|
||||
layout: Literal["2d"] = Field(description="Resource layout", default="2d")
|
||||
layout: Literal["2d", "x-y", "z-y", "x-z"] = Field(description="Resource layout", default="x-y")
|
||||
position: ResourceDictPositionObject = Field(
|
||||
description="Resource position", default_factory=ResourceDictPositionObject
|
||||
)
|
||||
@@ -42,6 +42,7 @@ class ResourceDictPosition(BaseModel):
|
||||
rotation: ResourceDictPositionObject = Field(
|
||||
description="Resource rotation", default_factory=ResourceDictPositionObject
|
||||
)
|
||||
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(description="Cross section type", default="rectangle")
|
||||
|
||||
|
||||
# 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化
|
||||
@@ -58,6 +59,7 @@ class ResourceDict(BaseModel):
|
||||
type: Literal["device"] | str = Field(description="Resource type")
|
||||
klass: str = Field(alias="class", description="Resource class name")
|
||||
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
|
||||
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
|
||||
config: Dict[str, Any] = Field(description="Resource configuration")
|
||||
data: Dict[str, Any] = Field(description="Resource data")
|
||||
|
||||
@@ -136,6 +138,8 @@ class ResourceDictInstance(object):
|
||||
content["config"] = {}
|
||||
if not content.get("data"):
|
||||
content["data"] = {}
|
||||
if "pose" not in content:
|
||||
content["pose"] = content.get("position", {})
|
||||
return ResourceDictInstance(ResourceDict.model_validate(content))
|
||||
|
||||
def get_nested_dict(self) -> Dict[str, Any]:
|
||||
@@ -330,6 +334,21 @@ class ResourceTreeSet(object):
|
||||
) -> ResourceDictInstance:
|
||||
current_uuid = uuids.pop(0)
|
||||
|
||||
raw_pos = (
|
||||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||||
if d["location"]
|
||||
else {"x": 0, "y": 0, "z": 0}
|
||||
)
|
||||
pos = {
|
||||
"size": {"width": d["size_x"], "height": d["size_y"], "depth": d["size_z"]},
|
||||
"scale": {"x": 1.0, "y": 1.0, "z": 1.0},
|
||||
"layout": d.get("layout", "x-y"),
|
||||
"position": raw_pos,
|
||||
"position3d": raw_pos,
|
||||
"rotation": d["rotation"],
|
||||
"cross_section_type": d.get("cross_section_type", "rectangle"),
|
||||
}
|
||||
|
||||
# 先构建当前节点的字典(不包含children)
|
||||
r_dict = {
|
||||
"id": d["name"],
|
||||
@@ -338,12 +357,10 @@ class ResourceTreeSet(object):
|
||||
"parent": parent_resource, # 直接传入 ResourceDict 对象
|
||||
"type": replace_plr_type(d.get("category", "")),
|
||||
"class": d.get("class", ""),
|
||||
"position": (
|
||||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||||
if d["location"]
|
||||
else {"x": 0, "y": 0, "z": 0}
|
||||
),
|
||||
"config": {k: v for k, v in d.items() if k not in ["name", "children", "parent_name", "location"]},
|
||||
"position": pos,
|
||||
"pose": pos,
|
||||
"config": {k: v for k, v in d.items() if k not in
|
||||
["name", "children", "parent_name", "location", "rotation", "size_x", "size_y", "size_z", "cross_section_type", "bottom_type"]},
|
||||
"data": states[d["name"]],
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user