refactor: add itemized_carrier instead of carrier consists of ResourceHolder

This commit is contained in:
Junhan Chang
2025-09-29 20:36:45 +08:00
parent a5397ffe12
commit 10aabb7592
6 changed files with 373 additions and 121 deletions

View File

@@ -1,6 +1,6 @@
from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d
from unilabos.resources.bottle_carrier import Bottle, BottleCarrier from unilabos.resources.itemized_carrier import Bottle, BottleCarrier
from unilabos.resources.bioyond.bottles import BIOYOND_PolymerStation_Solid_Vial, BIOYOND_PolymerStation_Solution_Beaker, BIOYOND_PolymerStation_Reagent_Bottle from unilabos.resources.bioyond.bottles import BIOYOND_PolymerStation_Solid_Vial, BIOYOND_PolymerStation_Solution_Beaker, BIOYOND_PolymerStation_Reagent_Bottle
# 命名约定:试剂瓶-Bottle烧杯-Beaker烧瓶-Flask小瓶-Vial # 命名约定:试剂瓶-Bottle烧杯-Beaker烧瓶-Flask小瓶-Vial
@@ -22,27 +22,29 @@ def BIOYOND_Electrolyte_6VialCarrier(name: str) -> BottleCarrier:
start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2 start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2
start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2 start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2
# 创建6个位置坐标 (2行 x 3列) sites = create_ordered_items_2d(
locations = [] klass=ResourceHolder,
for row in range(2): num_items_x=3,
for col in range(3): num_items_y=2,
x = start_x + col * bottle_spacing_x dx=start_x,
y = start_y + row * bottle_spacing_y dy=start_y,
z = 5.0 # 架位底部 dz=5.0,
locations.append(Coordinate(x, y, z)) item_dx=bottle_spacing_x,
item_dy=bottle_spacing_y,
size_x=bottle_diameter,
size_y=bottle_diameter,
size_z=carrier_size_z,
)
for k, v in sites.items():
v.name = f"{name}_{v.name}"
carrier = BottleCarrier( carrier = BottleCarrier(
name=name, name=name,
size_x=carrier_size_x, size_x=carrier_size_x,
size_y=carrier_size_y, size_y=carrier_size_y,
size_z=carrier_size_z, size_z=carrier_size_z,
sites=create_homogeneous_resources( sites=sites,
klass=ResourceHolder,
locations=locations,
resource_size_x=bottle_diameter,
resource_size_y=bottle_diameter,
name_prefix=name,
),
model="BIOYOND_Electrolyte_6VialCarrier", model="BIOYOND_Electrolyte_6VialCarrier",
) )
carrier.num_items_x = 3 carrier.num_items_x = 3
@@ -107,27 +109,29 @@ def BIOYOND_PolymerStation_6VialCarrier(name: str) -> BottleCarrier:
start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2 start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2
start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2 start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2
# 创建6个位置坐标 (2行 x 3列) sites = create_ordered_items_2d(
locations = [] klass=ResourceHolder,
for row in range(2): num_items_x=3,
for col in range(3): num_items_y=2,
x = start_x + col * bottle_spacing_x dx=start_x,
y = start_y + row * bottle_spacing_y dy=start_y,
z = 5.0 # 架位底部 dz=5.0,
locations.append(Coordinate(x, y, z)) item_dx=bottle_spacing_x,
item_dy=bottle_spacing_y,
size_x=bottle_diameter,
size_y=bottle_diameter,
size_z=carrier_size_z,
)
for k, v in sites.items():
v.name = f"{name}_{v.name}"
carrier = BottleCarrier( carrier = BottleCarrier(
name=name, name=name,
size_x=carrier_size_x, size_x=carrier_size_x,
size_y=carrier_size_y, size_y=carrier_size_y,
size_z=carrier_size_z, size_z=carrier_size_z,
sites=create_homogeneous_resources( sites=sites,
klass=ResourceHolder,
locations=locations,
resource_size_x=bottle_diameter,
resource_size_y=bottle_diameter,
name_prefix=name,
),
model="BIOYOND_PolymerStation_6VialCarrier", model="BIOYOND_PolymerStation_6VialCarrier",
) )
carrier.num_items_x = 3 carrier.num_items_x = 3

View File

@@ -1,4 +1,4 @@
from unilabos.resources.bottle_carrier import Bottle, BottleCarrier from unilabos.resources.itemized_carrier import Bottle, BottleCarrier
# 工厂函数 # 工厂函数

View File

@@ -1,72 +0,0 @@
"""
自动化液体处理工作站物料类定义 - 简化版
Automated Liquid Handling Station Resource Classes - Simplified Version
"""
from __future__ import annotations
from typing import Dict, Optional
from pylabrobot.resources.coordinate import Coordinate
from pylabrobot.resources.container import Container
from pylabrobot.resources.carrier import TubeCarrier
from pylabrobot.resources.resource_holder import ResourceHolder
class Bottle(Container):
"""瓶子类 - 简化版,不追踪瓶盖"""
def __init__(
self,
name: str,
diameter: float,
height: float,
max_volume: float,
barcode: Optional[str] = "",
category: str = "container",
model: Optional[str] = None,
):
super().__init__(
name=name,
size_x=diameter,
size_y=diameter,
size_z=height,
max_volume=max_volume,
category=category,
model=model,
)
self.diameter = diameter
self.height = height
self.barcode = barcode
def serialize(self) -> dict:
return {
**super().serialize(),
"diameter": self.diameter,
"height": self.height,
"barcode": self.barcode,
}
class BottleCarrier(TubeCarrier):
"""瓶载架 - 直接继承自 TubeCarrier"""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
sites: Optional[Dict[int, ResourceHolder]] = None,
category: str = "bottle_carrier",
model: Optional[str] = None,
):
super().__init__(
name=name,
size_x=size_x,
size_y=size_y,
size_z=size_z,
sites=sites,
category=category,
model=model,
)

View File

@@ -4,6 +4,7 @@ import json
from typing import Union, Any, Dict from typing import Union, Any, Dict
import numpy as np import numpy as np
import networkx as nx import networkx as nx
from pylabrobot.resources import ResourceHolder
from unilabos_msgs.msg import Resource from unilabos_msgs.msg import Resource
from unilabos.resources.container import RegularContainer from unilabos.resources.container import RegularContainer
@@ -507,7 +508,7 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict =
number = (detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y + \ number = (detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y + \
(detail.get("x", 0) - 1) * plr_material.num_items_x + \ (detail.get("x", 0) - 1) * plr_material.num_items_x + \
(detail.get("y", 0) - 1) (detail.get("y", 0) - 1)
bottle = plr_material[number].resource bottle = plr_material[number]
bottle.code = detail.get("code", "") bottle.code = detail.get("code", "")
bottle.tracker.liquids = [(detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0)] bottle.tracker.liquids = [(detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0)]
@@ -520,8 +521,8 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict =
idx = (loc.get("y", 0) - 1) * warehouse.num_items_x * warehouse.num_items_y + \ idx = (loc.get("y", 0) - 1) * warehouse.num_items_x * warehouse.num_items_y + \
(loc.get("x", 0) - 1) * warehouse.num_items_x + \ (loc.get("x", 0) - 1) * warehouse.num_items_x + \
(loc.get("z", 0) - 1) (loc.get("z", 0) - 1)
if 0 <= idx < warehouse.num_items_x * warehouse.num_items_y * warehouse.num_items_z: if 0 <= idx < warehouse.capacity:
if warehouse[idx].resource is None: if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder):
warehouse[idx] = plr_material warehouse[idx] = plr_material
return plr_materials return plr_materials

View File

@@ -0,0 +1,322 @@
"""
自动化液体处理工作站物料类定义 - 简化版
Automated Liquid Handling Station Resource Classes - Simplified Version
"""
from __future__ import annotations
from typing import Dict, Optional
from pylabrobot.resources.coordinate import Coordinate
from pylabrobot.resources.container import Container
from pylabrobot.resources.resource_holder import ResourceHolder
from pylabrobot.resources import Resource as ResourcePLR
class Bottle(Container):
"""瓶子类 - 简化版,不追踪瓶盖"""
def __init__(
self,
name: str,
diameter: float,
height: float,
max_volume: float,
size_x: float = 0.0,
size_y: float = 0.0,
size_z: float = 0.0,
barcode: Optional[str] = "",
category: str = "container",
model: Optional[str] = None,
):
super().__init__(
name=name,
size_x=diameter,
size_y=diameter,
size_z=height,
max_volume=max_volume,
category=category,
model=model,
)
self.diameter = diameter
self.height = height
self.barcode = barcode
def serialize(self) -> dict:
return {
**super().serialize(),
"diameter": self.diameter,
"height": self.height,
"barcode": self.barcode,
}
from string import ascii_uppercase as LETTERS
from typing import Dict, List, Optional, Type, TypeVar, Union, Sequence, Tuple
import pylabrobot
from pylabrobot.resources.resource_holder import ResourceHolder
T = TypeVar("T", bound=ResourceHolder)
S = TypeVar("S", bound=ResourceHolder)
class ItemizedCarrier(ResourcePLR):
"""Base class for all carriers."""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
category: Optional[str] = "carrier",
model: Optional[str] = None,
):
super().__init__(
name=name,
size_x=size_x,
size_y=size_y,
size_z=size_z,
category=category,
model=model,
)
sites = sites or {}
self.sites: List[Optional[ResourcePLR]] = list(sites.values())
self._ordering = sites
self.num_items = len(self.sites)
self.child_locations: Dict[str, Coordinate] = {}
for spot, resource in sites.items():
if resource is not None and getattr(resource, "location", None) is None:
raise ValueError(f"resource {resource} has no location")
if resource is not None:
self.child_locations[spot] = resource.location
else:
self.child_locations[spot] = Coordinate.zero()
@property
def capacity(self):
"""The number of sites on this carrier."""
return len(self.sites)
def __len__(self) -> int:
"""Return the number of sites on this carrier."""
return len(self.sites)
def assign_child_resource(
self,
resource: ResourcePLR,
location: Optional[Coordinate],
reassign: bool = True,
spot: Optional[int] = None,
):
idx = spot if spot is not None else len(self.sites)
if not reassign and self.sites[idx] is not None:
raise ValueError(f"a site with index {idx} already exists")
super().assign_child_resource(resource, location=location, reassign=reassign)
self.sites[idx] = resource
def assign_resource_to_site(self, resource: ResourcePLR, spot: int):
if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder):
raise ValueError(f"spot {spot} already has a resource, {resource}")
self.assign_child_resource(resource, location=self.child_locations.get(str(spot)), spot=spot)
def unassign_child_resource(self, resource: ResourcePLR):
found = False
for spot, res in enumerate(self.sites):
if res == resource:
self.sites[spot] = None
found = True
break
if not found:
raise ValueError(f"Resource {resource} is not assigned to this carrier")
if hasattr(resource, "unassign"):
resource.unassign()
def __getitem__(
self,
identifier: Union[str, int, Sequence[int], Sequence[str], slice, range],
) -> Union[List[T], T]:
"""Get the items with the given identifier.
This is a convenience method for getting the items with the given identifier. It is equivalent
to :meth:`get_items`, but adds support for slicing and supports single items in the same
functional call. Note that the return type will always be a list, even if a single item is
requested.
Examples:
Getting the items with identifiers "A1" through "E1":
>>> items["A1:E1"]
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
Getting the items with identifiers 0 through 4 (note that this is the same as above):
>>> items[range(5)]
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
Getting items with a slice (note that this is the same as above):
>>> items[0:5]
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
Getting a single item:
>>> items[0]
[<Item A1>]
"""
if isinstance(identifier, str):
if ":" in identifier: # multiple # TODO: deprecate this, use `"A1":"E1"` instead (slice)
return self.get_items(identifier)
return self.get_item(identifier) # single
if isinstance(identifier, int):
return self.get_item(identifier)
if isinstance(identifier, (slice, range)):
start, stop = identifier.start, identifier.stop
if isinstance(identifier.start, str):
start = list(self._ordering.keys()).index(identifier.start)
elif identifier.start is None:
start = 0
if isinstance(identifier.stop, str):
stop = list(self._ordering.keys()).index(identifier.stop)
elif identifier.stop is None:
stop = self.num_items
identifier = list(range(start, stop, identifier.step or 1))
return self.get_items(identifier)
if isinstance(identifier, (list, tuple)):
return self.get_items(identifier)
raise TypeError(f"Invalid identifier type: {type(identifier)}")
def get_item(self, identifier: Union[str, int, Tuple[int, int]]) -> T:
"""Get the item with the given identifier.
Args:
identifier: The identifier of the item. Either a string, an integer, or a tuple. If an
integer, it is the index of the item in the list of items (counted from 0, top to bottom, left
to right). If a string, it uses transposed MS Excel style notation, e.g. "A1" for the first
item, "B1" for the item below that, etc. If a tuple, it is (row, column).
Raises:
IndexError: If the identifier is out of range. The range is 0 to self.num_items-1 (inclusive).
"""
if isinstance(identifier, tuple):
row, column = identifier
identifier = LETTERS[row] + str(column + 1) # standard transposed-Excel style notation
if isinstance(identifier, str):
try:
identifier = list(self._ordering.keys()).index(identifier)
except ValueError as e:
raise IndexError(
f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'."
) from e
if not 0 <= identifier < self.capacity:
raise IndexError(
f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'."
)
# Cast child to item type. Children will always be `T`, but the type checker doesn't know that.
return self.sites[identifier]
def get_items(self, identifiers: Union[str, Sequence[int], Sequence[str]]) -> List[T]:
"""Get the items with the given identifier.
Args:
identifier: Deprecated. Use `identifiers` instead. # TODO(deprecate-ordered-items)
identifiers: The identifiers of the items. Either a string range or a list of integers. If a
string, it uses transposed MS Excel style notation. Regions of items can be specified using
a colon, e.g. "A1:H1" for the first column. If a list of integers, it is the indices of the
items in the list of items (counted from 0, top to bottom, left to right).
Examples:
Getting the items with identifiers "A1" through "E1":
>>> items.get_items("A1:E1")
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
Getting the items with identifiers 0 through 4:
>>> items.get_items(range(5))
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
"""
if isinstance(identifiers, str):
identifiers = pylabrobot.utils.expand_string_range(identifiers)
return [self.get_item(i) for i in identifiers]
def __setitem__(self, idx: Union[int, str], resource: Optional[ResourcePLR]):
"""Assign a resource to this carrier."""
if resource is None: # setting to None
assigned_resource = self[idx]
if assigned_resource is not None:
self.unassign_child_resource(assigned_resource)
else:
idx = list(self._ordering.keys()).index(idx) if isinstance(idx, str) else idx
self.assign_resource_to_site(resource, spot=idx)
def __delitem__(self, idx: int):
"""Unassign a resource from this carrier."""
assigned_resource = self[idx]
if assigned_resource is not None:
self.unassign_child_resource(assigned_resource)
def get_resources(self) -> List[ResourcePLR]:
"""Get all resources assigned to this carrier."""
return [resource for resource in self.sites.values() if resource is not None]
def __eq__(self, other):
return super().__eq__(other) and self.sites == other.sites
def get_free_sites(self) -> List[int]:
return [spot for spot, resource in self.sites.items() if resource is None]
def serialize(self):
return {
**super().serialize(),
"slots": [{
"label": str(identifier),
"visible": True if self[identifier] is not None else False,
"position": {"x": location.x, "y": location.y, "z": location.z},
"size": {"width": self._size_x, "height": self._size_y, "depth": self._size_z},
"content_type": ["bottle", "container", "tube", "bottle_carrier", "tip_rack"]
} for identifier, location in self.child_locations.items()]
}
class BottleCarrier(ItemizedCarrier):
"""瓶载架 - 直接继承自 TubeCarrier"""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
sites: Optional[Dict[Union[int, str], ResourceHolder]] = None,
category: str = "bottle_carrier",
model: Optional[str] = None,
):
super().__init__(
name=name,
size_x=size_x,
size_y=size_y,
size_z=size_z,
sites=sites,
category=category,
model=model,
)

View File

@@ -1,11 +1,11 @@
import json
from typing import Optional, List from typing import Optional, List
from pylabrobot.resources import Coordinate, Resource from pylabrobot.resources import Coordinate
from pylabrobot.resources.carrier import Carrier, PlateHolder, ResourceHolder, create_homogeneous_resources from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources
from pylabrobot.resources.deck import Deck
from unilabos.resources.itemized_carrier import ItemizedCarrier
class WareHouse(Carrier[ResourceHolder]): class WareHouse(ItemizedCarrier):
"""4x4x1堆栈载体类 - 可容纳16个板位的载体4层x4行x1列""" """4x4x1堆栈载体类 - 可容纳16个板位的载体4层x4行x1列"""
def __init__( def __init__(
@@ -21,6 +21,7 @@ class WareHouse(Carrier[ResourceHolder]):
item_dy: float = 10.0, item_dy: float = 10.0,
item_dz: float = 10.0, item_dz: float = 10.0,
removed_positions: Optional[List[int]] = None, removed_positions: Optional[List[int]] = None,
empty: bool = False,
category: str = "warehouse", category: str = "warehouse",
model: Optional[str] = None, model: Optional[str] = None,
): ):
@@ -44,13 +45,7 @@ class WareHouse(Carrier[ResourceHolder]):
sites = create_homogeneous_resources( sites = create_homogeneous_resources(
klass=ResourceHolder, klass=ResourceHolder,
locations=[ locations=locations,
Coordinate(4.0, 8.5, 86.15),
Coordinate(4.0, 104.5, 86.15),
Coordinate(4.0, 200.5, 86.15),
Coordinate(4.0, 296.5, 86.15),
Coordinate(4.0, 392.5, 86.15),
],
resource_size_x=127.0, resource_size_x=127.0,
resource_size_y=86.0, resource_size_y=86.0,
name_prefix=name, name_prefix=name,
@@ -61,12 +56,14 @@ class WareHouse(Carrier[ResourceHolder]):
size_x=dx + item_dx * num_items_x, size_x=dx + item_dx * num_items_x,
size_y=dy + item_dy * num_items_y, size_y=dy + item_dy * num_items_y,
size_z=dz + item_dz * num_items_z, size_z=dz + item_dz * num_items_z,
# ordered_items=ordered_items,
# ordering=ordering,
sites=sites, sites=sites,
category=category, category=category,
model=model, model=model,
) )
def get_site_by_layer_position(self, row: int, col: int, layer: int) -> PlateHolder: def get_site_by_layer_position(self, row: int, col: int, layer: int) -> ResourceHolder:
if not (0 <= layer < 4 and 0 <= row < 4 and 0 <= col < 1): if not (0 <= layer < 4 and 0 <= row < 4 and 0 <= col < 1):
raise ValueError("无效的位置: layer={}, row={}, col={}".format(layer, row, col)) raise ValueError("无效的位置: layer={}, row={}, col={}".format(layer, row, col))