Merge remote-tracking branch 'upstream/dev' into device_visualization

This commit is contained in:
zhangshixiang
2025-07-19 23:20:15 +08:00
7 changed files with 56212 additions and 6028 deletions

1
.gitignore vendored
View File

@@ -243,3 +243,4 @@ unilabos/device_mesh/view_robot.rviz
local_test2.py
ros-humble-unilabos-msgs-0.9.13-h6403a04_5.tar.bz2
*.bz2
test_config.py

19732
deck.json

File diff suppressed because it is too large Load Diff

21197
deck_9300_new.json Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,7 @@
"timeout": 10.0,
"axis": "Right",
"channel_num": 1,
"setup": true,
"setup": false,
"debug": false,
"simulator": false,
"matrix_id": "fd383e6d-2d0e-40b5-9c01-1b2870b1f1b1"

View File

@@ -714,15 +714,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
else:
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
tip = []
for _ in range(len(use_channels)):
for x in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.pick_up_tips(tip)
for _ in range(len(targets)):
print(use_channels, reagent_sources)
await self.aspirate(
resources=reagent_sources,
vols=[asp_vols[_]],

View File

@@ -430,7 +430,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int] = None):
"""Pick up tips from the specified resource."""
print('where?'*200)
plate_indexes = []
for op in ops:
plate = op.resource.parent
@@ -969,12 +969,14 @@ class PRCXI9300Api:
if __name__ == "__main__":
# # Example usage
# # 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
# # 2. backend需要支持num channel为1的情况
# # 3. 设计一个单点动作流程,可以跑
# # 4.
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
# Example usage
# 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
# 2. backend需要支持num channel为1的情况
# 3. 设计一个单点动作流程,可以跑
# 4.
deck = PRCXI9300Deck(name="PRCXI_Deck_9300", size_x=100, size_y=100, size_z=100)
from pylabrobot.resources.opentrons.tip_racks import opentrons_96_tiprack_300ul,opentrons_96_tiprack_10ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat, nest_96_wellplate_2ml_deep
@@ -1025,12 +1027,12 @@ if __name__ == "__main__":
}
})
plate4 = get_tip_rack("RackT4")
plate4 = get_well_container("PlateT4")
plate4.load_state({
"Material": {
"uuid": "076250742950465b9d6ea29a225dfb00",
"Code": "ZX-001-300",
"Name": "300μL Tip头"
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
"Code": "ZX-019-2.2",
"Name": "96深孔板"
}
})
@@ -1043,6 +1045,7 @@ if __name__ == "__main__":
}
})
plate6 = get_well_container("PlateT6")
plate6.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
@@ -1075,13 +1078,19 @@ if __name__ == "__main__":
# from pylabrobot.resources import set_tip_tracking
set_volume_tracking(enabled=True)
from unilabos.resources.graphio import *
A = tree_to_list([resource_plr_to_ulab(deck)])
with open("deck_9300_new.json", "w", encoding="utf-8") as f:
json.dump(A, f, indent=4, ensure_ascii=False)
asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
# asyncio.run(handler.pick_up_tips(plate1.children[:8],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.pick_up_tips(plate1.children[:8],[0,1,2,3,4,5,6,7]))
# print(plate1.children[:8])
# asyncio.run(handler.aspirate(plate2.children[:8],[50]*8, [0,1,2,3,4,5,6,7]))
# print(plate2.children[:8])
# asyncio.run(handler.dispense(plate5.children[:8],[50]*8,[0,1,2,3,4,5,6,7]))
# # # asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# print(plate5.children[:8])
# # # # asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.discard_tips())
# asyncio.run(handler.mix(well_containers.children[:8
@@ -1102,7 +1111,7 @@ if __name__ == "__main__":
# mix_vol=50,
# spread="wide",
# ))
# asyncio.run(handler.run_protocol()) # Run the protocol
# asyncio.run(handler.remove_liquid(
# vols=[100]*16,
# sources=plate2.children[-16:],
@@ -1114,12 +1123,15 @@ if __name__ == "__main__":
# blow_out_air_volume=[None] * 32,
# spread="wide",
# ))
acid = [20]*8+[40]*8+[60]*8+[80]*8+[100]*8+[120]*8+[140]*8+[160]*8+[180]*8+[200]*8+[220]*8+[240]*8
alkaline = acid[::-1] # Reverse the acid list for alkaline
asyncio.run(handler.transfer_liquid(
asp_vols=[100]*16,
dis_vols=[100]*16,
asp_vols=acid,
dis_vols=acid,
tip_racks=[plate1],
sources=plate2.children[-16:],
targets=plate5.children[:16],
sources=plate2.children[:],
targets=plate5.children[:],
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
offsets=[Coordinate(0, 0, 0)] * 32,
asp_flow_rates=[None] * 16,
@@ -1130,8 +1142,6 @@ if __name__ == "__main__":
mix_vol=50,
spread="wide",
))
print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # input("pick_up_tips add step")
asyncio.run(handler.run_protocol()) # Run the protocol
# # input("Running protocol...")
# # input("Press Enter to continue...") # Wait for user input before proceeding
@@ -1143,12 +1153,6 @@ if __name__ == "__main__":
# 3.
# deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
# from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul,opentrons_96_tiprack_10ul
@@ -1289,8 +1293,8 @@ if __name__ == "__main__":
# from unilabos.resources.graphio import *
# A = tree_to_list([resource_plr_to_ulab(deck)])
# with open("deck.json", "w", encoding="utf-8") as f:
# json.dump(A, f, indent=4, ensure_ascii=False)
# # with open("deck.json", "w", encoding="utf-8") as f:
# # json.dump(A, f, indent=4, ensure_ascii=False)
# print(plate11.get_well(0).tracker.get_used_volume())
# asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
@@ -1351,14 +1355,6 @@ if __name__ == "__main__":
# # delays=None,
# # spread="wide"
# # ))
# asyncio.run(handler.run_protocol())
# # asyncio.run(handler.discard_tips())