This commit is contained in:
Xianwei Qi
2025-12-26 16:25:19 +08:00
88 changed files with 83331 additions and 15223 deletions

View File

@@ -153,7 +153,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend):
if self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED:
print("已有枪头,无需重复拾取")
return
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=100)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height,speed=100)
# self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "pick",channels=use_channels)
# goback()
@@ -202,7 +202,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend):
if self.hardware_interface.tip_status == TipStatus.NO_TIP:
print("无枪头,无需丢弃")
return
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.hardware_interface.eject_tip
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height)
@@ -267,7 +267,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend):
return
# 移动到吸液位置
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate)
@@ -340,7 +340,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend):
# 移动到排液位置
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate)

View File

@@ -128,6 +128,7 @@ class PipetteController:
baudrate=115200
)
self.pipette = SOPAPipette(self.config)
self.pipette_port = port
self.tip_status = TipStatus.NO_TIP
self.current_volume = 0.0
self.max_volume = 1000.0 # 默认1000ul
@@ -154,7 +155,7 @@ class PipetteController:
logger.info("移液器连接成功")
# 连接XYZ步进电机控制器如果提供了端口
if self.xyz_port:
if self.xyz_port != self.pipette_port:
try:
self.xyz_controller = XYZController(self.xyz_port)
if self.xyz_controller.connect():
@@ -168,7 +169,12 @@ class PipetteController:
self.xyz_controller = None
self.xyz_connected = False
else:
logger.info("未配置XYZ步进电机端口跳过运动控制器连接")
try:
self.xyz_controller = XYZController(self.xyz_port, auto_connect=False)
self.xyz_controller.serial_conn = self.pipette.serial_port
self.xyz_controller.is_connected = True
except Exception as e:
logger.info("未配置XYZ步进电机端口跳过运动控制器连接")
return True
except Exception as e:

View File

@@ -6,6 +6,7 @@ import traceback
from collections import Counter
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from typing_extensions import TypedDict
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
from unilabos.devices.liquid_handling.laiyu.backend.laiyu_v_backend import UniLiquidHandlerLaiyuBackend
@@ -28,12 +29,15 @@ from pylabrobot.resources import (
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class SimpleReturn(TypedDict):
samples: list
volumes: list
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs):
self._simulator = simulator
self.channel_num = channel_num
self.pending_liquids_dict = {}
joint_config = kwargs.get("joint_config", None)
if simulator:
if joint_config:
@@ -131,7 +135,9 @@ class LiquidHandlerMiddleware(LiquidHandler):
return await self._simulate_handler.drop_tips(
tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs
)
return await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
self.pending_liquids_dict = {}
return
async def return_tips(
self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs
@@ -154,8 +160,10 @@ class LiquidHandlerMiddleware(LiquidHandler):
offsets = [Coordinate.zero()] * len(use_channels)
if self._simulator:
return await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
return await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
self.pending_liquids_dict = {}
return
def _check_containers(self, resources: Sequence[Resource]):
super()._check_containers(resources)
@@ -171,6 +179,8 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
if self._simulator:
return await self._simulate_handler.aspirate(
resources,
@@ -183,7 +193,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
return await super().aspirate(
await super().aspirate(
resources,
vols,
use_channels,
@@ -195,6 +205,18 @@ class LiquidHandlerMiddleware(LiquidHandler):
**backend_kwargs,
)
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)})
res_volumes.append(volume)
self.pending_liquids_dict[channel] = {
"sample_uuid": resource.unilabos_extra.get("sample_uuid", None),
"volume": volume
}
return SimpleReturn(samples=res_samples, volumes=res_volumes)
async def dispense(
self,
resources: Sequence[Container],
@@ -206,7 +228,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
) -> SimpleReturn:
if self._simulator:
return await self._simulate_handler.dispense(
resources,
@@ -219,7 +241,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
return await super().dispense(
await super().dispense(
resources,
vols,
use_channels,
@@ -229,7 +251,17 @@ class LiquidHandlerMiddleware(LiquidHandler):
blow_out_air_volume,
**backend_kwargs,
)
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
res_uuid = self.pending_liquids_dict[channel]["sample_uuid"]
self.pending_liquids_dict[channel]["volume"] -= volume
resource.unilabos_extra["sample_uuid"] = res_uuid
res_samples.append({"name": resource.name, "sample_uuid": res_uuid})
res_volumes.append(volume)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
async def transfer(
self,
source: Well,
@@ -549,25 +581,66 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
support_touch_tip = True
_ros_node: BaseROS2DeviceNode
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8, total_height:float = 310):
"""Initialize a LiquidHandler.
Args:
backend: Backend to use.
deck: Deck to use.
"""
backend_type = None
if isinstance(backend, dict) and "type" in backend:
backend_dict = backend.copy()
type_str = backend_dict.pop("type")
try:
# Try to get class from string using globals (current module), or fallback to pylabrobot or unilabos namespaces
backend_cls = None
if type_str in globals():
backend_cls = globals()[type_str]
else:
# Try resolving dotted notation, e.g. "xxx.yyy.ClassName"
components = type_str.split(".")
mod = None
if len(components) > 1:
module_name = ".".join(components[:-1])
try:
import importlib
mod = importlib.import_module(module_name)
except ImportError:
mod = None
if mod is not None:
backend_cls = getattr(mod, components[-1], None)
if backend_cls is None:
# Try pylabrobot style import (if available)
try:
import pylabrobot
backend_cls = getattr(pylabrobot, type_str, None)
except Exception:
backend_cls = None
if backend_cls is not None and isinstance(backend_cls, type):
backend_type = backend_cls(**backend_dict) # pass the rest of dict as kwargs
except Exception as exc:
raise RuntimeError(f"Failed to convert backend type '{type_str}' to class: {exc}")
else:
backend_type = backend
self._simulator = simulator
self.group_info = dict()
super().__init__(backend, deck, simulator, channel_num)
super().__init__(backend_type, deck, simulator, channel_num)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
@classmethod
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn:
"""Set the liquid in a well."""
res_samples = []
res_volumes = []
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
res_samples.append({"name": well.name, "sample_uuid": well.unilabos_extra.get("sample_uuid", None)})
res_volumes.append(volume)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
# ---------------------------------------------------------------
# REMOVE LIQUID --------------------------------------------------
# ---------------------------------------------------------------

View File

@@ -1,12 +1,14 @@
import asyncio
import collections
from collections import OrderedDict
import contextlib
import json
import os
import socket
import time
import uuid
from typing import Any, List, Dict, Optional, OrderedDict, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling.standard import GripDirection
from pylabrobot.liquid_handling import (
LiquidHandlerBackend,
@@ -28,9 +30,9 @@ from pylabrobot.liquid_handling.standard import (
ResourceMove,
ResourceDrop,
)
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, TubeRack, PlateAdapter
from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, PlateAdapter, TubeRack
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -69,7 +71,35 @@ class PRCXI9300Deck(Deck):
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位
class PRCXI9300Container(Plate):
"""PRCXI 9300 的专用 Container 类,继承自 Plate用于槽位定位和未知模块。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
category: str,
ordering: collections.OrderedDict,
model: Optional[str] = None,
**kwargs,
):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state()
data.update(self._unilabos_state)
return data
class PRCXI9300Plate(Plate):
"""
专用孔板类:
@@ -83,11 +113,43 @@ class PRCXI9300Plate(Plate):
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
items = ordered_items if ordered_items is not None else ordering
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
# 如果 ordered_items 不为 None直接使用
if ordered_items is not None:
items = ordered_items
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 Plate 自己创建 Well 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 Plate 自己创建 Well 对象
items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
items = ordering
ordering_param = None
else:
items = None
ordering_param = None
# 根据情况传递不同的参数
if items is not None:
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
elif ordering_param is not None:
# 传递 ordering 参数,让 Plate 自己创建 Well 对象
super().__init__(name, size_x, size_y, size_z,
ordering=ordering_param,
category=category,
model=model, **kwargs)
else:
super().__init__(name, size_x, size_y, size_z,
category=category,
model=model, **kwargs)
self._unilabos_state = {}
if material_info:
@@ -124,8 +186,7 @@ class PRCXI9300Plate(Plate):
safe_state[k] = v
data.update(safe_state)
return data
return data # 其他顶层属性也进行类型检查
class PRCXI9300TipRack(TipRack):
""" 专用吸头盒类 """
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
@@ -135,11 +196,43 @@ class PRCXI9300TipRack(TipRack):
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
items = ordered_items if ordered_items is not None else ordering
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
# 如果 ordered_items 不为 None直接使用
if ordered_items is not None:
items = ordered_items
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象
items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
items = ordering
ordering_param = None
else:
items = None
ordering_param = None
# 根据情况传递不同的参数
if items is not None:
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
elif ordering_param is not None:
# 传递 ordering 参数,让 TipRack 自己创建 Tip 对象
super().__init__(name, size_x, size_y, size_z,
ordering=ordering_param,
category=category,
model=model, **kwargs)
else:
super().__init__(name, size_x, size_y, size_z,
category=category,
model=model, **kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
@@ -235,16 +328,53 @@ class PRCXI9300TubeRack(TubeRack):
category: str = "tube_rack",
items: Optional[Dict[str, Any]] = None,
ordered_items: Optional[OrderedDict] = None,
ordering: Optional[OrderedDict] = None,
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
# 兼容处理PLR 的 TubeRack 构造函数可能接受 items 或 ordered_items
items_to_pass = items if items is not None else ordered_items
super().__init__(name, size_x, size_y, size_z,
ordered_items=ordered_items,
model=model,
**kwargs)
# 如果 ordered_items 不为 None直接使用
if ordered_items is not None:
items_to_pass = ordered_items
ordering_param = None
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
items_to_pass = ordering
ordering_param = None
elif items is not None:
# 兼容旧的 items 参数
items_to_pass = items
ordering_param = None
else:
items_to_pass = None
ordering_param = None
# 根据情况传递不同的参数
if items_to_pass is not None:
super().__init__(name, size_x, size_y, size_z,
ordered_items=items_to_pass,
model=model,
**kwargs)
elif ordering_param is not None:
# 传递 ordering 参数,让 TubeRack 自己创建 Tube 对象
super().__init__(name, size_x, size_y, size_z,
ordering=ordering_param,
model=model,
**kwargs)
else:
super().__init__(name, size_x, size_y, size_z,
model=model,
**kwargs)
self._unilabos_state = {}
if material_info:
@@ -375,16 +505,12 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info = []
count = 0
for child in deck.children:
child_state = getattr(child, "_unilabos_state", {})
if "Material" in child_state:
count += 1
tablets_info.append(
WorkTablets(
Number=count,
Code=f"T{count}",
Material=child_state["Material"]
if child.children:
if "Material" in child.children[0]._unilabos_state:
number = int(child.name.replace("T", ""))
tablets_info.append(
WorkTablets(Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"])
)
)
if is_9320:
print("当前设备是9320")
# 始终初始化 step_mode 属性
@@ -403,7 +529,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
super().post_init(ros_node)
self._unilabos_backend.post_init(ros_node)
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn:
return super().set_liquid(wells, liquid_names, volumes)
def set_group(self, group_name: str, wells: List[Well], volumes: List[float]):
@@ -660,6 +786,37 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
return await super().move_to(well, dis_to_top, channel)
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
return await self._unilabos_backend.shaker_action(time, module_no, amplitude, is_wait)
async def heater_action(self, temperature: float, time: int):
return await self._unilabos_backend.heater_action(temperature, time)
async def move_plate(
self,
plate: Plate,
to: Resource,
intermediate_locations: Optional[List[Coordinate]] = None,
pickup_offset: Coordinate = Coordinate.zero(),
destination_offset: Coordinate = Coordinate.zero(),
drop_direction: GripDirection = GripDirection.FRONT,
pickup_direction: GripDirection = GripDirection.FRONT,
pickup_distance_from_top: float = 13.2 - 3.33,
**backend_kwargs,
):
return await super().move_plate(
plate,
to,
intermediate_locations,
pickup_offset,
destination_offset,
drop_direction,
pickup_direction,
pickup_distance_from_top,
target_plate_number = to,
**backend_kwargs,
)
class PRCXI9300Backend(LiquidHandlerBackend):
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
@@ -700,6 +857,55 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._num_channels = channel_num
self._execute_setup = setup
self.debug = debug
self.axis = "Left"
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action(
time=time,
module_no=module_no,
amplitude=amplitude,
is_wait=is_wait,
)
self.steps_todo_list.append(step)
return step
async def pick_up_resource(self, pickup: ResourcePickup, **backend_kwargs):
resource=pickup.resource
offset=pickup.offset
pickup_distance_from_top=pickup.pickup_distance_from_top
direction=pickup.direction
plate_number = int(resource.parent.name.replace("T", ""))
is_whole_plate = True
balance_height = 0
step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
return step
async def drop_resource(self, drop: ResourceDrop, **backend_kwargs):
plate_number = None
target_plate_number = backend_kwargs.get("target_plate_number", None)
if target_plate_number is not None:
plate_number = int(target_plate_number.name.replace("T", ""))
is_whole_plate = True
balance_height = 0
if plate_number is None:
raise ValueError("target_plate_number is required when dropping a resource")
step = self.api_client.clamp_jaw_drop(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
return step
async def heater_action(self, temperature: float, time: int):
print(f"\n\nHeater action: temperature={temperature}, time={time}\n\n")
# return await self.api_client.heater_action(temperature, time)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
@@ -731,7 +937,11 @@ class PRCXI9300Backend(LiquidHandlerBackend):
print(f"PRCXI9300Backend created solution with ID: {solution_id}")
self.api_client.load_solution(solution_id)
print(json.dumps(self.steps_todo_list, indent=2))
return self.api_client.start()
if not self.api_client.start():
return False
if not self.api_client.wait_for_finish():
return False
return True
@classmethod
def check_channels(cls, use_channels: List[int]) -> List[int]:
@@ -753,7 +963,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
# 清除错误代码
self.api_client.clear_error_code()
print("PRCXI9300 error code cleared.")
self.api_client.call("IAutomation", "Stop")
# 执行重置
print("Starting PRCXI9300 reset...")
self.api_client.call("IAutomation", "Reset")
@@ -777,12 +987,23 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
# INSERT_YOUR_CODE
# Ensure use_channels is converted to a list of ints if it's an array
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
@@ -807,6 +1028,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Load(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -821,13 +1043,23 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
# 检查trash #
if ops[0].resource.name == "trash":
PlateNo = ops[0].resource.parent.children.index(ops[0].resource) + 1
PlateNo = ops[0].resource.parent.parent.children.index(ops[0].resource.parent) + 1
step = self.api_client.UnLoad(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -845,8 +1077,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
raise ValueError(
@@ -870,6 +1102,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.UnLoad(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -893,12 +1126,12 @@ class PRCXI9300Backend(LiquidHandlerBackend):
none_keys: List[str] = [],
):
"""Mix liquid in the specified resources."""
plate_indexes = []
for op in targets:
deck = op.parent.parent
deck = op.parent.parent.parent
plate = op.parent
plate_index = deck.children.index(plate)
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -936,12 +1169,21 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None):
"""Aspirate liquid from the specified resources."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -969,6 +1211,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Imbibing(
axis=axis,
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
@@ -983,12 +1226,21 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int] = None):
"""Dispense liquid into the specified resources."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -1017,6 +1269,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Tapping(
axis=axis,
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
@@ -1041,14 +1294,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def dispense96(self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]):
raise NotImplementedError("The Opentrons backend does not support the 96 head.")
async def pick_up_resource(self, pickup: ResourcePickup):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def move_picked_up_resource(self, move: ResourceMove):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def drop_resource(self, drop: ResourceDrop):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
pass
def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool:
return True # PRCXI9300Backend does not have tip compatibility issues
@@ -1139,6 +1386,28 @@ class PRCXI9300Api:
def start(self) -> bool:
return self.call("IAutomation", "Start")
def wait_for_finish(self) -> bool:
success = False
start = False
while not success:
status = self.step_state_list()
if len(status) == 1:
start = True
if status is None:
break
if len(status) == 0:
break
if status[-1]["State"] == 2 and start:
success = True
elif status[-1]["State"] > 2:
break
elif status[-1]["State"] == 0:
start = True
else:
time.sleep(1)
return success
def call(self, service: str, method: str, params: Optional[list] = None) -> Any:
payload = json.dumps(
{"ServiceName": service, "MethodName": method, "Paramters": params or []}, separators=(",", ":")
@@ -1225,9 +1494,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Load",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1263,9 +1533,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Imbibing",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1301,9 +1572,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Tapping",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1339,9 +1611,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Blending",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1377,9 +1650,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "UnLoad",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1398,6 +1672,50 @@ class PRCXI9300Api:
"LiquidDispensingMethod": liquid_method,
}
def clamp_jaw_pick_up(self,
plate_no: int,
is_whole_plate: bool,
balance_height: int,
) -> Dict[str, Any]:
return {
"StepAxis": "ClampingJaw",
"Function": "DefectiveLift",
"PlateNo": plate_no,
"IsWholePlate": is_whole_plate,
"HoleRow": 1,
"HoleCol": 1,
"BalanceHeight": balance_height,
"PlateOrHoleNum": f"T{plate_no}"
}
def clamp_jaw_drop(
self,
plate_no: int,
is_whole_plate: bool,
balance_height: int,
) -> Dict[str, Any]:
return {
"StepAxis": "ClampingJaw",
"Function": "PutDown",
"PlateNo": plate_no,
"IsWholePlate": is_whole_plate,
"HoleRow": 1,
"HoleCol": 1,
"BalanceHeight": balance_height,
"PlateOrHoleNum": f"T{plate_no}"
}
def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
return {
"StepAxis": "Left",
"Function": "Shaking",
"AssistFun1": time,
"AssistFun2": module_no,
"AssistFun3": amplitude,
"AssistFun4": is_wait,
}
class DefaultLayout: