Compare commits

...

6 Commits

Author SHA1 Message Date
Guangxin Zhang
41c130ef54 Merge branch 'dev' of https://github.com/dptech-corp/Uni-Lab-OS into dev 2025-07-17 23:03:01 +08:00
Guangxin Zhang
2d30fb79c1 update 2025-07-17 23:02:08 +08:00
Xuwznln
743ec8839d get_well_container&get_tip_rack 2025-07-17 23:01:30 +08:00
Guangxin Zhang
3f7b991dd9 Update 9320 2025-07-17 22:36:45 +08:00
Xuwznln
0466b57e0a 更新axis等参数 2025-07-17 21:53:25 +08:00
Guangxin Zhang
aaf33a8878 Update PRCXI 2025-07-17 20:54:11 +08:00
12 changed files with 10275 additions and 195 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -63,4 +63,5 @@ dependencies:
# ilab equipments
# - ros-humble-unilabos-msgs
- pip:
- paho-mqtt
- paho-mqtt
- opentrons_shared_data

View File

@@ -62,4 +62,5 @@ dependencies:
# ilab equipments
# - ros-humble-unilabos-msgs
- pip:
- paho-mqtt
- paho-mqtt
- opentrons_shared_data

View File

@@ -65,4 +65,5 @@ dependencies:
# ilab equipments
# - ros-humble-unilabos-msgs
- pip:
- paho-mqtt
- paho-mqtt
- opentrons_shared_data

View File

@@ -66,6 +66,7 @@ dependencies:
#- crcmod
- pip:
- paho-mqtt
- opentrons_shared_data
# driver
#- ur-rtde # set PYTHONUTF8=1
#- pyautogui

View File

@@ -39,6 +39,7 @@ def job_add(req: JobAddReq) -> JobData:
HostNode.get_instance().send_goal(req.device_id, action_type=action_type, action_name=action_name, action_kwargs=action_args, goal_uuid=req.job_id, server_info=req.server_info)
except Exception as e:
for bridge in HostNode.get_instance().bridges:
traceback.print_exc()
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status({}, req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {}))
return JobData(jobId=req.job_id)

View File

@@ -27,10 +27,10 @@ from pylabrobot.resources import (
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8):
self._simulator = simulator
if simulator:
self._simulate_backend = LiquidHandlerChatterboxBackend(8)
self._simulate_backend = LiquidHandlerChatterboxBackend(channel_num)
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
super().__init__(backend, deck)
@@ -533,7 +533,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
"""Extended LiquidHandler with additional operations."""
support_touch_tip = True
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool, channel_num:int = 8):
"""Initialize a LiquidHandler.
Args:
@@ -541,7 +541,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
deck: Deck to use.
"""
self._simulator = simulator
super().__init__(backend, deck, simulator)
super().__init__(backend, deck, simulator, channel_num)
# ---------------------------------------------------------------
# REMOVE LIQUID --------------------------------------------------
@@ -584,7 +584,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
pass # This mode is not verified.
else:
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
if len(use_channels) == 1 and self.backend.num_channels == 1:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
@@ -614,7 +614,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
spread=spread,
)
await self.discard_tips()
elif len(use_channels) == 8:
elif len(use_channels) == 8 and self.backend.num_channels == 8:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
@@ -715,8 +715,8 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.pick_up_tips(tip)
for _ in range(len(targets)):
await self.aspirate(
resources=reagent_sources,
@@ -740,6 +740,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
liquid_height=[liquid_height[1]] if liquid_height else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
await self.mix(
@@ -817,7 +818,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips()
except Exception as e:
traceback.print_exc()

View File

@@ -66,14 +66,14 @@ class PRCXI9300Deck(Deck):
self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位
class PRCXI9300Container(Plate):
class PRCXI9300Container(Plate, TipRack):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=collections.OrderedDict())
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str, ordering: collections.OrderedDict, model: Optional[str] = None,):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
@@ -93,8 +93,11 @@ class PRCXI9300Trash(Trash):
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
super().__init__(name, size_x, size_y, size_z, category=category)
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str, **kwargs):
if name != "trash":
name = "trash"
print("PRCXI9300Trash name must be 'trash', using 'trash' instead.")
super().__init__(name, size_x, size_y, size_z, category=category, **kwargs)
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
@@ -109,6 +112,7 @@ class PRCXI9300Trash(Trash):
class PRCXI9300Handler(LiquidHandlerAbstract):
support_touch_tip = False
@property
def reset_ok(self) -> bool:
"""检查设备是否已重置成功。"""
@@ -116,7 +120,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
return True
return self._unilabos_backend.is_reset_ok
def __init__(self, deck: Deck, host: str, port: int, timeout: float, setup=True, debug=False):
def __init__(self, deck: Deck, host: str, port: int, timeout: float, channel_num=8, axis="Left", setup=True, debug=False, matrix_id=""):
tablets_info = []
count = 0
for child in deck.children:
@@ -125,8 +129,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info.append(
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
)
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True)
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, channel_num, axis, setup, debug, matrix_id)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True, channel_num=channel_num)
async def create_protocol(
self,
@@ -350,14 +354,18 @@ class PRCXI9300Backend(LiquidHandlerBackend):
host: str = "127.0.0.1",
port: int = 9999,
timeout: float = 10.0,
channel_num: int=8,
axis: str="Left",
setup=True,
debug=False,
matrix_id="",
) -> None:
super().__init__()
self.tablets_info = tablets_info
self.api_client = PRCXI9300Api(host, port, timeout, debug)
self.matrix_id = matrix_id
self.api_client = PRCXI9300Api(host, port, timeout, axis, debug)
self.host, self.port, self.timeout = host, port, timeout
self._num_channels = 8
self._num_channels = channel_num
self._execute_setup = setup
self.debug = debug
@@ -375,12 +383,18 @@ class PRCXI9300Backend(LiquidHandlerBackend):
WorkTablets=self.tablets_info,
)
#print(json.dumps(self.matrix_info, indent=2))
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
assert res["Success"], f"Failed to create matrix: {res.get('Message', 'Unknown error')}"
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_info["MatrixId"], self.steps_todo_list
)
if not len(self.matrix_id):
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
assert res["Success"], f"Failed to create matrix: {res.get('Message', 'Unknown error')}"
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_info["MatrixId"], self.steps_todo_list
)
else:
print(f"PRCXI9300Backend using predefined worktable {self.matrix_id}, skipping matrix creation.")
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_id, self.steps_todo_list
)
print(f"PRCXI9300Backend created solution with ID: {solution_id}")
self.api_client.load_solution(solution_id)
return self.api_client.start()
@@ -437,32 +451,20 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
step = self.api_client.Load(
"Left",
dosage=0,
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
step = self.api_client.Load(dosage=0, plate_no=PlateNo, is_whole_plate=False, hole_row=1, hole_col=hole_col,
blending_times=0, balance_height=0, plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8")
self.steps_todo_list.append(step)
async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
plate = ops[0].resource.parent.parent
deck = plate.parent
plate_index = deck.children.index(plate)
if deck.children[plate_index].name == "trash":
step = self.api_client.UnLoad(
"Left",
dosage=0,
plate_no=plate_index+1,
is_whole_plate=False,
@@ -474,10 +476,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
return
if len(ops) != 8:
raise ValueError(f"PRCXI9300Backend drop_tips: Expected 8 pickups, got {len(ops)}")
@@ -600,18 +600,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
step = self.api_client.Imbibing(
"Left",
dosage=int(volumes[0]),
plate_no=PlateNo,
is_whole_plate=False,
hole_row=1,
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
step = self.api_client.Imbibing(dosage=int(volumes[0]), plate_no=PlateNo, is_whole_plate=False, hole_row=1,
hole_col=hole_col, blending_times=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", hole_numbers="1,2,3,4,5,6,7,8")
self.steps_todo_list.append(step)
@@ -696,9 +687,10 @@ class PRCXI9300Backend(LiquidHandlerBackend):
class PRCXI9300Api:
def __init__(self, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, debug: bool = False) -> None:
def __init__(self, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, axis="Left", debug: bool = False) -> None:
self.host, self.port, self.timeout = host, port, timeout
self.debug = debug
self.axis = axis
@staticmethod
def _len_prefix(n: int) -> bytes:
@@ -765,8 +757,10 @@ class PRCXI9300Api:
"""GetErrorCode"""
return self.call("IAutomation", "GetErrorCode")
def get_reset_status(self) -> Optional[str]:
def get_reset_status(self) -> bool:
"""GetErrorCode"""
if self.debug:
return True
res = self.call("IAutomation", "GetResetStatus")
return not res
@@ -803,27 +797,12 @@ class PRCXI9300Api:
def add_WorkTablet_Matrix(self, matrix: MatrixInfo):
return self.call("IMatrix", "AddWorkTabletMatrix", [matrix])
def Load(
self,
axis: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
hole_row: int,
hole_col: int,
blending_times: int,
balance_height: int,
plate_or_hole: str,
hole_numbers: str,
assist_fun1: str = "",
assist_fun2: str = "",
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
def Load(self, dosage: int, plate_no: int, is_whole_plate: bool, hole_row: int, hole_col: int, blending_times: int,
balance_height: int, plate_or_hole: str, hole_numbers: str, assist_fun1: str = "", assist_fun2: str = "",
assist_fun3: str = "", assist_fun4: str = "", assist_fun5: str = "",
liquid_method: str = "NormalDispense") -> Dict[str, Any]:
return {
"StepAxis": axis,
"StepAxis": self.axis,
"Function": "Load",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -842,27 +821,12 @@ class PRCXI9300Api:
"LiquidDispensingMethod": liquid_method,
}
def Imbibing(
self,
axis: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
hole_row: int,
hole_col: int,
blending_times: int,
balance_height: int,
plate_or_hole: str,
hole_numbers: str,
assist_fun1: str = "",
assist_fun2: str = "",
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
def Imbibing(self, dosage: int, plate_no: int, is_whole_plate: bool, hole_row: int, hole_col: int,
blending_times: int, balance_height: int, plate_or_hole: str, hole_numbers: str, assist_fun1: str = "",
assist_fun2: str = "", assist_fun3: str = "", assist_fun4: str = "", assist_fun5: str = "",
liquid_method: str = "NormalDispense") -> Dict[str, Any]:
return {
"StepAxis": axis,
"StepAxis": self.axis,
"Function": "Imbibing",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -883,7 +847,6 @@ class PRCXI9300Api:
def Tapping(
self,
axis: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
@@ -901,7 +864,7 @@ class PRCXI9300Api:
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
"StepAxis": self.axis,
"Function": "Tapping",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -922,7 +885,6 @@ class PRCXI9300Api:
def Blending(
self,
axis: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
@@ -940,7 +902,7 @@ class PRCXI9300Api:
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
"StepAxis": self.axis,
"Function": "Blending",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -961,7 +923,6 @@ class PRCXI9300Api:
def UnLoad(
self,
axis: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
@@ -979,7 +940,7 @@ class PRCXI9300Api:
liquid_method: str = "NormalDispense",
) -> Dict[str, Any]:
return {
"StepAxis": axis,
"StepAxis": self.axis,
"Function": "UnLoad",
"DosageNum": dosage,
"PlateNo": plate_no,
@@ -1000,92 +961,348 @@ class PRCXI9300Api:
if __name__ == "__main__":
# Example usage
deck = PRCXI9300Deck(name="PRCXI Deck", size_x=100, size_y=100, size_z=100)
plate1 = PRCXI9300Container(name="rackT1", size_x=50, size_y=50, size_z=10, category="plate")
plate1.load_state({
"Material": {
"uuid": "80652665f6a54402b2408d50b40398df",
"Code": "ZX-001-1000",
"Name": "1000μL Tip头",
"SummaryName": "1000μL Tip头",
"PipetteHeight": 100,
"materialEnum": 1
}
})
# # Example usage
# # 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
# # 2. 设计一个单点动作流程,可以跑
# # 3.
# deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
# plate1 = PRCXI9300Container(name="rackT1", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
# plate1.load_state({
# "Material": {
# "uuid": "80652665f6a54402b2408d50b40398df",
# "Co#de": "ZX-001-1000",
# "Name": "1000μL Tip头",
# "SummaryName": "1000μL Tip头",
# "PipetteHeight": 100,
# "materialEnum": 1
# }
# })
plate2 = PRCXI9300Container(name="plateT2", size_x=50, size_y=50, size_z=10, category="plate")
# plate2 = PRCXI9300Container(name="plateT2", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
# plate2.load_state({
# "Material": {
# "uuid": "57b1e4711e9e4a32b529f3132fc5931f",
# }
# })
# plate3 = PRCXI9300Container(name="plateT3", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
# plate3.load_state({
# "Material": {
# "uuid": "57b1e4711e9e4a32b529f3132fc5931f",
# }
# })
# plate4 = PRCXI9300Container(name="rackT4", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
# plate4.load_state({
# "Material": {
# "uuid": "80652665f6a54402b2408d50b40398df",
# "Code": "ZX-001-1000",
# "Name": "1000μL Tip头",
# "SummaryName": "1000μL Tip头",
# "PipetteHeight": 100,
# "materialEnum": 1
# }
# })
# plate5 = PRCXI9300Container(name="plateT5", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
# plate5.load_state({
# "Material": {
# "uuid": "57b1e4711e9e4a32b529f3132fc5931f",
# }
# })
# plate6 = PRCXI9300Trash(name="trash", size_x=50, size_y=500, size_z=10, category="trash")
# plate6.load_state({
# "Material": {
# "uuid": "57b1e4711e9e4a32b529f3132fc5931f",
# }
# })
# from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
# from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
# tip_rack = tipone_96_tiprack_200ul("TipRack")
# well_containers = corning_96_wellplate_360ul_flat("Plate")
# # from pprint import pprint
# # pprint(well_containers.children)
# plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
# plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate4, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate5, location=Coordinate(0, 0, 0))
# deck.assign_child_resource(plate6, location=Coordinate(0, 0, 0))
# print(plate2)
# # plate_2_liquids = [[(None, 500)]]*96
# # plate2.set_well_liquids(plate_2_liquids)
# handler = PRCXI9300Handler(deck=deck, host="192.168.3.9", port=9999, timeout=10.0, setup=False, debug=True, matrix_id="fd383e6d-2d0e-40b5-9c01-1b2870b1f1b1")
# handler.set_tiprack([tip_rack]) # Set the tip rack for the handler
# asyncio.run(handler.setup()) # Initialize the handler and setup the connection
# from pylabrobot.resources import set_volume_tracking
# # from pylabrobot.resources import set_tip_tracking
# set_volume_tracking(enabled=True)
# plate2.set_well_liquids([("Water", 100)] * plate2.num_items)
# asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
# # asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
# # asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7]))
# # asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7]))
# # asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# # asyncio.run(handler.discard_tips())
# # asyncio.run(handler.mix(well_containers.children[:8
# # ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# asyncio.run(handler.add_liquid(
# asp_vols=[100]*16,
# dis_vols=[100]*16,
# reagent_sources=well_containers.children[-16:],
# targets=well_containers.children[:16],
# use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# flow_rates=[None] * 32,
# offsets=[Coordinate(0, 0, 0)] * 32,
# liquid_height=[None] * 16,
# blow_out_air_volume=[None] * 16,
# delays=None,
# mix_time=3,
# mix_vol=50,
# spread="wide",
# ))
# # asyncio.run(handler.remove_liquid(
# # vols=[100]*16,
# # sources=well_containers.children[-16:],
# # waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # flow_rates=[None] * 32,
# # offsets=[Coordinate(0, 0, 0)] * 32,
# # liquid_height=[None] * 32,
# # blow_out_air_volume=[None] * 32,
# # spread="wide",
# # ))
# # asyncio.run(handler.transfer_liquid(
# # asp_vols=[100]*16,
# # dis_vols=[100]*16,
# # tip_racks=[tip_rack],
# # sources=well_containers.children[-16:],
# # targets=well_containers.children[:16],
# # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # offsets=[Coordinate(0, 0, 0)] * 32,
# # asp_flow_rates=[None] * 16,
# # dis_flow_rates=[None] * 16,
# # liquid_height=[None] * 32,
# # blow_out_air_volume=[None] * 32,
# # mix_times=3,
# # mix_vol=50,
# # spread="wide",
# # ))
# print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # input("pick_up_tips add step")
# #asyncio.run(handler.run_protocol()) # Run the protocol
# # input("Running protocol...")
# # input("Press Enter to continue...") # Wait for user input before proceeding
# # print("PRCXI9300Handler initialized with deck and host settings.")
# Example usage
# 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
# 2. 设计一个单点动作流程,可以跑
# 3.
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
plate2 = PRCXI9300Container(name="plateT2", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate2.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate3 = PRCXI9300Container(name="plateT3", size_x=50, size_y=50, size_z=10, category="plate")
#储液槽
plate3 = PRCXI9300Container(name="plateT3", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate3.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate4 = PRCXI9300Container(name="rackT4", size_x=50, size_y=50, size_z=10, category="plate")
plate4 = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash", ordering=collections.OrderedDict())
plate4.load_state({
"Material": {
"uuid": "80652665f6a54402b2408d50b40398df",
"Code": "ZX-001-1000",
"Name": "1000μL Tip头",
"SummaryName": "1000μL Tip头",
"PipetteHeight": 100,
"materialEnum": 1
"uuid": "730067cf07ae43849ddf4034299030e9"
}
})
plate5 = PRCXI9300Container(name="plateT5", size_x=50, size_y=50, size_z=10, category="plate")
plate5 = PRCXI9300Container(name="plateT5", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate5.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate6 = PRCXI9300Trash(name="trash", size_x=50, size_y=500, size_z=10, category="trash")
plate6 = PRCXI9300Container(name="plateT6", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate6.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate7 = PRCXI9300Container(name="plateT7", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate7.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate8 = PRCXI9300Container(name="rackT8", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate8.load_state({
"Material": {
"uuid": "068b3815e36b4a72a59bae017011b29f",
"Code": "ZX-001-10+",
"Name": "10μL加长 Tip头"
}
})
plate9 = PRCXI9300Container(name="plateT9", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate9.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate10 = PRCXI9300Container(name="plateT10", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate10.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate11 = PRCXI9300Container(name="plateT11", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate11.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
}
})
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
tip_rack = tipone_96_tiprack_200ul("TipRack")
well_containers = corning_96_wellplate_360ul_flat("Plate")
plate12 = PRCXI9300Container(name="plateT12", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate12.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
plate13 = PRCXI9300Container(name="plateT13", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict())
plate13.load_state({
"Material": {
"uuid": "04211a2dc93547fe9bf6121eac533650"
}
})
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul,opentrons_96_tiprack_10ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat, nest_96_wellplate_2ml_deep
def get_well_container(name: str) -> PRCXI9300Container:
well_containers = corning_96_wellplate_360ul_flat(name).serialize()
plate = PRCXI9300Container(name=name, size_x=50, size_y=50, size_z=10, category="plate",
ordering=collections.OrderedDict())
plate_serialized = plate.serialize()
plate_serialized["parent_name"] = deck.name
well_containers.update({k: v for k, v in plate_serialized.items() if k not in ["children"]})
new_plate: PRCXI9300Container = PRCXI9300Container.deserialize(well_containers)
return new_plate
def get_tip_rack(name: str) -> PRCXI9300Container:
tip_racks = opentrons_96_tiprack_10ul("name").serialize()
tip_rack = PRCXI9300Container(name=name, size_x=50, size_y=50, size_z=10, category="tip_rack",
ordering=collections.OrderedDict())
tip_rack_serialized = tip_rack.serialize()
tip_rack_serialized["parent_name"] = deck.name
tip_racks.update({k: v for k, v in tip_rack_serialized.items() if k not in ["children"]})
new_tip_rack: PRCXI9300Container = PRCXI9300Container.deserialize(tip_racks)
return new_tip_rack
plate1 = get_well_container("HPLCPlateT1")
plate1.load_state({
"Material": {
"uuid": "548bbc3df0d4447586f2c19d2c0c0c55",
"Code": "HPLC01",
"Name": "HPLC料盘"
}
})
plate2 = get_well_container("HPLCPlateT1")
plate2.load_state({
"Material": {
"uuid": "548bbc3df0d4447586f2c19d2c0c0c55",
"Code": "HPLC01",
"Name": "HPLC料盘"
}
})
# from pprint import pprint
# pprint(well_containers.children)
plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
plate1.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate3.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate4.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate5.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate6.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate7.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate8.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
plate9.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate10.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate11.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate12.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
plate13.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate4, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate5, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate6, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate7, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate8, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate9, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate10, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate11, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate12, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate13, location=Coordinate(0, 0, 0))
handler = PRCXI9300Handler(deck=deck, host="192.168.3.9", port=9999, timeout=10.0, setup=False, debug=True)
handler = PRCXI9300Handler(deck=deck, host="10.181.102.13", port=9999, timeout=10.0, setup=False, debug=True, matrix_id="fd383e6d-2d0e-40b5-9c01-1b2870b1f1b1")
handler.set_tiprack([tip_rack]) # Set the tip rack for the handler
asyncio.run(handler.setup()) # Initialize the handler and setup the connection
from pylabrobot.resources import set_volume_tracking
# from pylabrobot.resources import set_tip_tracking
set_volume_tracking(enabled=True)
plate1.set_well_liquids([("Water", 100)] * plate1.num_items)
asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
# asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7]))
# asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7]))
#asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.discard_tips())
# asyncio.run(handler.mix(well_containers.children[:8], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# asyncio.run(handler.mix(well_containers.children[:8
# ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
#print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
asyncio.run(handler.add_liquid(
asp_vols=[100]*16,
dis_vols=[100]*16,
reagent_sources=well_containers.children[-16:],
targets=well_containers.children[:16],
reagent_sources=final_plate_2.children[-16:],
targets=final_plate_2.children[:16],
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
flow_rates=[None] * 32,
offsets=[Coordinate(0, 0, 0)] * 32,
@@ -1131,6 +1348,3 @@ if __name__ == "__main__":
# input("Press Enter to continue...") # Wait for user input before proceeding
# print("PRCXI9300Handler initialized with deck and host settings.")
# 但是怎么丢tip这个需要手动设置一下

View File

@@ -429,6 +429,20 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR":
def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None):
def replace_plr_type_to_ulab(source: str):
replace_info = {
"plate": "plate",
"well": "well",
"tip_spot": "container",
"trash": "container",
"deck": "deck",
"tip_rack": "container",
}
if source in replace_info:
return replace_info[source]
else:
print("转换pylabrobot的时候出现未知类型", source)
return "container"
def resource_plr_to_ulab_inner(d: dict, all_states: dict) -> dict:
r = {
"id": d["name"],
@@ -436,7 +450,7 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None):
"sample_id": None,
"children": [resource_plr_to_ulab_inner(child, all_states) for child in d["children"]],
"parent": d["parent_name"] if d["parent_name"] else parent_name if parent_name else None,
"type": "device", # FIXME plr自带的type是python class name
"type": replace_plr_type_to_ulab(d.get("category")), # FIXME plr自带的type是python class name
"class": d.get("class", ""),
"position": (
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}

View File

@@ -367,7 +367,7 @@ def convert_to_ros_msg(ros_msg_type: Union[Type, Any], obj: Any) -> Any:
logger.warning(f"Not Supported type: {td}")
setattr(ros_msg, key, []) # FIXME
elif "array.array" in str(type(attr)):
if attr.typecode == "f":
if attr.typecode == "f" or attr.typecode == "d":
setattr(ros_msg, key, [float(i) for i in value])
else:
setattr(ros_msg, key, value)

View File

@@ -660,7 +660,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if len(action_kwargs[k]) > 1:
for i in action_kwargs[k]:
r = ResourceGet.Request()
r.id = i["id"]
r.id = i["id"] # splash optional
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
current_resources.extend(response.resources)

View File

@@ -148,7 +148,7 @@ def configure_logger():
"""配置日志记录器"""
# 获取根日志记录器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO) # 修改为DEBUG以显示所有级别
root_logger.setLevel(logging.DEBUG) # 修改为DEBUG以显示所有级别
# 移除已存在的处理器
for handler in root_logger.handlers[:]:
@@ -156,7 +156,7 @@ def configure_logger():
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO) # 修改为DEBUG以显示所有级别
console_handler.setLevel(logging.DEBUG) # 修改为DEBUG以显示所有级别
# 使用自定义的颜色格式化器
color_formatter = ColoredFormatter()