更新中析仪器,以及启动示例

This commit is contained in:
Xuwznln
2025-07-06 18:39:40 +08:00
parent bef44b2293
commit ce8667f937
7 changed files with 1577 additions and 454 deletions

View File

@@ -183,7 +183,7 @@ class LiquidHandlerAbstract(LiquidHandler):
spread: Literal["wide", "tight", "custom"] = "wide",
is_96_well: bool = False,
mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
mix_times: Optional[List(int)] = None,
mix_times: Optional[List[int]] = None,
mix_vol: Optional[int] = None,
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,

View File

@@ -1,17 +1,456 @@
import socket, json, contextlib
from typing import Any, List, Dict, Optional
import collections
import contextlib
import json
import socket
import time
from typing import Any, List, Dict, Optional, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling import (
LiquidHandlerBackend,
Pickup,
SingleChannelAspiration,
Drop,
SingleChannelDispense,
PickupTipRack,
DropTipRack,
MultiHeadAspirationPlate,
)
from pylabrobot.liquid_handling.standard import (
MultiHeadAspirationContainer,
MultiHeadDispenseContainer,
MultiHeadDispensePlate,
ResourcePickup,
ResourceMove,
ResourceDrop,
)
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
class PRCXIError(RuntimeError):
"""Lilith 返回 Success=false 时抛出的业务异常"""
class PRCXI9300:
class Material(TypedDict): # 和Plate同关系
uuid: str
Code: Optional[str]
Name: Optional[str]
SummaryName: Optional[str]
PipetteHeight: Optional[int]
materialEnum: Optional[int]
def __init__(self, host: str = "127.0.0.1", port: int = 9999,
timeout: float = 10.0) -> None:
class WorkTablets(TypedDict):
Number: int
Code: str
Material: Dict[str, Any]
class MatrixInfo(TypedDict):
MatrixId: str
MatrixName: str
MatrixCount: int
WorkTablets: list[WorkTablets]
class PRCXI9300Deck(Deck):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float):
super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位
class PRCXI9300Container(Plate):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=collections.OrderedDict())
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state()
data.update(self._unilabos_state)
return data
class PRCXI9300Handler(LiquidHandlerAbstract):
def __init__(self, deck: Deck, host: str, port: int, timeout: float, setup=True):
tablets_info = []
count = 0
for child in deck.children:
if "Material" in child._unilabos_state:
count += 1
tablets_info.append(
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
)
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup)
super().__init__(backend=self._unilabos_backend, deck=deck)
async def create_protocol(
self,
protocol_name: str,
protocol_description: str,
protocol_version: str,
protocol_author: str,
protocol_date: str,
protocol_type: str,
none_keys: List[str] = [],
):
self._unilabos_backend.create_protocol(protocol_name)
async def run_protocol(self):
self._unilabos_backend.run_protocol()
async def remove_liquid(
self,
vols: List[float],
sources: Sequence[Container],
waste_liquid: Optional[Container] = None,
*,
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Optional[Literal["wide", "tight", "custom"]] = "wide",
delays: Optional[List[int]] = None,
is_96_well: Optional[bool] = False,
top: Optional[List[float]] = None,
none_keys: List[str] = [],
):
return await super().remove_liquid(
vols,
sources,
waste_liquid,
use_channels=use_channels,
flow_rates=flow_rates,
offsets=offsets,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
delays=delays,
is_96_well=is_96_well,
top=top,
none_keys=none_keys,
)
async def add_liquid(
self,
asp_vols: Union[List[float], float],
dis_vols: Union[List[float], float],
reagent_sources: Sequence[Container],
targets: Sequence[Container],
*,
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Optional[Literal["wide", "tight", "custom"]] = "wide",
is_96_well: bool = False,
delays: Optional[List[int]] = None,
mix_time: Optional[int] = None,
mix_vol: Optional[int] = None,
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,
none_keys: List[str] = [],
):
return await super().add_liquid(
asp_vols,
dis_vols,
reagent_sources,
targets,
use_channels=use_channels,
flow_rates=flow_rates,
offsets=offsets,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
delays=delays,
mix_time=mix_time,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
none_keys=none_keys,
)
async def transfer_liquid(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
*,
use_channels: Optional[List[int]] = None,
asp_vols: Union[List[float], float],
dis_vols: Union[List[float], float],
asp_flow_rates: Optional[List[Optional[float]]] = None,
dis_flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
touch_tip: bool = False,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide",
is_96_well: bool = False,
mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
mix_times: Optional[List[int]] = None,
mix_vol: Optional[int] = None,
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,
delays: Optional[List[int]] = None,
none_keys: List[str] = [],
):
return await super().transfer_liquid(
sources,
targets,
tip_racks,
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
mix_times=mix_times,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
none_keys=none_keys,
)
async def custom_delay(self, seconds=0, msg=None):
return await super().custom_delay(seconds, msg)
async def touch_tip(self, targets: Sequence[Container]):
return await super().touch_tip(targets)
async def mix(
self,
targets: Sequence[Container],
mix_time: int = None,
mix_vol: Optional[int] = None,
height_to_bottom: Optional[float] = None,
offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None,
none_keys: List[str] = [],
):
return await super().mix(targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys)
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
return super().iter_tips(tip_racks)
def set_tiprack(self, tip_racks: Sequence[TipRack]):
super().set_tiprack(tip_racks)
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
return await super().move_to(well, dis_to_top, channel)
class PRCXI9300Backend(LiquidHandlerBackend):
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
该类提供了与 PRCXI 9300 设备进行通信的基本方法,包括方案管理、自动化控制、运行状态查询等。
"""
_num_channels = 8 # 默认通道数为 8
matrix_info: MatrixInfo
protocol_name: str
steps_todo_list = []
def __init__(
self,
tablets_info: list[WorkTablets],
host: str = "127.0.0.1",
port: int = 9999,
timeout: float = 10.0,
setup=True,
) -> None:
super().__init__()
self.tablets_info = tablets_info
self.api_client = PRCXI9300Api(host, port, timeout)
self.host, self.port, self.timeout = host, port, timeout
self._num_channels = 8
self._execute_setup = setup
def create_protocol(self, protocol_name):
self.protocol_name = protocol_name
self.steps_todo_list = []
def run_protocol(self):
run_time = time.time()
self.matrix_info = MatrixInfo(
MatrixId=f"matrix_{run_time}",
MatrixName=f"protocol_{run_time}",
MatrixCount=len(self.tablets_info),
WorkTablets=self.tablets_info,
)
self.api_client.add_WorkTablet_Matrix(self.matrix_info)
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_info["MatrixId"], self.steps_todo_list
)
self.api_client.load_solution(solution_id)
self.api_client.start()
@classmethod
def check_channels(cls, use_channels: List[int]) -> List[int]:
"""检查通道是否符合要求PRCXI9300Backend 只支持所有 8 个通道。"""
if use_channels != [0, 1, 2, 3, 4, 5, 6, 7]:
print("PRCXI9300Backend only supports all 8 channels, using default [0, 1, 2, 3, 4, 5, 6, 7].")
return [0, 1, 2, 3, 4, 5, 6, 7]
return use_channels
async def setup(self):
await super().setup()
try:
if self._execute_setup:
self.api_client.call("IAutomation", "Reset")
except ConnectionRefusedError as e:
raise RuntimeError(
f"Failed to connect to PRCXI9300 API at {self.host}:{self.port}. "
"Please ensure the PRCXI9300 service is running."
) from e
async def stop(self):
self.api_client.call("IAutomation", "Stop")
async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
# 12列要PickUp A-H 1
# PlateNo = 1-6 # 2行3列
PlateNo = 1 # 第一块板
hole_col = 2 # 第二列的8个孔
step = self.api_client.Load(
"Left",
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
print("PRCXI9300Backend pick_up_tips logged.")
async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None):
PlateNo = 1
hole_col = 2
step = self.api_client.UnLoad(
"Left",
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
allow_drop = False
for s in self.steps_todo_list:
if s.get("Function") == "Load":
self.steps_todo_list.append(step)
allow_drop = True
break
if not allow_drop:
raise ValueError("No matching Load step found for drop_tips.")
print("PRCXI9300Backend drop_tips logged.")
async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None):
volumes = [op.volume for op in ops]
print(f"PRCXI9300Backend aspirate volumes: {volumes[0]} -> {int(volumes[0])} μL")
PlateNo = 1
hole_col = 2
step = self.api_client.Imbibing(
"Left",
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int] = None):
volumes = [op.volume for op in ops]
print(f"PRCXI9300Backend dispense volumes: {volumes[0]} -> {int(volumes[0])} μL")
PlateNo = 1
hole_col = 2
step = self.api_client.Tapping(
"Left",
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
async def pick_up_tips96(self, pickup: PickupTipRack):
raise NotImplementedError("The PRCXI backend does not support the 96 head.")
async def drop_tips96(self, drop: DropTipRack):
raise NotImplementedError("The PRCXI backend does not support the 96 head.")
async def aspirate96(self, aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer]):
raise NotImplementedError("The Opentrons backend does not support the 96 head.")
async def dispense96(self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]):
raise NotImplementedError("The Opentrons backend does not support the 96 head.")
async def pick_up_resource(self, pickup: ResourcePickup):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def move_picked_up_resource(self, move: ResourceMove):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
async def drop_resource(self, drop: ResourceDrop):
raise NotImplementedError("The Opentrons backend does not support the robotic arm.")
def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool:
return True # PRCXI9300Backend does not have tip compatibility issues
def serialize(self) -> dict:
raise NotImplementedError()
@property
def num_channels(self) -> int:
return self._num_channels
class PRCXI9300Api:
def __init__(self, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0) -> None:
self.host, self.port, self.timeout = host, port, timeout
@staticmethod
def _len_prefix(n: int) -> bytes:
@@ -30,17 +469,30 @@ class PRCXI9300:
if not chunk:
break
if first:
chunk, first = chunk[8:], False
chunk, first = chunk[8:], False
chunks.append(chunk)
return b"".join(chunks).decode()
def _call(self, service: str, method: str,
params: Optional[list] = None) -> Any:
# ---------------------------------------------------- 方案相关ISolution
def list_solutions(self) -> List[Dict[str, Any]]:
"""GetSolutionList"""
return self.call("ISolution", "GetSolutionList")
def load_solution(self, solution_id: str) -> bool:
"""LoadSolution"""
return self.call("ISolution", "LoadSolution", [solution_id])
def add_solution(self, name: str, matrix_id: str, steps: List[Dict[str, Any]]) -> str:
"""AddSolution → 返回新方案 GUID"""
return self.call("ISolution", "AddSolution", [name, matrix_id, steps])
# ---------------------------------------------------- 自动化控制IAutomation
def start(self) -> bool:
return self.call("IAutomation", "Start")
def call(self, service: str, method: str, params: Optional[list] = None) -> Any:
payload = json.dumps(
{"ServiceName": service,
"MethodName": method,
"Paramters": params or []},
separators=(",", ":")
{"ServiceName": service, "MethodName": method, "Paramters": params or []}, separators=(",", ":")
)
resp = json.loads(self._raw_request(payload))
if not resp.get("Success", False):
@@ -51,87 +503,53 @@ class PRCXI9300:
except (TypeError, json.JSONDecodeError):
return data
# ---------------------------------------------------- 方案相关ISolution
def list_solutions(self) -> List[Dict[str, Any]]:
"""GetSolutionList"""
return self._call("ISolution", "GetSolutionList")
def load_solution(self, solution_id: str) -> bool:
"""LoadSolution"""
return self._call("ISolution", "LoadSolution", [solution_id])
def add_solution(self, name: str, matrix_id: str,
steps: List[Dict[str, Any]]) -> str:
"""AddSolution → 返回新方案 GUID"""
return self._call("ISolution", "AddSolution",
[name, matrix_id, steps])
# ---------------------------------------------------- 自动化控制IAutomation
def start(self) -> bool:
return self._call("IAutomation", "Start")
def stop(self) -> bool:
"""Stop"""
return self._call("IAutomation", "Stop")
def reset(self) -> bool:
"""Reset"""
return self._call("IAutomation", "Reset")
def pause(self) -> bool:
"""Pause"""
return self._call("IAutomation", "Pause")
return self.call("IAutomation", "Pause")
def resume(self) -> bool:
"""Resume"""
return self._call("IAutomation", "Resume")
return self.call("IAutomation", "Resume")
def get_error_code(self) -> Optional[str]:
"""GetErrorCode"""
return self._call("IAutomation", "GetErrorCode")
return self.call("IAutomation", "GetErrorCode")
def clear_error_code(self) -> bool:
"""RemoveErrorCodet"""
return self._call("IAutomation", "RemoveErrorCodet")
return self.call("IAutomation", "RemoveErrorCodet")
# ---------------------------------------------------- 运行状态IMachineState
def step_state_list(self) -> List[Dict[str, Any]]:
"""GetStepStateList"""
return self._call("IMachineState", "GetStepStateList")
return self.call("IMachineState", "GetStepStateList")
def step_status(self, seq_num: int) -> Dict[str, Any]:
"""GetStepStatus"""
return self._call("IMachineState", "GetStepStatus", [seq_num])
return self.call("IMachineState", "GetStepStatus", [seq_num])
def step_state(self, seq_num: int) -> Dict[str, Any]:
"""GetStepState"""
return self._call("IMachineState", "GetStepState", [seq_num])
return self.call("IMachineState", "GetStepState", [seq_num])
def axis_location(self, axis_num: int = 1) -> Dict[str, Any]:
"""GetLocation"""
return self._call("IMachineState", "GetLocation", [axis_num])
return self.call("IMachineState", "GetLocation", [axis_num])
# ---------------------------------------------------- 版位矩阵IMatrix
def list_matrices(self) -> List[Dict[str, Any]]:
"""GetWorkTabletMatrices"""
return self._call("IMatrix", "GetWorkTabletMatrices")
return self.call("IMatrix", "GetWorkTabletMatrices")
def matrix_by_id(self, matrix_id: str) -> Dict[str, Any]:
"""GetWorkTabletMatrixById"""
return self._call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])
def add_WorkTablet_Matrix(self,matrix):
return self._call("IMatrix", "AddWorkTabletMatrix", [matrix])
return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])
# ---------------------------------------------------- 一键运行
def run_solution(self, solution_id: str, channel_idx: int = 1) -> None:
self.load_solution(solution_id)
self.start(channel_idx)
# ---------------------------------------------------- 单点动作
def add_WorkTablet_Matrix(self, matrix: MatrixInfo):
return self.call("IMatrix", "AddWorkTabletMatrix", [matrix])
def Load(
self,
self,
axis: str,
dosage: int,
plate_no: int,
@@ -147,7 +565,7 @@ class PRCXI9300:
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
@@ -166,11 +584,11 @@ class PRCXI9300:
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
"LiquidDispensingMethod": liquid_method,
}
def Imbibing(
self,
self,
axis: str,
dosage: int,
plate_no: int,
@@ -186,7 +604,7 @@ class PRCXI9300:
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
@@ -205,12 +623,11 @@ class PRCXI9300:
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
"LiquidDispensingMethod": liquid_method,
}
def Tapping(
self,
self,
axis: str,
dosage: int,
plate_no: int,
@@ -226,7 +643,7 @@ class PRCXI9300:
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
@@ -245,12 +662,11 @@ class PRCXI9300:
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
"LiquidDispensingMethod": liquid_method,
}
def Blending(
self,
self,
axis: str,
dosage: int,
plate_no: int,
@@ -266,7 +682,7 @@ class PRCXI9300:
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
@@ -285,11 +701,11 @@ class PRCXI9300:
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
"LiquidDispensingMethod": liquid_method,
}
def UnLoad(
self,
self,
axis: str,
dosage: int,
plate_no: int,
@@ -305,7 +721,7 @@ class PRCXI9300:
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
@@ -324,5 +740,5 @@ class PRCXI9300:
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
"LiquidDispensingMethod": liquid_method,
}