Uni-Lab-OS/unilabos/resources/itemized_carrier.py
Xuwznln 4c8022ee95 Workstation yb merge dev ready 260113 (#216)
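* feat(bioyond): add experiment design calculation, supporting compound ratio and titration ratio parameters

* feat(bioyond): add vial measurement function with basic parameter configuration

* feat(bioyond): add measurement vial configuration supporting new device parameters

* feat(bioyond): update warehouse layout and dimensions to support vertically arranged measurement vials and reagent storage stacks

* feat(bioyond): streamline task creation so the task queue is always cleared, whether or not creation succeeds, to avoid duplicate accumulation

* feat(bioyond): add reactor temperature setting with temperature range support and exception handling

* feat(bioyond): adjust reactor position configuration and unify the coordinate format

* feat(bioyond): add scheduler start function that executes the task queue and handles exceptions

* feat(bioyond): improve scheduler start function, add exception handling, and update related configuration

* feat(opcua): improve node ID parsing compatibility and data type handling

Improve node ID parsing logic to support multiple formats, including string and numeric identifiers
Add data type conversion so that written values match the expected type
Improve error messages to ease debugging of node connection problems

* feat(registry): add device configuration file for the post-processing station

Add the YAML configuration file for the post-processing station, including action mappings, status types, and device description

* Add scheduler start function, merge material parameter configuration, and improve material parameter handling logic

* Add automatic synchronization of workflow sequences from the Bioyond system and update related configuration

* fix: stay compatible with workflow_sequence being overridden as a property in BioyondReactionStation

* fix: synchronize workflow sequence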

* feat: remove commented workflow synchronization from `reaction_station.py`.

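* Add time constraint feature and related configuration

* fix: automatically update the material cache, updating it when a material is added and removing the entry when a material is deleted

* fix: handle both string and dict return values when adding a material so the cache is updated correctly

* fix: change Bioyond error-handling reports to material-change reports; adjust logging and response messages

* feat: add experiment report simplification, removing redundant information while keeping key information

* feat: add task status event publishing to monitor and report task running, timeout, completion, and error states

* fix: fix data format error when adding materials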

* Refactor bioyond_dispensing_station and reaction_station_bioyond YAML configurations

- Removed redundant action value mappings from bioyond_dispensing_station.
- Updated goal properties in bioyond_dispensing_station to use enums for target_stack and other parameters.
- Changed data types for end_point and start_point in reaction_station_bioyond to use string enums (Start, End).
- Simplified descriptions and updated measurement units from μL to mL where applicable.
- Removed unused commands from reaction_station_bioyond to streamline the configuration.

* fix:Change the material unit from μL to mL

* fix:refresh_material_cache

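* feat: obtain workflow step IDs dynamically and improve workflow configuration

* feat: add function to clear all non-core workflows on the server

* fix: fix the serialization and deserialization methods of the Bottle class

* feat: improve material cache update logic to handle detailed information in returned data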

* Add debug log

* feat(workstation): update bioyond config migration and coin cell material search logic

- Migrate bioyond_cell config to JSON structure and remove global variable dependencies
- Implement material search confirmation dialog auto-handling
- Add documentation: 20260113_物料搜寻确认弹窗自动处理功能.md and 20260113_配置迁移修改总结.md

* Refactor module paths for Bioyond devices in YAML configuration files

- Updated the module path for BioyondDispensingStation in bioyond_dispensing_station.yaml to reflect the new directory structure.
- Updated the module path for BioyondReactionStation and BioyondReactor in reaction_station_bioyond.yaml to align with the revised organization of the codebase.

* fix: unhashable type error in WareHouse; improve parent node deduplication logic

* refactor: Move config from module to instance initialization

* fix: correct misspelled reaction_station directory name

* feat: Integrate material search logic and cleanup deprecated files

- Update coin_cell_assembly.py with material search dialog handling
- Update YB_warehouses.py with latest warehouse configurations
- Remove outdated documentation and test data files

* Refactor: Use instance attributes for action names and workflow step IDs

* refactor: Split tipbox storage into left and right warehouses

* refactor: Merge tipbox storage left and right into single warehouse

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Andy6M <xieqiming1132@qq.com>
2026-01-17 15:44:18 +08:00


"""
Automated Liquid Handling Station Resource Classes - Simplified Version
"""
from __future__ import annotations
from typing import Dict, List, Optional, TypeVar, Union, Sequence, Tuple
import pylabrobot
from pylabrobot.resources import Resource as ResourcePLR
from pylabrobot.resources import Well, ResourceHolder
from pylabrobot.resources.coordinate import Coordinate
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"


class Bottle(Well):
    """Bottle class - simplified version that does not track the cap."""

    def __init__(
        self,
        name: str,
        diameter: float,
        height: float,
        max_volume: float,
        size_x: float = 0.0,
        size_y: float = 0.0,
        size_z: float = 0.0,
        barcode: Optional[str] = None,
        category: str = "container",
        model: Optional[str] = None,
        **kwargs,
    ):
        # size_x, size_y, size_z and **kwargs are accepted but not used here;
        # the footprint passed to Well is derived from diameter and height.
        super().__init__(
            name=name,
            size_x=diameter,
            size_y=diameter,
            size_z=height,
            max_volume=max_volume,
            category=category,
            model=model,
            bottom_type="flat",
            cross_section_type="circle",
        )
        self.diameter = diameter
        self.height = height
        self.barcode = barcode

    def serialize(self) -> dict:
        # Pylabrobot expects barcode to be an object with serialize(), but here it is a str.
        # We temporarily unset it to avoid AttributeError in super().serialize().
        _barcode = self.barcode
        self.barcode = None
        try:
            data = super().serialize()
        finally:
            self.barcode = _barcode
        return {
            **data,
            "diameter": self.diameter,
            "height": self.height,
        }

    @classmethod
    def deserialize(cls, data: dict, allow_marshal: bool = False):
        # Extract barcode before calling parent deserialize to avoid type error
        barcode_data = data.pop("barcode", None)
        # Call parent deserialize
        instance = super(Bottle, cls).deserialize(data, allow_marshal=allow_marshal)
        # Set barcode as string (not as Barcode object)
        if barcode_data:
            if isinstance(barcode_data, str):
                instance.barcode = barcode_data
            elif isinstance(barcode_data, dict):
                # If it's a dict (Barcode serialized format), extract the data field
                instance.barcode = barcode_data.get("data", "")
            else:
                instance.barcode = ""
        # Set additional attributes
        instance.diameter = data.get("diameter", instance._size_x)
        instance.height = data.get("height", instance._size_z)
        return instance
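
The barcode branches in `Bottle.deserialize` can be exercised without pylabrobot. The sketch below is a standalone approximation: `normalize_barcode` is a hypothetical helper that does not exist in this file, and it only mirrors the branching shown above (string kept as-is, dict reduced to its "data" field, anything else mapped to an empty string).

```python
from typing import Any, Optional


def normalize_barcode(barcode_data: Any) -> Optional[str]:
    """Hypothetical helper mirroring the barcode branches in Bottle.deserialize."""
    if not barcode_data:
        # Missing or empty input: no barcode is set.
        return None
    if isinstance(barcode_data, str):
        return barcode_data
    if isinstance(barcode_data, dict):
        # A dict is assumed to be a serialized barcode with the text under "data".
        return barcode_data.get("data", "")
    # Any other type is replaced by an empty string, as in the method above.
    return ""


print(normalize_barcode("ABC123"))
print(normalize_barcode({"data": "XYZ"}))
print(normalize_barcode(42))
```

Keeping the barcode as a plain string is also why `Bottle.serialize` unsets it temporarily before delegating to the parent class.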


T = TypeVar("T", bound=ResourceHolder)
S = TypeVar("S", bound=ResourceHolder)


class ItemizedCarrier(ResourcePLR):
    """Base class for all carriers."""

    def __init__(
        self,
        name: str,
        size_x: float,
        size_y: float,
        size_z: float,
        num_items_x: int = 0,
        num_items_y: int = 0,
        num_items_z: int = 0,
        layout: str = "x-y",
        sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
        category: Optional[str] = "carrier",
        model: Optional[str] = None,
        invisible_slots: Optional[List[str]] = None,
    ):
        super().__init__(
            name=name,
            size_x=size_x,
            size_y=size_y,
            size_z=size_z,
            category=category,
            model=model,
        )
        # Treat a missing sites argument as an empty carrier instead of failing on len(None).
        if sites is None:
            sites = {}
        self.num_items = len(sites)
        self.num_items_x, self.num_items_y, self.num_items_z = num_items_x, num_items_y, num_items_z
        self.invisible_slots = [] if invisible_slots is None else invisible_slots
        # The layout is derived from the item counts; the `layout` argument is not used.
        if self.num_items_z > 1 and self.num_items_x == 1:
            self.layout = "z-y"
        elif self.num_items_z > 1 and self.num_items_y == 1:
            self.layout = "x-z"
        else:
            self.layout = "x-y"
        if isinstance(sites, dict):
            self.sites: List[Optional[ResourcePLR]] = list(sites.values())
            self._ordering = sites
            self.child_locations: Dict[str, Coordinate] = {}
            self.child_size: Dict[str, dict] = {}
            for spot, resource in sites.items():
                if resource is not None and getattr(resource, "location", None) is None:
                    raise ValueError(f"resource {resource} has no location")
                if resource is not None:
                    self.child_locations[spot] = resource.location
                    self.child_size[spot] = {
                        "width": resource._size_x,
                        "height": resource._size_y,
                        "depth": resource._size_z,
                    }
                else:
                    self.child_locations[spot] = Coordinate.zero()
                    self.child_size[spot] = {"width": 0, "height": 0, "depth": 0}
        elif isinstance(sites, list):
            # Deserialization takes this path; children still need to be indexed via self.sites.
            self.child_locations = {site["label"]: Coordinate(**site["position"]) for site in sites}
            self.child_size = {site["label"]: site["size"] for site in sites}
            self.sites = [site["occupied_by"] for site in sites]
            self._ordering = {site["label"]: site["position"] for site in sites}
        else:
            print("sites:", sites)

    @property
    def capacity(self):
        """The number of sites on this carrier."""
        return len(self.sites)

    def __len__(self) -> int:
        """Return the number of sites on this carrier."""
        return len(self.sites)

    def assign_child_resource(
        self,
        resource: ResourcePLR,
        location: Optional[Coordinate],
        reassign: bool = True,
        spot: Optional[int] = None,
    ):
        idx = spot
        # If only a location is given, use the coordinates and the names held in self.sites
        # after deserialization to find the site where the resource belongs.
        if idx is None:
            site_locations = list(self.child_locations.values())
            for i, site in enumerate(self.sites):
                if isinstance(site, str) and site == resource.name:
                    idx = i
                    break
                if site_locations[i] == location:
                    idx = i
                    break
        if idx is None:
            # Without this check, self.sites[None] below would raise an unhelpful TypeError.
            raise ValueError(f"no site found for resource {resource} at location {location}")
        if not reassign and self.sites[idx] is not None:
            raise ValueError(f"a site with index {idx} already exists")
        location = list(self.child_locations.values())[idx]
        super().assign_child_resource(resource, location=location, reassign=reassign)
        self.sites[idx] = resource

    def assign_resource_to_site(self, resource: ResourcePLR, spot: int):
        if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder):
            raise ValueError(f"spot {spot} already has a resource, {resource}")
        self.assign_child_resource(
            resource,
            location=self.child_locations.get(list(self._ordering.keys())[spot]),
            spot=spot,
        )

    def unassign_child_resource(self, resource: ResourcePLR):
        found = False
        for spot, res in enumerate(self.sites):
            if res == resource:
                self.sites[spot] = None
                found = True
                break
        if not found:
            raise ValueError(f"Resource {resource} is not assigned to this carrier")
        super().unassign_child_resource(resource)
        # if hasattr(resource, "unassign"):
        #     resource.unassign()

    def get_child_identifier(self, child: ResourcePLR):
        """Get the identifier information for a given child resource.

        Args:
            child: The Resource object to find the identifier for

        Returns:
            dict: A dictionary containing:
                - identifier: The string identifier (e.g. "A1", "B2")
                - idx: The integer index in the sites list
                - x: The x index (column index, 0-based)
                - y: The y index (row index, 0-based)
                - z: The z index (layer index, 0-based)

        Raises:
            ValueError: If the child resource is not found in this carrier
        """
        # Find the child resource in sites
        for idx, resource in enumerate(self.sites):
            if resource is child:
                # Get the identifier from ordering keys
                identifier = list(self._ordering.keys())[idx]
                # Parse identifier to get x, y, z indices
                x_idx, y_idx, z_idx = self._parse_identifier_to_indices(identifier, idx)
                return {
                    "identifier": identifier,
                    "idx": idx,
                    "x": x_idx,
                    "y": y_idx,
                    "z": z_idx,
                }
        # If not found, raise an error
        raise ValueError(f"Resource {child} is not assigned to this carrier")

    def _parse_identifier_to_indices(self, identifier: str, idx: int) -> Tuple[int, int, int]:
        """Parse identifier string to get x, y, z indices.

        Args:
            identifier: String identifier like "A1", "B2", etc.
            idx: Linear index as fallback for calculation

        Returns:
            Tuple of (x_idx, y_idx, z_idx)
        """
        # If we have explicit dimensions, calculate from idx
        if self.num_items_x > 0 and self.num_items_y > 0:
            # Calculate 3D indices from linear index
            z_idx = idx // (self.num_items_x * self.num_items_y) if self.num_items_z > 0 else 0
            remaining = idx % (self.num_items_x * self.num_items_y)
            y_idx = remaining // self.num_items_x
            x_idx = remaining % self.num_items_x
            return x_idx, y_idx, z_idx
        # Fallback: parse from Excel-style identifier
        if isinstance(identifier, str) and len(identifier) >= 2:
            # Extract row (letter) and column (number)
            row_letters = ""
            col_numbers = ""
            for char in identifier:
                if char.isalpha():
                    row_letters += char
                elif char.isdigit():
                    col_numbers += char
            if row_letters and col_numbers:
                # Convert letter(s) to a 0-based row index (A=0, B=1, ..., Z=25, AA=26).
                # Each letter counts as 1..26 so that "AA" does not collapse onto "A".
                y_idx = 0
                for char in row_letters:
                    y_idx = y_idx * 26 + (ord(char.upper()) - ord("A") + 1)
                y_idx -= 1
                # Convert number to column index (1-based to 0-based)
                x_idx = int(col_numbers) - 1
                z_idx = 0  # Default layer
                return x_idx, y_idx, z_idx
        # If all else fails, assume linear arrangement
        return idx, 0, 0

    def __getitem__(
        self,
        identifier: Union[str, int, Sequence[int], Sequence[str], slice, range],
    ) -> Union[List[T], T]:
        """Get the items with the given identifier.

        This is a convenience method for getting the items with the given identifier. It wraps
        :meth:`get_item` and :meth:`get_items`, and adds support for slicing. A single string or
        integer identifier returns one item; string ranges, slices, ranges, and sequences return
        a list.

        Examples:
            Getting the items with identifiers "A1" through "E1":

                >>> items["A1:E1"]
                [<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]

            Getting the items with identifiers 0 through 4 (note that this is the same as above):

                >>> items[range(5)]
                [<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]

            Getting items with a slice (note that this is the same as above):

                >>> items[0:5]
                [<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]

            Getting a single item:

                >>> items[0]
                <Item A1>
        """
        if isinstance(identifier, str):
            if ":" in identifier:  # multiple # TODO: deprecate this, use `"A1":"E1"` instead (slice)
                return self.get_items(identifier)
            return self.get_item(identifier)  # single
        if isinstance(identifier, int):
            return self.get_item(identifier)
        if isinstance(identifier, (slice, range)):
            start, stop = identifier.start, identifier.stop
            if isinstance(identifier.start, str):
                start = list(self._ordering.keys()).index(identifier.start)
            elif identifier.start is None:
                start = 0
            if isinstance(identifier.stop, str):
                stop = list(self._ordering.keys()).index(identifier.stop)
            elif identifier.stop is None:
                stop = self.num_items
            identifier = list(range(start, stop, identifier.step or 1))
            return self.get_items(identifier)
        if isinstance(identifier, (list, tuple)):
            return self.get_items(identifier)
        raise TypeError(f"Invalid identifier type: {type(identifier)}")

    def get_item(self, identifier: Union[str, int, Tuple[int, int]]) -> T:
        """Get the item with the given identifier.

        Args:
            identifier: The identifier of the item. Either a string, an integer, or a tuple. If an
                integer, it is the index of the item in the list of items (counted from 0, top to
                bottom, left to right). If a string, it uses transposed MS Excel style notation,
                e.g. "A1" for the first item, "B1" for the item below that, etc. If a tuple, it is
                (row, column).

        Raises:
            IndexError: If the identifier is out of range. The range is 0 to self.num_items-1
                (inclusive).
        """
        if isinstance(identifier, tuple):
            row, column = identifier
            identifier = LETTERS[row] + str(column + 1)  # standard transposed-Excel style notation
        if isinstance(identifier, str):
            try:
                identifier = list(self._ordering.keys()).index(identifier)
            except ValueError as e:
                raise IndexError(
                    f"Item with identifier '{identifier}' does not exist on resource '{self.name}'."
                ) from e
        if not 0 <= identifier < self.capacity:
            raise IndexError(
                f"Item with identifier '{identifier}' does not exist on resource '{self.name}'."
            )
        # Cast child to item type. Children will always be `T`, but the type checker doesn't know that.
        return self.sites[identifier]

    def get_items(self, identifiers: Union[str, Sequence[int], Sequence[str]]) -> List[T]:
        """Get the items with the given identifiers.

        Args:
            identifiers: The identifiers of the items. Either a string range or a list of integers.
                If a string, it uses transposed MS Excel style notation. Regions of items can be
                specified using a colon, e.g. "A1:H1" for the first column. If a list of integers,
                it is the indices of the items in the list of items (counted from 0, top to bottom,
                left to right).

        Examples:
            Getting the items with identifiers "A1" through "E1":

                >>> items.get_items("A1:E1")
                [<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]

            Getting the items with identifiers 0 through 4:

                >>> items.get_items(range(5))
                [<Item A1>, <Item B1>, <Item C1>, <Item D1>, <Item E1>]
        """
        if isinstance(identifiers, str):
            identifiers = pylabrobot.utils.expand_string_range(identifiers)
        return [self.get_item(i) for i in identifiers]

    def __setitem__(self, idx: Union[int, str], resource: Optional[ResourcePLR]):
        """Assign a resource to this carrier."""
        if resource is None:  # setting to None
            assigned_resource = self[idx]
            if assigned_resource is not None:
                self.unassign_child_resource(assigned_resource)
        else:
            idx = list(self._ordering.keys()).index(idx) if isinstance(idx, str) else idx
            self.assign_resource_to_site(resource, spot=idx)

    def __delitem__(self, idx: int):
        """Unassign a resource from this carrier."""
        assigned_resource = self[idx]
        if assigned_resource is not None:
            self.unassign_child_resource(assigned_resource)

    def get_resources(self) -> List[ResourcePLR]:
        """Get all resources assigned to this carrier."""
        # self.sites is a list, so iterate it directly (it has no .values()).
        return [resource for resource in self.sites if resource is not None]

    def __eq__(self, other):
        return super().__eq__(other) and self.sites == other.sites

    def get_free_sites(self) -> List[int]:
        # self.sites is a list, so use enumerate to get the site indices (it has no .items()).
        return [spot for spot, resource in enumerate(self.sites) if resource is None]

    def serialize(self):
        return {
            **super().serialize(),
            "num_items_x": self.num_items_x,
            "num_items_y": self.num_items_y,
            "num_items_z": self.num_items_z,
            "layout": self.layout,
            "sites": [{
                "label": str(identifier),
                "visible": identifier not in self.invisible_slots,
                "occupied_by": self[identifier].name
                if isinstance(self[identifier], ResourcePLR) and not isinstance(self[identifier], ResourceHolder) else
                self[identifier] if isinstance(self[identifier], str) else None,
                "position": {"x": location.x, "y": location.y, "z": location.z},
                "size": self.child_size[identifier],
                "content_type": ["bottle", "container", "tube", "bottle_carrier", "tip_rack"],
            } for identifier, location in self.child_locations.items()]
        }
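
The index arithmetic in `_parse_identifier_to_indices` is easier to follow in isolation. The following is a minimal standalone sketch, not code from this file: `linear_to_xyz` and `label_to_xy` are hypothetical names, the first assumes sites are numbered with x varying fastest, then y, then z (as the method's primary branch does), and the second handles only single-letter rows.

```python
from typing import Tuple


def linear_to_xyz(idx: int, num_items_x: int, num_items_y: int) -> Tuple[int, int, int]:
    """Split a linear site index into (x, y, z); x varies fastest, then y, then z."""
    per_layer = num_items_x * num_items_y
    z_idx, remaining = divmod(idx, per_layer)
    y_idx, x_idx = divmod(remaining, num_items_x)
    return x_idx, y_idx, z_idx


def label_to_xy(label: str) -> Tuple[int, int]:
    """Parse a label such as "B3" into zero-based (x, y): letter is the row, number the column."""
    letters = "".join(c for c in label if c.isalpha()).upper()
    digits = "".join(c for c in label if c.isdigit())
    if len(letters) != 1 or not digits:
        raise ValueError(f"unsupported label: {label!r}")
    return int(digits) - 1, ord(letters) - ord("A")


# A 3 x 2 grid has 6 sites per layer, so index 7 is the second site of the second layer.
print(linear_to_xyz(7, num_items_x=3, num_items_y=2))
print(label_to_xy("B3"))
```

The method itself prefers the arithmetic branch whenever `num_items_x` and `num_items_y` are both positive, and only falls back to parsing the label otherwise.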


class BottleCarrier(ItemizedCarrier):
    """Bottle carrier - inherits directly from ItemizedCarrier."""

    def __init__(
        self,
        name: str,
        size_x: float,
        size_y: float,
        size_z: float,
        sites: Optional[Dict[Union[int, str], ResourceHolder]] = None,
        category: str = "bottle_carrier",
        model: Optional[str] = None,
        invisible_slots: Optional[List[str]] = None,
        **kwargs,
    ):
        super().__init__(
            name=name,
            size_x=size_x,
            size_y=size_y,
            size_z=size_z,
            sites=sites,
            category=category,
            model=model,
            invisible_slots=invisible_slots,
        )
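
The slice handling in `ItemizedCarrier.__getitem__` accepts either site labels or integers as endpoints. As a rough illustration, assuming a hypothetical standalone helper `resolve_slice` and a plain list of labels in place of `self._ordering`, the conversion to a list of indices could look like this:

```python
from typing import List


def resolve_slice(labels: List[str], key: slice) -> List[int]:
    """Turn a slice with label or integer endpoints into a list of site indices."""
    start, stop = key.start, key.stop
    if isinstance(start, str):
        start = labels.index(start)
    elif start is None:
        start = 0
    if isinstance(stop, str):
        stop = labels.index(stop)
    elif stop is None:
        stop = len(labels)
    return list(range(start, stop, key.step or 1))


labels = ["A1", "B1", "C1", "D1", "E1"]
print(resolve_slice(labels, slice("B1", "D1")))
print(resolve_slice(labels, slice(None, 3)))
```

As in the method itself, the stop endpoint is exclusive, so `slice("B1", "D1")` covers B1 and C1 but not D1.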