mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 21:41:16 +00:00
Workstation templates: Resources and its CRUD, and workstation tasks (#95)
* coin_cell_station draft * refactor: rename "station_resource" to "deck" * add standardized BIOYOND resources: bottle_carrier, bottle * refactor and add BIOYOND resources tests * add BIOYOND deck assignment and pass all tests * fix: update resource with correct structure; remove deprecated liquid_handler set_group action * feat: 将新威电池测试系统驱动与配置文件并入 workstation_dev_YB2 (#92) * feat: 新威电池测试系统驱动与注册文件 * feat: bring neware driver & battery.json into workstation_dev_YB2 * add bioyond studio draft * bioyond station with communication init and resource sync * fix bioyond station and registry * create/update resources with POST/PUT for big amount/ small amount data * refactor: add itemized_carrier instead of carrier consists of ResourceHolder * create warehouse by factory func * update bioyond launch json * add child_size for itemized_carrier * fix bioyond resource io --------- Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com> Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>
This commit is contained in:
0
unilabos/resources/bioyond/__init__.py
Normal file
0
unilabos/resources/bioyond/__init__.py
Normal file
217
unilabos/resources/bioyond/bottle_carriers.py
Normal file
217
unilabos/resources/bioyond/bottle_carriers.py
Normal file
@@ -0,0 +1,217 @@
|
||||
from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d
|
||||
|
||||
from unilabos.resources.itemized_carrier import Bottle, BottleCarrier
|
||||
from unilabos.resources.bioyond.bottles import BIOYOND_PolymerStation_Solid_Vial, BIOYOND_PolymerStation_Solution_Beaker, BIOYOND_PolymerStation_Reagent_Bottle
|
||||
# 命名约定:试剂瓶-Bottle,烧杯-Beaker,烧瓶-Flask,小瓶-Vial
|
||||
|
||||
|
||||
def BIOYOND_Electrolyte_6VialCarrier(name: str) -> BottleCarrier:
|
||||
"""6瓶载架 - 2x3布局"""
|
||||
|
||||
# 载架尺寸 (mm)
|
||||
carrier_size_x = 127.8
|
||||
carrier_size_y = 85.5
|
||||
carrier_size_z = 50.0
|
||||
|
||||
# 瓶位尺寸
|
||||
bottle_diameter = 30.0
|
||||
bottle_spacing_x = 42.0 # X方向间距
|
||||
bottle_spacing_y = 35.0 # Y方向间距
|
||||
|
||||
# 计算起始位置 (居中排列)
|
||||
start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2
|
||||
start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2
|
||||
|
||||
sites = create_ordered_items_2d(
|
||||
klass=ResourceHolder,
|
||||
num_items_x=3,
|
||||
num_items_y=2,
|
||||
dx=start_x,
|
||||
dy=start_y,
|
||||
dz=5.0,
|
||||
item_dx=bottle_spacing_x,
|
||||
item_dy=bottle_spacing_y,
|
||||
|
||||
size_x=bottle_diameter,
|
||||
size_y=bottle_diameter,
|
||||
size_z=carrier_size_z,
|
||||
)
|
||||
for k, v in sites.items():
|
||||
v.name = f"{name}_{v.name}"
|
||||
|
||||
carrier = BottleCarrier(
|
||||
name=name,
|
||||
size_x=carrier_size_x,
|
||||
size_y=carrier_size_y,
|
||||
size_z=carrier_size_z,
|
||||
sites=sites,
|
||||
model="BIOYOND_Electrolyte_6VialCarrier",
|
||||
)
|
||||
carrier.num_items_x = 3
|
||||
carrier.num_items_y = 2
|
||||
carrier.num_items_z = 1
|
||||
for i in range(6):
|
||||
carrier[i] = BIOYOND_PolymerStation_Solid_Vial(f"{name}_vial_{i+1}")
|
||||
return carrier
|
||||
|
||||
|
||||
def BIOYOND_Electrolyte_1BottleCarrier(name: str) -> BottleCarrier:
|
||||
"""1瓶载架 - 单个中央位置"""
|
||||
|
||||
# 载架尺寸 (mm)
|
||||
carrier_size_x = 127.8
|
||||
carrier_size_y = 85.5
|
||||
carrier_size_z = 100.0
|
||||
|
||||
# 烧杯尺寸
|
||||
beaker_diameter = 80.0
|
||||
|
||||
# 计算中央位置
|
||||
center_x = (carrier_size_x - beaker_diameter) / 2
|
||||
center_y = (carrier_size_y - beaker_diameter) / 2
|
||||
center_z = 5.0
|
||||
|
||||
carrier = BottleCarrier(
|
||||
name=name,
|
||||
size_x=carrier_size_x,
|
||||
size_y=carrier_size_y,
|
||||
size_z=carrier_size_z,
|
||||
sites=create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=[Coordinate(center_x, center_y, center_z)],
|
||||
resource_size_x=beaker_diameter,
|
||||
resource_size_y=beaker_diameter,
|
||||
name_prefix=name,
|
||||
),
|
||||
model="BIOYOND_Electrolyte_1BottleCarrier",
|
||||
)
|
||||
carrier.num_items_x = 1
|
||||
carrier.num_items_y = 1
|
||||
carrier.num_items_z = 1
|
||||
carrier[0] = BIOYOND_PolymerStation_Solution_Beaker(f"{name}_beaker_1")
|
||||
return carrier
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_6VialCarrier(name: str) -> BottleCarrier:
|
||||
"""6瓶载架 - 2x3布局"""
|
||||
|
||||
# 载架尺寸 (mm)
|
||||
carrier_size_x = 127.8
|
||||
carrier_size_y = 85.5
|
||||
carrier_size_z = 50.0
|
||||
|
||||
# 瓶位尺寸
|
||||
bottle_diameter = 30.0
|
||||
bottle_spacing_x = 42.0 # X方向间距
|
||||
bottle_spacing_y = 35.0 # Y方向间距
|
||||
|
||||
# 计算起始位置 (居中排列)
|
||||
start_x = (carrier_size_x - (3 - 1) * bottle_spacing_x - bottle_diameter) / 2
|
||||
start_y = (carrier_size_y - (2 - 1) * bottle_spacing_y - bottle_diameter) / 2
|
||||
|
||||
sites = create_ordered_items_2d(
|
||||
klass=ResourceHolder,
|
||||
num_items_x=3,
|
||||
num_items_y=2,
|
||||
dx=start_x,
|
||||
dy=start_y,
|
||||
dz=5.0,
|
||||
item_dx=bottle_spacing_x,
|
||||
item_dy=bottle_spacing_y,
|
||||
|
||||
size_x=bottle_diameter,
|
||||
size_y=bottle_diameter,
|
||||
size_z=carrier_size_z,
|
||||
)
|
||||
for k, v in sites.items():
|
||||
v.name = f"{name}_{v.name}"
|
||||
|
||||
carrier = BottleCarrier(
|
||||
name=name,
|
||||
size_x=carrier_size_x,
|
||||
size_y=carrier_size_y,
|
||||
size_z=carrier_size_z,
|
||||
sites=sites,
|
||||
model="BIOYOND_PolymerStation_6VialCarrier",
|
||||
)
|
||||
carrier.num_items_x = 3
|
||||
carrier.num_items_y = 2
|
||||
carrier.num_items_z = 1
|
||||
ordering = ["A1", "A2", "A3", "B1", "B2", "B3"] # 自定义顺序
|
||||
for i in range(6):
|
||||
carrier[i] = BIOYOND_PolymerStation_Solid_Vial(f"{name}_vial_{ordering[i]}")
|
||||
return carrier
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_1BottleCarrier(name: str) -> BottleCarrier:
|
||||
"""1瓶载架 - 单个中央位置"""
|
||||
|
||||
# 载架尺寸 (mm)
|
||||
carrier_size_x = 127.8
|
||||
carrier_size_y = 85.5
|
||||
carrier_size_z = 20.0
|
||||
|
||||
# 烧杯尺寸
|
||||
beaker_diameter = 60.0
|
||||
|
||||
# 计算中央位置
|
||||
center_x = (carrier_size_x - beaker_diameter) / 2
|
||||
center_y = (carrier_size_y - beaker_diameter) / 2
|
||||
center_z = 5.0
|
||||
|
||||
carrier = BottleCarrier(
|
||||
name=name,
|
||||
size_x=carrier_size_x,
|
||||
size_y=carrier_size_y,
|
||||
size_z=carrier_size_z,
|
||||
sites=create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=[Coordinate(center_x, center_y, center_z)],
|
||||
resource_size_x=beaker_diameter,
|
||||
resource_size_y=beaker_diameter,
|
||||
name_prefix=name,
|
||||
),
|
||||
model="BIOYOND_PolymerStation_1BottleCarrier",
|
||||
)
|
||||
carrier.num_items_x = 1
|
||||
carrier.num_items_y = 1
|
||||
carrier.num_items_z = 1
|
||||
carrier[0] = BIOYOND_PolymerStation_Reagent_Bottle(f"{name}_flask_1")
|
||||
return carrier
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_1FlaskCarrier(name: str) -> BottleCarrier:
|
||||
"""1瓶载架 - 单个中央位置"""
|
||||
|
||||
# 载架尺寸 (mm)
|
||||
carrier_size_x = 127.8
|
||||
carrier_size_y = 85.5
|
||||
carrier_size_z = 20.0
|
||||
|
||||
# 烧杯尺寸
|
||||
beaker_diameter = 70.0
|
||||
|
||||
# 计算中央位置
|
||||
center_x = (carrier_size_x - beaker_diameter) / 2
|
||||
center_y = (carrier_size_y - beaker_diameter) / 2
|
||||
center_z = 5.0
|
||||
|
||||
carrier = BottleCarrier(
|
||||
name=name,
|
||||
size_x=carrier_size_x,
|
||||
size_y=carrier_size_y,
|
||||
size_z=carrier_size_z,
|
||||
sites=create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=[Coordinate(center_x, center_y, center_z)],
|
||||
resource_size_x=beaker_diameter,
|
||||
resource_size_y=beaker_diameter,
|
||||
name_prefix=name,
|
||||
),
|
||||
model="BIOYOND_PolymerStation_1FlaskCarrier",
|
||||
)
|
||||
carrier.num_items_x = 1
|
||||
carrier.num_items_y = 1
|
||||
carrier.num_items_z = 1
|
||||
carrier[0] = BIOYOND_PolymerStation_Reagent_Bottle(f"{name}_bottle_1")
|
||||
return carrier
|
||||
56
unilabos/resources/bioyond/bottles.py
Normal file
56
unilabos/resources/bioyond/bottles.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from unilabos.resources.itemized_carrier import Bottle, BottleCarrier
|
||||
# 工厂函数
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_Solid_Vial(
|
||||
name: str,
|
||||
diameter: float = 20.0,
|
||||
height: float = 100.0,
|
||||
max_volume: float = 30000.0, # 30mL
|
||||
barcode: str = None,
|
||||
) -> Bottle:
|
||||
"""创建粉末瓶"""
|
||||
return Bottle(
|
||||
name=name,
|
||||
diameter=diameter,
|
||||
height=height,
|
||||
max_volume=max_volume,
|
||||
barcode=barcode,
|
||||
model="BIOYOND_PolymerStation_Solid_Vial",
|
||||
)
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_Solution_Beaker(
|
||||
name: str,
|
||||
diameter: float = 60.0,
|
||||
height: float = 70.0,
|
||||
max_volume: float = 200000.0, # 200mL
|
||||
barcode: str = None,
|
||||
) -> Bottle:
|
||||
"""创建溶液烧杯"""
|
||||
return Bottle(
|
||||
name=name,
|
||||
diameter=diameter,
|
||||
height=height,
|
||||
max_volume=max_volume,
|
||||
barcode=barcode,
|
||||
model="BIOYOND_PolymerStation_Solution_Beaker",
|
||||
)
|
||||
|
||||
|
||||
def BIOYOND_PolymerStation_Reagent_Bottle(
|
||||
name: str,
|
||||
diameter: float = 70.0,
|
||||
height: float = 120.0,
|
||||
max_volume: float = 500000.0, # 500mL
|
||||
barcode: str = None,
|
||||
) -> Bottle:
|
||||
"""创建试剂瓶"""
|
||||
return Bottle(
|
||||
name=name,
|
||||
diameter=diameter,
|
||||
height=height,
|
||||
max_volume=max_volume,
|
||||
barcode=barcode,
|
||||
model="BIOYOND_PolymerStation_Reagent_Bottle",
|
||||
)
|
||||
68
unilabos/resources/bioyond/decks.py
Normal file
68
unilabos/resources/bioyond/decks.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from pylabrobot.resources import Deck, Coordinate, Rotation
|
||||
|
||||
from unilabos.resources.bioyond.warehouses import bioyond_warehouse_1x4x4, bioyond_warehouse_1x4x2, bioyond_warehouse_liquid_and_lid_handling
|
||||
|
||||
|
||||
class BIOYOND_PolymerReactionStation_Deck(Deck):
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "PolymerReactionStation_Deck",
|
||||
size_x: float = 2700.0,
|
||||
size_y: float = 1080.0,
|
||||
size_z: float = 1500.0,
|
||||
category: str = "deck",
|
||||
setup: bool = False
|
||||
) -> None:
|
||||
super().__init__(name=name, size_x=2700.0, size_y=1080.0, size_z=1500.0)
|
||||
if setup:
|
||||
self.setup()
|
||||
|
||||
def setup(self) -> None:
|
||||
# 添加仓库
|
||||
self.warehouses = {
|
||||
"堆栈1": bioyond_warehouse_1x4x4("堆栈1"),
|
||||
"堆栈2": bioyond_warehouse_1x4x4("堆栈2"),
|
||||
"站内试剂存放堆栈": bioyond_warehouse_liquid_and_lid_handling("站内试剂存放堆栈"),
|
||||
}
|
||||
self.warehouse_locations = {
|
||||
"堆栈1": Coordinate(0.0, 430.0, 0.0),
|
||||
"堆栈2": Coordinate(2550.0, 430.0, 0.0),
|
||||
"站内试剂存放堆栈": Coordinate(800.0, 475.0, 0.0),
|
||||
}
|
||||
self.warehouses["站内试剂存放堆栈"].rotation = Rotation(z=90)
|
||||
|
||||
for warehouse_name, warehouse in self.warehouses.items():
|
||||
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
|
||||
|
||||
|
||||
class BIOYOND_PolymerPreparationStation_Deck(Deck):
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "PolymerPreparationStation_Deck",
|
||||
size_x: float = 2700.0,
|
||||
size_y: float = 1080.0,
|
||||
size_z: float = 1500.0,
|
||||
category: str = "deck",
|
||||
setup: bool = False
|
||||
) -> None:
|
||||
super().__init__(name=name, size_x=2700.0, size_y=1080.0, size_z=1500.0)
|
||||
if setup:
|
||||
self.setup()
|
||||
|
||||
def setup(self) -> None:
|
||||
# 添加仓库
|
||||
self.warehouses = {
|
||||
"io_warehouse_left": bioyond_warehouse_1x4x4("io_warehouse_left"),
|
||||
"io_warehouse_right": bioyond_warehouse_1x4x4("io_warehouse_right"),
|
||||
"solutions": bioyond_warehouse_1x4x2("warehouse_solutions"),
|
||||
"liquid_and_lid_handling": bioyond_warehouse_liquid_and_lid_handling("warehouse_liquid_and_lid_handling"),
|
||||
}
|
||||
self.warehouse_locations = {
|
||||
"io_warehouse_left": Coordinate(0.0, 650.0, 0.0),
|
||||
"io_warehouse_right": Coordinate(2550.0, 650.0, 0.0),
|
||||
"solutions": Coordinate(1915.0, 900.0, 0.0),
|
||||
"liquid_and_lid_handling": Coordinate(1330.0, 490.0, 0.0),
|
||||
}
|
||||
|
||||
for warehouse_name, warehouse in self.warehouses.items():
|
||||
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
|
||||
54
unilabos/resources/bioyond/warehouses.py
Normal file
54
unilabos/resources/bioyond/warehouses.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from unilabos.resources.warehouse import WareHouse, warehouse_factory
|
||||
|
||||
|
||||
def bioyond_warehouse_1x4x4(name: str) -> WareHouse:
|
||||
"""创建BioYond 4x1x4仓库"""
|
||||
return warehouse_factory(
|
||||
name=name,
|
||||
num_items_x=1,
|
||||
num_items_y=4,
|
||||
num_items_z=4,
|
||||
dx=10.0,
|
||||
dy=10.0,
|
||||
dz=10.0,
|
||||
item_dx=137.0,
|
||||
item_dy=96.0,
|
||||
item_dz=120.0,
|
||||
category="warehouse",
|
||||
)
|
||||
|
||||
|
||||
def bioyond_warehouse_1x4x2(name: str) -> WareHouse:
|
||||
"""创建BioYond 4x1x2仓库"""
|
||||
return warehouse_factory(
|
||||
name=name,
|
||||
num_items_x=1,
|
||||
num_items_y=4,
|
||||
num_items_z=2,
|
||||
dx=10.0,
|
||||
dy=10.0,
|
||||
dz=10.0,
|
||||
item_dx=137.0,
|
||||
item_dy=96.0,
|
||||
item_dz=120.0,
|
||||
category="warehouse",
|
||||
removed_positions=None
|
||||
)
|
||||
|
||||
|
||||
def bioyond_warehouse_liquid_and_lid_handling(name: str) -> WareHouse:
|
||||
"""创建BioYond开关盖加液模块台面"""
|
||||
return warehouse_factory(
|
||||
name=name,
|
||||
num_items_x=2,
|
||||
num_items_y=5,
|
||||
num_items_z=1,
|
||||
dx=10.0,
|
||||
dy=10.0,
|
||||
dz=10.0,
|
||||
item_dx=137.0,
|
||||
item_dy=96.0,
|
||||
item_dz=120.0,
|
||||
category="warehouse",
|
||||
removed_positions=None
|
||||
)
|
||||
@@ -4,6 +4,7 @@ import json
|
||||
from typing import Union, Any, Dict
|
||||
import numpy as np
|
||||
import networkx as nx
|
||||
from pylabrobot.resources import ResourceHolder
|
||||
from unilabos_msgs.msg import Resource
|
||||
|
||||
from unilabos.resources.container import RegularContainer
|
||||
@@ -480,7 +481,54 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None, w
|
||||
return r
|
||||
|
||||
|
||||
def initialize_resource(resource_config: dict) -> list[dict]:
|
||||
def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict = {}, deck: Any = None) -> list[dict]:
|
||||
"""
|
||||
将 bioyond 物料格式转换为 ulab 物料格式
|
||||
|
||||
Args:
|
||||
bioyond_materials: bioyond 系统的物料查询结果列表
|
||||
type_mapping: 物料类型映射字典,格式 {bioyond_type: plr_class_name}
|
||||
location_id_mapping: 库位 ID 到名称的映射字典,格式 {location_id: location_name}
|
||||
|
||||
Returns:
|
||||
pylabrobot 格式的物料列表
|
||||
"""
|
||||
plr_materials = []
|
||||
|
||||
for material in bioyond_materials:
|
||||
className = type_mapping.get(material.get("typeName"), "RegularContainer") if type_mapping else "RegularContainer"
|
||||
|
||||
plr_material: ResourcePLR = initialize_resource({"name": material["name"], "class": className}, resource_type=ResourcePLR)
|
||||
plr_material.code = material.get("code", "") and material.get("barCode", "") or ""
|
||||
|
||||
# 处理子物料(detail)
|
||||
if material.get("detail") and len(material["detail"]) > 0:
|
||||
child_ids = []
|
||||
for detail in material["detail"]:
|
||||
number = (detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y + \
|
||||
(detail.get("x", 0) - 1) * plr_material.num_items_x + \
|
||||
(detail.get("y", 0) - 1)
|
||||
bottle = plr_material[number]
|
||||
bottle.code = detail.get("code", "")
|
||||
bottle.tracker.liquids = [(detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0)]
|
||||
|
||||
plr_materials.append(plr_material)
|
||||
|
||||
if deck and hasattr(deck, "warehouses"):
|
||||
for loc in material.get("locations", []):
|
||||
if hasattr(deck, "warehouses") and loc.get("whName") in deck.warehouses:
|
||||
warehouse = deck.warehouses[loc["whName"]]
|
||||
idx = (loc.get("y", 0) - 1) * warehouse.num_items_x * warehouse.num_items_y + \
|
||||
(loc.get("x", 0) - 1) * warehouse.num_items_x + \
|
||||
(loc.get("z", 0) - 1)
|
||||
if 0 <= idx < warehouse.capacity:
|
||||
if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder):
|
||||
warehouse[idx] = plr_material
|
||||
|
||||
return plr_materials
|
||||
|
||||
|
||||
def initialize_resource(resource_config: dict, resource_type: Any = None) -> Union[list[dict], ResourcePLR]:
|
||||
"""Initializes a resource based on its configuration.
|
||||
|
||||
If the config is detailed, then do nothing;
|
||||
@@ -512,11 +560,14 @@ def initialize_resource(resource_config: dict) -> list[dict]:
|
||||
|
||||
if resource_class_config["type"] == "pylabrobot":
|
||||
resource_plr = RESOURCE(name=resource_config["name"])
|
||||
r = resource_plr_to_ulab(resource_plr=resource_plr, parent_name=resource_config.get("parent", None))
|
||||
# r = resource_plr_to_ulab(resource_plr=resource_plr)
|
||||
if resource_config.get("position") is not None:
|
||||
r["position"] = resource_config["position"]
|
||||
r = tree_to_list([r])
|
||||
if resource_type != ResourcePLR:
|
||||
r = resource_plr_to_ulab(resource_plr=resource_plr, parent_name=resource_config.get("parent", None))
|
||||
# r = resource_plr_to_ulab(resource_plr=resource_plr)
|
||||
if resource_config.get("position") is not None:
|
||||
r["position"] = resource_config["position"]
|
||||
r = tree_to_list([r])
|
||||
else:
|
||||
r = resource_plr
|
||||
elif resource_class_config["type"] == "unilabos":
|
||||
res_instance: RegularContainer = RESOURCE(id=resource_config["name"])
|
||||
res_instance.ulr_resource = convert_to_ros_msg(Resource, {k:v for k,v in resource_config.items() if k != "class"})
|
||||
|
||||
357
unilabos/resources/itemized_carrier.py
Normal file
357
unilabos/resources/itemized_carrier.py
Normal file
@@ -0,0 +1,357 @@
|
||||
"""
|
||||
自动化液体处理工作站物料类定义 - 简化版
|
||||
Automated Liquid Handling Station Resource Classes - Simplified Version
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, Optional
|
||||
|
||||
from pylabrobot.resources.coordinate import Coordinate
|
||||
from pylabrobot.resources.container import Container
|
||||
from pylabrobot.resources.resource_holder import ResourceHolder
|
||||
from pylabrobot.resources import Resource as ResourcePLR
|
||||
|
||||
|
||||
class Bottle(Container):
|
||||
"""瓶子类 - 简化版,不追踪瓶盖"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
diameter: float,
|
||||
height: float,
|
||||
max_volume: float,
|
||||
size_x: float = 0.0,
|
||||
size_y: float = 0.0,
|
||||
size_z: float = 0.0,
|
||||
barcode: Optional[str] = "",
|
||||
category: str = "container",
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=diameter,
|
||||
size_y=diameter,
|
||||
size_z=height,
|
||||
max_volume=max_volume,
|
||||
category=category,
|
||||
model=model,
|
||||
)
|
||||
self.diameter = diameter
|
||||
self.height = height
|
||||
self.barcode = barcode
|
||||
|
||||
def serialize(self) -> dict:
|
||||
return {
|
||||
**super().serialize(),
|
||||
"diameter": self.diameter,
|
||||
"height": self.height,
|
||||
"barcode": self.barcode,
|
||||
}
|
||||
|
||||
|
||||
from string import ascii_uppercase as LETTERS
|
||||
from typing import Dict, List, Optional, Type, TypeVar, Union, Sequence, Tuple
|
||||
|
||||
import pylabrobot
|
||||
from pylabrobot.resources.resource_holder import ResourceHolder
|
||||
|
||||
T = TypeVar("T", bound=ResourceHolder)
|
||||
|
||||
S = TypeVar("S", bound=ResourceHolder)
|
||||
|
||||
|
||||
class ItemizedCarrier(ResourcePLR):
|
||||
"""Base class for all carriers."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
size_x: float,
|
||||
size_y: float,
|
||||
size_z: float,
|
||||
num_items_x: int = 0,
|
||||
num_items_y: int = 0,
|
||||
num_items_z: int = 0,
|
||||
sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
|
||||
category: Optional[str] = "carrier",
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=size_x,
|
||||
size_y=size_y,
|
||||
size_z=size_z,
|
||||
category=category,
|
||||
model=model,
|
||||
)
|
||||
self.num_items = len(sites)
|
||||
self.num_items_x, self.num_items_y, self.num_items_z = num_items_x, num_items_y, num_items_z
|
||||
if isinstance(sites, dict):
|
||||
sites = sites or {}
|
||||
self.sites: List[Optional[ResourcePLR]] = list(sites.values())
|
||||
self._ordering = sites
|
||||
self.child_locations: Dict[str, Coordinate] = {}
|
||||
self.child_size: Dict[str, dict] = {}
|
||||
for spot, resource in sites.items():
|
||||
if resource is not None and getattr(resource, "location", None) is None:
|
||||
raise ValueError(f"resource {resource} has no location")
|
||||
if resource is not None:
|
||||
self.child_locations[spot] = resource.location
|
||||
self.child_size[spot] = {"width": resource._size_x, "height": resource._size_y, "depth": resource._size_z}
|
||||
else:
|
||||
self.child_locations[spot] = Coordinate.zero()
|
||||
self.child_size[spot] = {"width": 0, "height": 0, "depth": 0}
|
||||
elif isinstance(sites, list):
|
||||
# deserialize时走这里;还需要根据 self.sites 索引children
|
||||
self.child_locations = {site["label"]: Coordinate(**site["position"]) for site in sites}
|
||||
self.child_size = {site["label"]: site["size"] for site in sites}
|
||||
self.sites = [site["occupied_by"] for site in sites]
|
||||
self._ordering = {site["label"]: site["position"] for site in sites}
|
||||
else:
|
||||
print("sites:", sites)
|
||||
|
||||
@property
|
||||
def capacity(self):
|
||||
"""The number of sites on this carrier."""
|
||||
return len(self.sites)
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Return the number of sites on this carrier."""
|
||||
return len(self.sites)
|
||||
|
||||
def assign_child_resource(
|
||||
self,
|
||||
resource: ResourcePLR,
|
||||
location: Optional[Coordinate],
|
||||
reassign: bool = True,
|
||||
spot: Optional[int] = None,
|
||||
):
|
||||
idx = spot
|
||||
# 如果只给 location,根据坐标和 deserialize 后的 self.sites(持有names)来寻找 resource 该摆放的位置
|
||||
if spot is not None:
|
||||
idx = spot
|
||||
else:
|
||||
for i, site in enumerate(self.sites):
|
||||
site_location = list(self.child_locations.values())[i]
|
||||
if type(site) == str and site == resource.name:
|
||||
idx = i
|
||||
break
|
||||
if site_location == location:
|
||||
idx = i
|
||||
break
|
||||
|
||||
if not reassign and self.sites[idx] is not None:
|
||||
raise ValueError(f"a site with index {idx} already exists")
|
||||
super().assign_child_resource(resource, location=location, reassign=reassign)
|
||||
self.sites[idx] = resource
|
||||
|
||||
def assign_resource_to_site(self, resource: ResourcePLR, spot: int):
|
||||
if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder):
|
||||
raise ValueError(f"spot {spot} already has a resource, {resource}")
|
||||
self.assign_child_resource(resource, location=self.child_locations.get(str(spot)), spot=spot)
|
||||
|
||||
def unassign_child_resource(self, resource: ResourcePLR):
|
||||
found = False
|
||||
for spot, res in enumerate(self.sites):
|
||||
if res == resource:
|
||||
self.sites[spot] = None
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
raise ValueError(f"Resource {resource} is not assigned to this carrier")
|
||||
if hasattr(resource, "unassign"):
|
||||
resource.unassign()
|
||||
|
||||
def __getitem__(
|
||||
self,
|
||||
identifier: Union[str, int, Sequence[int], Sequence[str], slice, range],
|
||||
) -> Union[List[T], T]:
|
||||
"""Get the items with the given identifier.
|
||||
|
||||
This is a convenience method for getting the items with the given identifier. It is equivalent
|
||||
to :meth:`get_items`, but adds support for slicing and supports single items in the same
|
||||
functional call. Note that the return type will always be a list, even if a single item is
|
||||
requested.
|
||||
|
||||
Examples:
|
||||
Getting the items with identifiers "A1" through "E1":
|
||||
|
||||
>>> items["A1:E1"]
|
||||
|
||||
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
|
||||
|
||||
Getting the items with identifiers 0 through 4 (note that this is the same as above):
|
||||
|
||||
>>> items[range(5)]
|
||||
|
||||
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
|
||||
|
||||
Getting items with a slice (note that this is the same as above):
|
||||
|
||||
>>> items[0:5]
|
||||
|
||||
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
|
||||
|
||||
Getting a single item:
|
||||
|
||||
>>> items[0]
|
||||
|
||||
[<Item A1>]
|
||||
"""
|
||||
|
||||
if isinstance(identifier, str):
|
||||
if ":" in identifier: # multiple # TODO: deprecate this, use `"A1":"E1"` instead (slice)
|
||||
return self.get_items(identifier)
|
||||
|
||||
return self.get_item(identifier) # single
|
||||
|
||||
if isinstance(identifier, int):
|
||||
return self.get_item(identifier)
|
||||
|
||||
if isinstance(identifier, (slice, range)):
|
||||
start, stop = identifier.start, identifier.stop
|
||||
if isinstance(identifier.start, str):
|
||||
start = list(self._ordering.keys()).index(identifier.start)
|
||||
elif identifier.start is None:
|
||||
start = 0
|
||||
if isinstance(identifier.stop, str):
|
||||
stop = list(self._ordering.keys()).index(identifier.stop)
|
||||
elif identifier.stop is None:
|
||||
stop = self.num_items
|
||||
identifier = list(range(start, stop, identifier.step or 1))
|
||||
return self.get_items(identifier)
|
||||
|
||||
if isinstance(identifier, (list, tuple)):
|
||||
return self.get_items(identifier)
|
||||
|
||||
raise TypeError(f"Invalid identifier type: {type(identifier)}")
|
||||
|
||||
def get_item(self, identifier: Union[str, int, Tuple[int, int]]) -> T:
|
||||
"""Get the item with the given identifier.
|
||||
|
||||
Args:
|
||||
identifier: The identifier of the item. Either a string, an integer, or a tuple. If an
|
||||
integer, it is the index of the item in the list of items (counted from 0, top to bottom, left
|
||||
to right). If a string, it uses transposed MS Excel style notation, e.g. "A1" for the first
|
||||
item, "B1" for the item below that, etc. If a tuple, it is (row, column).
|
||||
|
||||
Raises:
|
||||
IndexError: If the identifier is out of range. The range is 0 to self.num_items-1 (inclusive).
|
||||
"""
|
||||
|
||||
if isinstance(identifier, tuple):
|
||||
row, column = identifier
|
||||
identifier = LETTERS[row] + str(column + 1) # standard transposed-Excel style notation
|
||||
if isinstance(identifier, str):
|
||||
try:
|
||||
identifier = list(self._ordering.keys()).index(identifier)
|
||||
except ValueError as e:
|
||||
raise IndexError(
|
||||
f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'."
|
||||
) from e
|
||||
|
||||
if not 0 <= identifier < self.capacity:
|
||||
raise IndexError(
|
||||
f"Item with identifier '{identifier}' does not exist on " f"resource '{self.name}'."
|
||||
)
|
||||
|
||||
# Cast child to item type. Children will always be `T`, but the type checker doesn't know that.
|
||||
return self.sites[identifier]
|
||||
|
||||
def get_items(self, identifiers: Union[str, Sequence[int], Sequence[str]]) -> List[T]:
|
||||
"""Get the items with the given identifier.
|
||||
|
||||
Args:
|
||||
identifier: Deprecated. Use `identifiers` instead. # TODO(deprecate-ordered-items)
|
||||
identifiers: The identifiers of the items. Either a string range or a list of integers. If a
|
||||
string, it uses transposed MS Excel style notation. Regions of items can be specified using
|
||||
a colon, e.g. "A1:H1" for the first column. If a list of integers, it is the indices of the
|
||||
items in the list of items (counted from 0, top to bottom, left to right).
|
||||
|
||||
Examples:
|
||||
Getting the items with identifiers "A1" through "E1":
|
||||
|
||||
>>> items.get_items("A1:E1")
|
||||
|
||||
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
|
||||
|
||||
Getting the items with identifiers 0 through 4:
|
||||
|
||||
>>> items.get_items(range(5))
|
||||
|
||||
[<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
|
||||
"""
|
||||
|
||||
if isinstance(identifiers, str):
|
||||
identifiers = pylabrobot.utils.expand_string_range(identifiers)
|
||||
return [self.get_item(i) for i in identifiers]
|
||||
|
||||
def __setitem__(self, idx: Union[int, str], resource: Optional[ResourcePLR]):
|
||||
"""Assign a resource to this carrier."""
|
||||
if resource is None: # setting to None
|
||||
assigned_resource = self[idx]
|
||||
if assigned_resource is not None:
|
||||
self.unassign_child_resource(assigned_resource)
|
||||
else:
|
||||
idx = list(self._ordering.keys()).index(idx) if isinstance(idx, str) else idx
|
||||
self.assign_resource_to_site(resource, spot=idx)
|
||||
|
||||
def __delitem__(self, idx: int):
|
||||
"""Unassign a resource from this carrier."""
|
||||
assigned_resource = self[idx]
|
||||
if assigned_resource is not None:
|
||||
self.unassign_child_resource(assigned_resource)
|
||||
|
||||
def get_resources(self) -> List[ResourcePLR]:
|
||||
"""Get all resources assigned to this carrier."""
|
||||
return [resource for resource in self.sites.values() if resource is not None]
|
||||
|
||||
def __eq__(self, other):
|
||||
return super().__eq__(other) and self.sites == other.sites
|
||||
|
||||
def get_free_sites(self) -> List[int]:
|
||||
return [spot for spot, resource in self.sites.items() if resource is None]
|
||||
|
||||
def serialize(self):
|
||||
return {
|
||||
**super().serialize(),
|
||||
"num_items_x": self.num_items_x,
|
||||
"num_items_y": self.num_items_y,
|
||||
"num_items_z": self.num_items_z,
|
||||
"sites": [{
|
||||
"label": str(identifier),
|
||||
"visible": True if self[identifier] is not None else False,
|
||||
"occupied_by": self[identifier].name
|
||||
if isinstance(self[identifier], ResourcePLR) and not isinstance(self[identifier], ResourceHolder) else
|
||||
self[identifier] if isinstance(self[identifier], str) else None,
|
||||
"position": {"x": location.x, "y": location.y, "z": location.z},
|
||||
"size": self.child_size[identifier],
|
||||
"content_type": ["bottle", "container", "tube", "bottle_carrier", "tip_rack"]
|
||||
} for identifier, location in self.child_locations.items()]
|
||||
}
|
||||
|
||||
|
||||
class BottleCarrier(ItemizedCarrier):
|
||||
"""瓶载架 - 直接继承自 TubeCarrier"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
size_x: float,
|
||||
size_y: float,
|
||||
size_z: float,
|
||||
sites: Optional[Dict[Union[int, str], ResourceHolder]] = None,
|
||||
category: str = "bottle_carrier",
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=size_x,
|
||||
size_y=size_y,
|
||||
size_z=size_z,
|
||||
sites=sites,
|
||||
category=category,
|
||||
model=model,
|
||||
)
|
||||
104
unilabos/resources/warehouse.py
Normal file
104
unilabos/resources/warehouse.py
Normal file
@@ -0,0 +1,104 @@
|
||||
from typing import Dict, Optional, List, Union
|
||||
from pylabrobot.resources import Coordinate
|
||||
from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources
|
||||
|
||||
from unilabos.resources.itemized_carrier import ItemizedCarrier, ResourcePLR
|
||||
|
||||
|
||||
def warehouse_factory(
|
||||
name: str,
|
||||
num_items_x: int = 4,
|
||||
num_items_y: int = 1,
|
||||
num_items_z: int = 4,
|
||||
dx: float = 137.0,
|
||||
dy: float = 96.0,
|
||||
dz: float = 120.0,
|
||||
item_dx: float = 10.0,
|
||||
item_dy: float = 10.0,
|
||||
item_dz: float = 10.0,
|
||||
removed_positions: Optional[List[int]] = None,
|
||||
empty: bool = False,
|
||||
category: str = "warehouse",
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
# 创建16个板架位 (4层 x 4位置)
|
||||
locations = []
|
||||
for layer in range(num_items_z): # 4层
|
||||
for row in range(num_items_y): # 4行
|
||||
for col in range(num_items_x): # 1列 (每层4x1=4个位置)
|
||||
# 计算位置
|
||||
x = dx + col * item_dx
|
||||
y = dy + (num_items_y - row - 1) * item_dy
|
||||
z = dz + (num_items_z - layer - 1) * item_dz
|
||||
locations.append(Coordinate(x, y, z))
|
||||
if removed_positions:
|
||||
locations = [loc for i, loc in enumerate(locations) if i not in removed_positions]
|
||||
sites = create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=locations,
|
||||
resource_size_x=127.0,
|
||||
resource_size_y=86.0,
|
||||
name_prefix=name,
|
||||
)
|
||||
|
||||
return WareHouse(
|
||||
name=name,
|
||||
size_x=dx + item_dx * num_items_x,
|
||||
size_y=dy + item_dy * num_items_y,
|
||||
size_z=dz + item_dz * num_items_z,
|
||||
num_items_x = num_items_x,
|
||||
num_items_y = num_items_y,
|
||||
num_items_z = num_items_z,
|
||||
# ordered_items=ordered_items,
|
||||
# ordering=ordering,
|
||||
sites=sites,
|
||||
category=category,
|
||||
model=model,
|
||||
)
|
||||
|
||||
|
||||
class WareHouse(ItemizedCarrier):
|
||||
"""堆栈载体类 - 可容纳16个板位的载体(4层x4行x1列)"""
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
size_x: float,
|
||||
size_y: float,
|
||||
size_z: float,
|
||||
num_items_x: int,
|
||||
num_items_y: int,
|
||||
num_items_z: int,
|
||||
sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
|
||||
category: str = "warehouse",
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=size_x,
|
||||
size_y=size_y,
|
||||
size_z=size_z,
|
||||
# ordered_items=ordered_items,
|
||||
# ordering=ordering,
|
||||
num_items_x=num_items_x,
|
||||
num_items_y=num_items_y,
|
||||
num_items_z=num_items_z,
|
||||
sites=sites,
|
||||
category=category,
|
||||
model=model,
|
||||
)
|
||||
|
||||
def get_site_by_layer_position(self, row: int, col: int, layer: int) -> ResourceHolder:
|
||||
if not (0 <= layer < 4 and 0 <= row < 4 and 0 <= col < 1):
|
||||
raise ValueError("无效的位置: layer={}, row={}, col={}".format(layer, row, col))
|
||||
|
||||
site_index = layer * 4 + row * 1 + col
|
||||
return self.sites[site_index]
|
||||
|
||||
def add_rack_to_position(self, row: int, col: int, layer: int, rack) -> None:
|
||||
site = self.get_site_by_layer_position(row, col, layer)
|
||||
site.assign_child_resource(rack)
|
||||
|
||||
def get_rack_at_position(self, row: int, col: int, layer: int):
|
||||
site = self.get_site_by_layer_position(row, col, layer)
|
||||
return site.resource
|
||||
Reference in New Issue
Block a user