夹爪添加

This commit is contained in:
zhangshixiang
2025-12-23 14:54:36 +08:00
parent 44fc80c70f
commit 3ad20c85a5
3 changed files with 41149 additions and 35 deletions

View File

@@ -6,6 +6,7 @@ import os
import socket
import time
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling.standard import GripDirection
from pylabrobot.liquid_handling import (
LiquidHandlerBackend,
@@ -27,7 +28,7 @@ from pylabrobot.liquid_handling.standard import (
ResourceMove,
ResourceDrop,
)
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -153,11 +154,12 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info = []
count = 0
for child in deck.children:
if "Material" in child._unilabos_state:
count += 1
tablets_info.append(
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
)
if child.children:
if "Material" in child.children[0]._unilabos_state:
number = int(child.name.replace("T", ""))
tablets_info.append(
WorkTablets(Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"])
)
if is_9320:
print("当前设备是9320")
# 始终初始化 step_mode 属性
@@ -433,12 +435,46 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
return await super().move_to(well, dis_to_top, channel)
async def shaker_action(self, time: int, frequency: float):
return await self._unilabos_backend.shaker_action(time, frequency)
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
return await self._unilabos_backend.shaker_action(time, module_no, amplitude, is_wait)
async def heater_action(self, temperature: float, time: int):
return await self._unilabos_backend.heater_action(temperature, time)
return await self._unilabos_backend.heater_action(temperature, time)
async def move_plate(
self,
plate: Plate,
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
intermediate_locations: Optional[List[Coordinate]] = None,
pickup_offset: Coordinate = Coordinate.zero(),
destination_offset: Coordinate = Coordinate.zero(),
drop_direction: GripDirection = GripDirection.FRONT,
pickup_direction: GripDirection = GripDirection.FRONT,
pickup_distance_from_top: float = 13.2 - 3.33,
**backend_kwargs,
):
if self._simulator:
return await self._simulate_handler.move_plate(
plate,
to,
intermediate_locations,
pickup_offset,
destination_offset,
drop_direction,
pickup_direction,
pickup_distance_from_top,
**backend_kwargs,
)
return await super().move_plate(
plate,
to,
intermediate_locations,
pickup_offset,
destination_offset,
drop_direction,
pickup_direction,
pickup_distance_from_top,
**backend_kwargs,
)
class PRCXI9300Backend(LiquidHandlerBackend):
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
@@ -482,9 +518,42 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.debug = debug
self.axis = "Left"
async def shaker_action(self, time: int, frequency: float):
print(f"\n\nShaker action: time={time}, frequency={frequency}\n\n")
# return await self.api_client.shaker_action(time, frequency)
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action(
time=time,
module_no=module_no,
amplitude=amplitude,
is_wait=is_wait,
)
self.steps_todo_list.append(step)
return step
async def pick_up_resource(self, pickup: ResourcePickup):
resource=pickup.resource
offset=pickup.offset
pickup_distance_from_top=pickup.pickup_distance_from_top
direction=pickup.direction
plate_number = int(resource.parent.name.replace("T", ""))
is_whole_plate = True
balance_height = 0
step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
return step
async def drop_resource(self, drop: ResourceDrop):
resource=drop.resource
plate_number = int(resource.name.replace("T", ""))
is_whole_plate = True
balance_height = drop.pickup_distance_from_top
step = self.api_client.clamp_jaw_drop(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
return step
async def heater_action(self, temperature: float, time: int):
print(f"\n\nHeater action: temperature={temperature}, time={time}\n\n")
@@ -585,8 +654,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
@@ -639,7 +708,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
# 检查trash #
if ops[0].resource.name == "trash":
PlateNo = ops[0].resource.parent.children.index(ops[0].resource) + 1
PlateNo = ops[0].resource.parent.parent.children.index(ops[0].resource.parent) + 1
step = self.api_client.UnLoad(
axis=axis,
@@ -660,8 +729,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
raise ValueError(
@@ -712,9 +781,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in targets:
deck = op.parent.parent
deck = op.parent.parent.parent
plate = op.parent
plate_index = deck.children.index(plate)
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -765,8 +834,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -822,8 +891,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -877,14 +946,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def dispense96(self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]):
raise NotImplementedError("The Opentrons backend does not support the 96 head.")
async def pick_up_resource(self, pickup: ResourcePickup):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def move_picked_up_resource(self, move: ResourceMove):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def drop_resource(self, drop: ResourceDrop):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
pass
def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool:
return True # PRCXI9300Backend does not have tip compatibility issues
@@ -957,6 +1020,8 @@ class PRCXI9300Api:
start = False
while not success:
status = self.step_state_list()
if len(status) == 1:
start = True
if status is None:
break
if len(status) == 0:
@@ -1236,6 +1301,50 @@ class PRCXI9300Api:
"LiquidDispensingMethod": liquid_method,
}
def clamp_jaw_pick_up(self,
plate_no: int,
is_whole_plate: bool,
balance_height: int,
) -> Dict[str, Any]:
return {
"StepAxis": "ClampingJaw",
"Function": "DefectiveLift",
"PlateNo": plate_no,
"IsWholePlate": is_whole_plate,
"HoleRow": 1,
"HoleCol": 1,
"BalanceHeight": balance_height,
"PlateOrHoleNum": f"T{plate_no}"
}
def clamp_jaw_drop(
self,
plate_no: int,
is_whole_plate: bool,
balance_height: int,
) -> Dict[str, Any]:
return {
"StepAxis": "ClampingJaw",
"Function": "PutDown",
"PlateNo": plate_no,
"IsWholePlate": is_whole_plate,
"HoleRow": 1,
"HoleCol": 1,
"BalanceHeight": balance_height,
"PlateOrHoleNum": f"T{plate_no}"
}
def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
return {
"StepAxis": "Left",
"Function": "Shaking",
"AssistFun1": time,
"AssistFun2": module_no,
"AssistFun3": amplitude,
"AssistFun4": is_wait,
}
class DefaultLayout: