添加切换枪头方法,添加mock振荡与加热方法

This commit is contained in:
zhangshixiang
2025-12-16 11:19:13 +08:00
parent 8ba911bb55
commit 44fc80c70f
3 changed files with 150 additions and 26 deletions

View File

@@ -433,6 +433,12 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
return await super().move_to(well, dis_to_top, channel)
async def shaker_action(self, time: int, frequency: float):
return await self._unilabos_backend.shaker_action(time, frequency)
async def heater_action(self, temperature: float, time: int):
return await self._unilabos_backend.heater_action(temperature, time)
class PRCXI9300Backend(LiquidHandlerBackend):
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
@@ -474,6 +480,15 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._num_channels = channel_num
self._execute_setup = setup
self.debug = debug
self.axis = "Left"
async def shaker_action(self, time: int, frequency: float):
print(f"\n\nShaker action: time={time}, frequency={frequency}\n\n")
# return await self.api_client.shaker_action(time, frequency)
async def heater_action(self, temperature: float, time: int):
print(f"\n\nHeater action: temperature={temperature}, time={time}\n\n")
# return await self.api_client.heater_action(temperature, time)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
@@ -555,7 +570,18 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
# INSERT_YOUR_CODE
# Ensure use_channels is converted to a list of ints if it's an array
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
@@ -585,6 +611,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Load(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -599,13 +626,23 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
# 检查trash #
if ops[0].resource.name == "trash":
PlateNo = ops[0].resource.parent.children.index(ops[0].resource) + 1
step = self.api_client.UnLoad(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -648,6 +685,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.UnLoad(
axis=axis,
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
@@ -671,7 +709,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
none_keys: List[str] = [],
):
"""Mix liquid in the specified resources."""
plate_indexes = []
for op in targets:
deck = op.parent.parent
@@ -714,7 +752,16 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None):
"""Aspirate liquid from the specified resources."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
@@ -747,6 +794,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Imbibing(
axis=axis,
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
@@ -761,7 +809,16 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int] = None):
"""Dispense liquid into the specified resources."""
if hasattr(use_channels, 'tolist'):
_use_channels = use_channels.tolist()
else:
_use_channels = list(use_channels) if use_channels is not None else None
if _use_channels == [0]:
axis = "Left"
elif _use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
for op in ops:
plate = op.resource.parent
@@ -795,6 +852,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row = tipspot_index % 8 + 1
step = self.api_client.Tapping(
axis=axis,
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
@@ -1000,9 +1058,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Load",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1038,9 +1097,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Imbibing",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1076,9 +1136,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Tapping",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1114,9 +1175,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "Blending",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1152,9 +1214,10 @@ class PRCXI9300Api:
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
axis: str = "Left",
) -> Dict[str, Any]:
return {
"StepAxis": self.axis,
"StepAxis": axis,
"Function": "UnLoad",
"DosageNum": dosage,
"PlateNo": plate_no,