From 10aabb75921ecda3ef973ecbed4cd7e7072f5429 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Mon, 29 Sep 2025 20:36:45 +0800 Subject: [PATCH] refactor: add itemized_carrier instead of carrier consists of ResourceHolder --- unilabos/resources/bioyond/bottle_carriers.py | 68 ++-- unilabos/resources/bioyond/bottles.py | 2 +- unilabos/resources/bottle_carrier.py | 72 ---- unilabos/resources/graphio.py | 7 +- unilabos/resources/itemized_carrier.py | 322 ++++++++++++++++++ unilabos/resources/warehouse.py | 23 +- 6 files changed, 373 insertions(+), 121 deletions(-) delete mode 100644 unilabos/resources/bottle_carrier.py create mode 100644 unilabos/resources/itemized_carrier.py diff --git a/unilabos/resources/bioyond/bottle_carriers.py b/unilabos/resources/bioyond/bottle_carriers.py index cd84944b..9f88b88d 100644 --- a/unilabos/resources/bioyond/bottle_carriers.py +++ b/unilabos/resources/bioyond/bottle_carriers.py @@ -1,6 +1,6 @@ -from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder +from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d -from unilabos.resources.bottle_carrier import Bottle, BottleCarrier +from unilabos.resources.itemized_carrier import Bottle, BottleCarrier from unilabos.resources.bioyond.bottles import BIOYOND_PolymerStation_Solid_Vial, BIOYOND_PolymerStation_Solution_Beaker, BIOYOND_PolymerStation_Reagent_Bottle # 命名约定:试剂瓶-Bottle,烧杯-Beaker,烧瓶-Flask,小瓶-Vial @@ -22,27 +22,29 @@ def BIOYOND_Electrolyte_6VialCarrier(name: str) -> BottleCarrier: start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2 start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2 - # 创建6个位置坐标 (2行 x 3列) - locations = [] - for row in range(2): - for col in range(3): - x = start_x + col * bottle_spacing_x - y = start_y + row * bottle_spacing_y - z = 5.0 # 架位底部 - locations.append(Coordinate(x, y, z)) + sites = create_ordered_items_2d( + klass=ResourceHolder, + num_items_x=3, + num_items_y=2, + dx=start_x, + dy=start_y, + dz=5.0, + item_dx=bottle_spacing_x, + item_dy=bottle_spacing_y, + + size_x=bottle_diameter, + size_y=bottle_diameter, + size_z=carrier_size_z, + ) + for k, v in sites.items(): + v.name = f"{name}_{v.name}" carrier = BottleCarrier( name=name, size_x=carrier_size_x, size_y=carrier_size_y, size_z=carrier_size_z, - sites=create_homogeneous_resources( - klass=ResourceHolder, - locations=locations, - resource_size_x=bottle_diameter, - resource_size_y=bottle_diameter, - name_prefix=name, - ), + sites=sites, model="BIOYOND_Electrolyte_6VialCarrier", ) carrier.num_items_x = 3 @@ -107,27 +109,29 @@ def BIOYOND_PolymerStation_6VialCarrier(name: str) -> BottleCarrier: start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2 start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2 - # 创建6个位置坐标 (2行 x 3列) - locations = [] - for row in range(2): - for col in range(3): - x = start_x + col * bottle_spacing_x - y = start_y + row * bottle_spacing_y - z = 5.0 # 架位底部 - locations.append(Coordinate(x, y, z)) + sites = create_ordered_items_2d( + klass=ResourceHolder, + num_items_x=3, + num_items_y=2, + dx=start_x, + dy=start_y, + dz=5.0, + item_dx=bottle_spacing_x, + item_dy=bottle_spacing_y, + + size_x=bottle_diameter, + size_y=bottle_diameter, + size_z=carrier_size_z, + ) + for k, v in sites.items(): + v.name = f"{name}_{v.name}" carrier = BottleCarrier( name=name, size_x=carrier_size_x, size_y=carrier_size_y, size_z=carrier_size_z, - sites=create_homogeneous_resources( - klass=ResourceHolder, - locations=locations, - resource_size_x=bottle_diameter, - resource_size_y=bottle_diameter, - name_prefix=name, - ), + sites=sites, model="BIOYOND_PolymerStation_6VialCarrier", ) carrier.num_items_x = 3 diff --git a/unilabos/resources/bioyond/bottles.py b/unilabos/resources/bioyond/bottles.py index b3725de9..1afac18c 100644 --- a/unilabos/resources/bioyond/bottles.py +++ b/unilabos/resources/bioyond/bottles.py @@ -1,4 +1,4 @@ -from unilabos.resources.bottle_carrier import Bottle, BottleCarrier +from unilabos.resources.itemized_carrier import Bottle, BottleCarrier # 工厂函数 diff --git a/unilabos/resources/bottle_carrier.py b/unilabos/resources/bottle_carrier.py deleted file mode 100644 index e680b26e..00000000 --- a/unilabos/resources/bottle_carrier.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -自动化液体处理工作站物料类定义 - 简化版 -Automated Liquid Handling Station Resource Classes - Simplified Version -""" - -from __future__ import annotations - -from typing import Dict, Optional - -from pylabrobot.resources.coordinate import Coordinate -from pylabrobot.resources.container import Container -from pylabrobot.resources.carrier import TubeCarrier -from pylabrobot.resources.resource_holder import ResourceHolder - - -class Bottle(Container): - """瓶子类 - 简化版,不追踪瓶盖""" - - def __init__( - self, - name: str, - diameter: float, - height: float, - max_volume: float, - barcode: Optional[str] = "", - category: str = "container", - model: Optional[str] = None, - ): - super().__init__( - name=name, - size_x=diameter, - size_y=diameter, - size_z=height, - max_volume=max_volume, - category=category, - model=model, - ) - self.diameter = diameter - self.height = height - self.barcode = barcode - - def serialize(self) -> dict: - return { - **super().serialize(), - "diameter": self.diameter, - "height": self.height, - "barcode": self.barcode, - } - - -class BottleCarrier(TubeCarrier): - """瓶载架 - 直接继承自 TubeCarrier""" - - def __init__( - self, - name: str, - size_x: float, - size_y: float, - size_z: float, - sites: Optional[Dict[int, ResourceHolder]] = None, - category: str = "bottle_carrier", - model: Optional[str] = None, - ): - super().__init__( - name=name, - size_x=size_x, - size_y=size_y, - size_z=size_z, - sites=sites, - category=category, - model=model, - ) diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 4bc611a2..0c55127a 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -4,6 +4,7 @@ import json from typing import Union, Any, Dict import numpy as np import networkx as nx +from pylabrobot.resources import ResourceHolder from unilabos_msgs.msg import Resource from unilabos.resources.container import RegularContainer @@ -507,7 +508,7 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict = number = (detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y + \ (detail.get("x", 0) - 1) * plr_material.num_items_x + \ (detail.get("y", 0) - 1) - bottle = plr_material[number].resource + bottle = plr_material[number] bottle.code = detail.get("code", "") bottle.tracker.liquids = [(detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0)] @@ -520,8 +521,8 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict = idx = (loc.get("y", 0) - 1) * warehouse.num_items_x * warehouse.num_items_y + \ (loc.get("x", 0) - 1) * warehouse.num_items_x + \ (loc.get("z", 0) - 1) - if 0 <= idx < warehouse.num_items_x * warehouse.num_items_y * warehouse.num_items_z: - if warehouse[idx].resource is None: + if 0 <= idx < warehouse.capacity: + if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder): warehouse[idx] = plr_material return plr_materials diff --git a/unilabos/resources/itemized_carrier.py b/unilabos/resources/itemized_carrier.py new file mode 100644 index 00000000..77191307 --- /dev/null +++ b/unilabos/resources/itemized_carrier.py @@ -0,0 +1,322 @@ +""" +自动化液体处理工作站物料类定义 - 简化版 +Automated Liquid Handling Station Resource Classes - Simplified Version +""" + +from __future__ import annotations + +from typing import Dict, Optional + +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.container import Container +from pylabrobot.resources.resource_holder import ResourceHolder +from pylabrobot.resources import Resource as ResourcePLR + + +class Bottle(Container): + """瓶子类 - 简化版,不追踪瓶盖""" + + def __init__( + self, + name: str, + diameter: float, + height: float, + max_volume: float, + size_x: float = 0.0, + size_y: float = 0.0, + size_z: float = 0.0, + barcode: Optional[str] = "", + category: str = "container", + model: Optional[str] = None, + ): + super().__init__( + name=name, + size_x=diameter, + size_y=diameter, + size_z=height, + max_volume=max_volume, + category=category, + model=model, + ) + self.diameter = diameter + self.height = height + self.barcode = barcode + + def serialize(self) -> dict: + return { + **super().serialize(), + "diameter": self.diameter, + "height": self.height, + "barcode": self.barcode, + } + + +from string import ascii_uppercase as LETTERS +from typing import Dict, List, Optional, Type, TypeVar, Union, Sequence, Tuple + +import pylabrobot +from pylabrobot.resources.resource_holder import ResourceHolder + +T = TypeVar("T", bound=ResourceHolder) + +S = TypeVar("S", bound=ResourceHolder) + + +class ItemizedCarrier(ResourcePLR): + """Base class for all carriers.""" + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None, + category: Optional[str] = "carrier", + model: Optional[str] = None, + ): + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category=category, + model=model, + ) + sites = sites or {} + self.sites: List[Optional[ResourcePLR]] = list(sites.values()) + self._ordering = sites + self.num_items = len(self.sites) + self.child_locations: Dict[str, Coordinate] = {} + for spot, resource in sites.items(): + if resource is not None and getattr(resource, "location", None) is None: + raise ValueError(f"resource {resource} has no location") + if resource is not None: + self.child_locations[spot] = resource.location + else: + self.child_locations[spot] = Coordinate.zero() + + @property + def capacity(self): + """The number of sites on this carrier.""" + return len(self.sites) + + def __len__(self) -> int: + """Return the number of sites on this carrier.""" + return len(self.sites) + + def assign_child_resource( + self, + resource: ResourcePLR, + location: Optional[Coordinate], + reassign: bool = True, + spot: Optional[int] = None, + ): + idx = spot if spot is not None else len(self.sites) + if not reassign and self.sites[idx] is not None: + raise ValueError(f"a site with index {idx} already exists") + super().assign_child_resource(resource, location=location, reassign=reassign) + self.sites[idx] = resource + + def assign_resource_to_site(self, resource: ResourcePLR, spot: int): + if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder): + raise ValueError(f"spot {spot} already has a resource, {resource}") + self.assign_child_resource(resource, location=self.child_locations.get(str(spot)), spot=spot) + + def unassign_child_resource(self, resource: ResourcePLR): + found = False + for spot, res in enumerate(self.sites): + if res == resource: + self.sites[spot] = None + found = True + break + if not found: + raise ValueError(f"Resource {resource} is not assigned to this carrier") + if hasattr(resource, "unassign"): + resource.unassign() + + def __getitem__( + self, + identifier: Union[str, int, Sequence[int], Sequence[str], slice, range], + ) -> Union[List[T], T]: + """Get the items with the given identifier. + + This is a convenience method for getting the items with the given identifier. It is equivalent + to :meth:`get_items`, but adds support for slicing and supports single items in the same + functional call. Note that the return type will always be a list, even if a single item is + requested. + + Examples: + Getting the items with identifiers "A1" through "E1": + + >>> items["A1:E1"] + + [, , , , ] + + Getting the items with identifiers 0 through 4 (note that this is the same as above): + + >>> items[range(5)] + + [, , , , ] + + Getting items with a slice (note that this is the same as above): + + >>> items[0:5] + + [, , , , ] + + Getting a single item: + + >>> items[0] + + [] + """ + + if isinstance(identifier, str): + if ":" in identifier: # multiple # TODO: deprecate this, use `"A1":"E1"` instead (slice) + return self.get_items(identifier) + + return self.get_item(identifier) # single + + if isinstance(identifier, int): + return self.get_item(identifier) + + if isinstance(identifier, (slice, range)): + start, stop = identifier.start, identifier.stop + if isinstance(identifier.start, str): + start = list(self._ordering.keys()).index(identifier.start) + elif identifier.start is None: + start = 0 + if isinstance(identifier.stop, str): + stop = list(self._ordering.keys()).index(identifier.stop) + elif identifier.stop is None: + stop = self.num_items + identifier = list(range(start, stop, identifier.step or 1)) + return self.get_items(identifier) + + if isinstance(identifier, (list, tuple)): + return self.get_items(identifier) + + raise TypeError(f"Invalid identifier type: {type(identifier)}") + + def get_item(self, identifier: Union[str, int, Tuple[int, int]]) -> T: + """Get the item with the given identifier. + + Args: + identifier: The identifier of the item. Either a string, an integer, or a tuple. If an + integer, it is the index of the item in the list of items (counted from 0, top to bottom, left + to right). If a string, it uses transposed MS Excel style notation, e.g. "A1" for the first + item, "B1" for the item below that, etc. If a tuple, it is (row, column). + + Raises: + IndexError: If the identifier is out of range. The range is 0 to self.num_items-1 (inclusive). + """ + + if isinstance(identifier, tuple): + row, column = identifier + identifier = LETTERS[row] + str(column + 1) # standard transposed-Excel style notation + if isinstance(identifier, str): + try: + identifier = list(self._ordering.keys()).index(identifier) + except ValueError as e: + raise IndexError( + f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'." + ) from e + + if not 0 <= identifier < self.capacity: + raise IndexError( + f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'." + ) + + # Cast child to item type. Children will always be `T`, but the type checker doesn't know that. + return self.sites[identifier] + + def get_items(self, identifiers: Union[str, Sequence[int], Sequence[str]]) -> List[T]: + """Get the items with the given identifier. + + Args: + identifier: Deprecated. Use `identifiers` instead. # TODO(deprecate-ordered-items) + identifiers: The identifiers of the items. Either a string range or a list of integers. If a + string, it uses transposed MS Excel style notation. Regions of items can be specified using + a colon, e.g. "A1:H1" for the first column. If a list of integers, it is the indices of the + items in the list of items (counted from 0, top to bottom, left to right). + + Examples: + Getting the items with identifiers "A1" through "E1": + + >>> items.get_items("A1:E1") + + [, , , , ] + + Getting the items with identifiers 0 through 4: + + >>> items.get_items(range(5)) + + [, , , , ] + """ + + if isinstance(identifiers, str): + identifiers = pylabrobot.utils.expand_string_range(identifiers) + return [self.get_item(i) for i in identifiers] + + def __setitem__(self, idx: Union[int, str], resource: Optional[ResourcePLR]): + """Assign a resource to this carrier.""" + if resource is None: # setting to None + assigned_resource = self[idx] + if assigned_resource is not None: + self.unassign_child_resource(assigned_resource) + else: + idx = list(self._ordering.keys()).index(idx) if isinstance(idx, str) else idx + self.assign_resource_to_site(resource, spot=idx) + + def __delitem__(self, idx: int): + """Unassign a resource from this carrier.""" + assigned_resource = self[idx] + if assigned_resource is not None: + self.unassign_child_resource(assigned_resource) + + def get_resources(self) -> List[ResourcePLR]: + """Get all resources assigned to this carrier.""" + return [resource for resource in self.sites.values() if resource is not None] + + def __eq__(self, other): + return super().__eq__(other) and self.sites == other.sites + + def get_free_sites(self) -> List[int]: + return [spot for spot, resource in self.sites.items() if resource is None] + + def serialize(self): + return { + **super().serialize(), + "slots": [{ + "label": str(identifier), + "visible": True if self[identifier] is not None else False, + "position": {"x": location.x, "y": location.y, "z": location.z}, + "size": {"width": self._size_x, "height": self._size_y, "depth": self._size_z}, + "content_type": ["bottle", "container", "tube", "bottle_carrier", "tip_rack"] + } for identifier, location in self.child_locations.items()] + } + + +class BottleCarrier(ItemizedCarrier): + """瓶载架 - 直接继承自 TubeCarrier""" + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + sites: Optional[Dict[Union[int, str], ResourceHolder]] = None, + category: str = "bottle_carrier", + model: Optional[str] = None, + ): + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + sites=sites, + category=category, + model=model, + ) diff --git a/unilabos/resources/warehouse.py b/unilabos/resources/warehouse.py index 11b945ed..87424b91 100644 --- a/unilabos/resources/warehouse.py +++ b/unilabos/resources/warehouse.py @@ -1,11 +1,11 @@ -import json from typing import Optional, List -from pylabrobot.resources import Coordinate, Resource -from pylabrobot.resources.carrier import Carrier, PlateHolder, ResourceHolder, create_homogeneous_resources -from pylabrobot.resources.deck import Deck +from pylabrobot.resources import Coordinate +from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources + +from unilabos.resources.itemized_carrier import ItemizedCarrier -class WareHouse(Carrier[ResourceHolder]): +class WareHouse(ItemizedCarrier): """4x4x1堆栈载体类 - 可容纳16个板位的载体(4层x4行x1列)""" def __init__( @@ -21,6 +21,7 @@ class WareHouse(Carrier[ResourceHolder]): item_dy: float = 10.0, item_dz: float = 10.0, removed_positions: Optional[List[int]] = None, + empty: bool = False, category: str = "warehouse", model: Optional[str] = None, ): @@ -44,13 +45,7 @@ class WareHouse(Carrier[ResourceHolder]): sites = create_homogeneous_resources( klass=ResourceHolder, - locations=[ - Coordinate(4.0, 8.5, 86.15), - Coordinate(4.0, 104.5, 86.15), - Coordinate(4.0, 200.5, 86.15), - Coordinate(4.0, 296.5, 86.15), - Coordinate(4.0, 392.5, 86.15), - ], + locations=locations, resource_size_x=127.0, resource_size_y=86.0, name_prefix=name, @@ -61,12 +56,14 @@ class WareHouse(Carrier[ResourceHolder]): size_x=dx + item_dx * num_items_x, size_y=dy + item_dy * num_items_y, size_z=dz + item_dz * num_items_z, + # ordered_items=ordered_items, + # ordering=ordering, sites=sites, category=category, model=model, ) - def get_site_by_layer_position(self, row: int, col: int, layer: int) -> PlateHolder: + def get_site_by_layer_position(self, row: int, col: int, layer: int) -> ResourceHolder: if not (0 <= layer < 4 and 0 <= row < 4 and 0 <= col < 1): raise ValueError("无效的位置: layer={}, row={}, col={}".format(layer, row, col))