更新自动化配置抓取位置

This commit is contained in:
zhangshixiang
2026-01-15 17:21:14 +08:00
parent b045ab4e0a
commit be054589b5
3 changed files with 1913 additions and 4254 deletions

View File

@@ -36,7 +36,6 @@ from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandl
from unilabos.resources.itemized_carrier import ItemizedCarrier
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
class PRCXIError(RuntimeError):
"""Lilith 返回 Success=false 时抛出的业务异常"""
@@ -72,7 +71,10 @@ class PRCXI9300Deck(Deck):
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16
self.slot_locations = []
for i in range(0, 16):
self.slot_locations.append(Coordinate((i%4)*137.5+5, (3-int(i/4))*96+13, 0))
def assign_child_at_slot(self, resource: Resource, slot: int, reassign: bool = False) -> None:
if self.slots[slot - 1] is not None and not reassign:
@@ -374,14 +376,13 @@ class PRCXI9300TubeRack(TubeRack):
ordering_param = None
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
# ordering 的值是字符串,这种情况下我们让 TubeRack 使用默认行为
# 不在初始化时创建 items而是在 deserialize 后处理
items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
ordering_param = collections.OrderedDict() # 提供空的 ordering 来满足要求
# 保存 ordering 信息以便后续处理
self._temp_ordering = ordering
else:
# ordering 的值已经是对象,需要过滤掉 None 值
# 只保留有效的对象,用于 ordered_items 参数
@@ -390,9 +391,9 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = valid_items
ordering_param = None
else:
# 如果没有有效对象,使用 ordering 参数
items_to_pass = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
# 如果没有有效对象,创建空的 ordered_items
items_to_pass = {}
ordering_param = None
elif items is not None:
# 兼容旧的 items 参数
items_to_pass = items
@@ -403,25 +404,45 @@ class PRCXI9300TubeRack(TubeRack):
# 根据情况传递不同的参数
if items_to_pass is not None:
super().__init__(name, size_x, size_y, size_z,
ordered_items=items_to_pass,
model=model,
super().__init__(name, size_x, size_y, size_z,
ordered_items=items_to_pass,
model=model,
**kwargs)
elif ordering_param is not None:
# 传递 ordering 参数,让 TubeRack 自己创建 Tube 对象
super().__init__(name, size_x, size_y, size_z,
ordering=ordering_param,
model=model,
**kwargs)
# 直接调用 ItemizedResource 的构造函数来处理 ordering
from pylabrobot.resources import ItemizedResource
ItemizedResource.__init__(self, name, size_x, size_y, size_z,
ordering=ordering_param,
category=category,
model=model,
**kwargs)
else:
super().__init__(name, size_x, size_y, size_z,
model=model,
super().__init__(name, size_x, size_y, size_z,
model=model,
**kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
# 如果有临时 ordering 信息,在初始化完成后处理
if hasattr(self, '_temp_ordering') and self._temp_ordering:
self._process_temp_ordering()
def _process_temp_ordering(self):
"""处理临时的 ordering 信息,创建相应的 Tube 对象"""
from pylabrobot.resources import Tube, Coordinate
for location, item_type in self._temp_ordering.items():
if item_type == 'Tube' or item_type == 'tube':
# 为每个位置创建 Tube 对象
tube = Tube(name=f"{self.name}_{location}", size_x=10, size_y=10, size_z=50, max_volume=2000.0)
# 使用 assign_child_resource 添加到 rack 中
self.assign_child_resource(tube, location=Coordinate(0, 0, 0))
# 清理临时数据
del self._temp_ordering
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
try:
data = super().serialize_state()
@@ -460,19 +481,35 @@ class PRCXI9300PlateAdapterSite(ItemizedCarrier):
resource_size_y=size_y,
resource_size_z=size_z,
name_prefix=name,
)[0]
)[0]
# 确保不传递重复的参数
kwargs.pop('layout', None)
kwargs.pop('num_items_x', None)
kwargs.pop('num_items_y', None)
kwargs.pop('num_items_z', None)
kwargs.pop('sites', None)
sites_in = kwargs.pop('sites', None)
# 创建默认的sites字典
sites_dict = {name: sites}
# 优先从 sites_in 读取 'content_type',否则使用默认值
content_type = [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
]
# 如果提供了sites参数则用sites_in中的值替换sites_dict中对应的元素
if sites_in is not None and isinstance(sites_in, dict):
for site_key, site_value in sites_in.items():
if site_key in sites_dict:
sites_dict[site_key] = site_value
super().__init__(name, size_x, size_y, size_z,
sites={name: sites},
num_items_x=1,
num_items_y=1,
num_items_z=1,
sites=sites_dict,
num_items_x=kwargs.pop('num_items_x', 1),
num_items_y=kwargs.pop('num_items_y', 1),
num_items_z=kwargs.pop('num_items_z', 1),
content_type=content_type,
**kwargs)
self._unilabos_state = {}
if material_info:
@@ -618,16 +655,59 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
step_mode=False,
matrix_id="",
is_9320=False,
start_rail=2,
rail_nums=4,
rail_interval=0,
x_increase = -0.003636,
y_increase = -0.003636,
x_offset = -0.8,
y_offset = -37.98,
deck_z = 300,
deck_y = 400,
rail_width=27.5,
xy_coupling = -0.0045,
):
self.deck_x = (start_rail + rail_nums*5 + (rail_nums-1)*rail_interval) * rail_width
self.deck_y = deck_y
self.deck_z = deck_z
self.x_increase = x_increase
self.y_increase = y_increase
self.x_offset = x_offset
self.y_offset = y_offset
self.xy_coupling = xy_coupling
tablets_info = []
count = 0
plate_positions = []
for child in deck.children:
number = int(child.name.replace("T", ""))
if child.children:
if "Material" in child.children[0]._unilabos_state:
number = int(child.name.replace("T", ""))
tablets_info.append(
WorkTablets(Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"])
{
"Number": number,
"Material": child.children[0]._unilabos_state["Material"]
}
)
else:
tablets_info.append(
{
"Number": number,
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9"}
}
)
pos = self.plr_pos_to_prcxi(child)
plate_positions.append(
{
"Number": number,
"XPos": pos.x,
"YPos": pos.y,
"ZPos": pos.z
}
)
if is_9320:
print("当前设备是9320")
# 始终初始化 step_mode 属性
@@ -637,10 +717,38 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.step_mode = step_mode
else:
print("9300设备不支持 单点动作模式")
self._unilabos_backend = PRCXI9300Backend(
tablets_info, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320
tablets_info, plate_positions, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320,
x_increase, y_increase, x_offset, y_offset,
deck_z, deck_x=self.deck_x, deck_y=self.deck_y, xy_coupling=xy_coupling
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
def plr_pos_to_prcxi(self, resource: Resource):
resource_pos = resource.get_absolute_location(x="c",y="c",z="t")
x = resource_pos.x
y = resource_pos.y
z = resource_pos.z
# 如果z等于0则递归resource.parent的高度并向z加使用get_size_z方法
parent = resource.parent
res_z = resource.location.z
while not isinstance(parent, LiquidHandlerAbstract) and (res_z == 0) and parent is not None:
z += parent.get_size_z()
res_z = parent.location.z
parent = getattr(parent, "parent", None)
prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y)
prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset
prcxi_z = self.deck_z - z
prcxi_x = min(max(0, prcxi_x),self.deck_x)
prcxi_y = min(max(0, prcxi_y),self.deck_y)
prcxi_z = min(max(0, prcxi_z),self.deck_z)
return Coordinate(prcxi_x, prcxi_y, prcxi_z)
def post_init(self, ros_node: BaseROS2DeviceNode):
super().post_init(ros_node)
@@ -962,6 +1070,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
def __init__(
self,
tablets_info: list[WorkTablets],
plate_positions: dict[int, Coordinate],
host: str = "127.0.0.1",
port: int = 9999,
timeout: float = 10.0,
@@ -970,10 +1079,19 @@ class PRCXI9300Backend(LiquidHandlerBackend):
setup=True,
debug=False,
matrix_id="",
is_9320=False,
is_9320=False,
x_increase = 0,
y_increase = 0,
x_offset = 0,
y_offset = 0,
deck_z = 300,
deck_x = 0,
deck_y = 0,
xy_coupling = 0.0,
) -> None:
super().__init__()
self.tablets_info = tablets_info
self.plate_positions = plate_positions
self.matrix_id = matrix_id
self.api_client = PRCXI9300Api(host, port, timeout, axis, debug, is_9320)
self.host, self.port, self.timeout = host, port, timeout
@@ -981,6 +1099,15 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._execute_setup = setup
self.debug = debug
self.axis = "Left"
self.x_increase = x_increase
self.y_increase = y_increase
self.xy_coupling = xy_coupling
self.x_offset = x_offset
self.y_offset = y_offset
self.deck_x = deck_x
self.deck_y = deck_y
self.deck_z = deck_z
self.tip_length = 0
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action(
@@ -992,17 +1119,14 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.steps_todo_list.append(step)
return step
async def pick_up_resource(self, pickup: ResourcePickup, **backend_kwargs):
resource=pickup.resource
offset=pickup.offset
pickup_distance_from_top=pickup.pickup_distance_from_top
direction=pickup.direction
plate_number = int(resource.parent.name.replace("T", ""))
is_whole_plate = True
balance_height = 0
balance_height = 20
step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
@@ -1010,17 +1134,16 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def drop_resource(self, drop: ResourceDrop, **backend_kwargs):
plate_number = None
target_plate_number = backend_kwargs.get("target_plate_number", None)
if target_plate_number is not None:
plate_number = int(target_plate_number.name.replace("T", ""))
is_whole_plate = True
balance_height = 0
if plate_number is None:
raise ValueError("target_plate_number is required when dropping a resource")
step = self.api_client.clamp_jaw_drop(plate_number, is_whole_plate, balance_height)
self.steps_todo_list.append(step)
return step
@@ -1034,29 +1157,33 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._ros_node = ros_node
def create_protocol(self, protocol_name):
if protocol_name == "":
protocol_name = f"protocol_{time.time()}"
self.protocol_name = protocol_name
self.steps_todo_list = []
matrices = self.api_client.matrix_by_id("5de524d0-3f95-406c-86dd-f83626ebc7cb")["WorkTablets"]
if not len(self.matrix_id):
self.matrix_id = str(uuid.uuid4())
self.matrix_info = {
"MatrixId": self.matrix_id,
"MatrixName": self.matrix_id,
"WorkTablets": [{"Number": i, "Material": {"uuid": "068b3815e36b4a72a59bae017011b29f"}} for i in range(1, 16)]+
[{"Number": 16, "Material": {'uuid': '730067cf07ae43849ddf4034299030e9', 'Code': 'q1', 'SupplyType': 1, 'Name': '废弃槽', 'SummaryName': None, 'Factory': None, 'LengthNum': None, 'WidthNum': None, 'HeightNum': 0.0, 'DepthNum': 0.0, 'PipetteHeight': None, 'HoleDiameter': None, 'Margins_X': None, 'Margins_Y': None, 'HoleColum': 1, 'HoleRow': 1, 'Volume': 1250, 'ImagePath': 'C:\\Program Files (x86)\\Pipetting workstation chip', 'CreateTime': None, 'UpdateTime': None, 'XSpacing': 1.0, 'YSpacing': 1.0, 'materialEnum': 0}}]
}
# print(json.dumps(self.matrix_info, indent=2))
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
if not res["Success"]:
self.matrix_id = ""
raise AssertionError(f"Failed to create matrix: {res.get('Message', 'Unknown error')}")
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
def run_protocol(self):
assert self.is_reset_ok, "PRCXI9300Backend is not reset successfully. Please call setup() first."
run_time = time.time()
self.matrix_info = MatrixInfo(
MatrixId=f"{int(run_time)}",
MatrixName=f"protocol_{run_time}",
MatrixCount=len(self.tablets_info),
WorkTablets=self.tablets_info,
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_id, self.steps_todo_list
)
# print(json.dumps(self.matrix_info, indent=2))
if not len(self.matrix_id):
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
assert res["Success"], f"Failed to create matrix: {res.get('Message', 'Unknown error')}"
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_info["MatrixId"], self.steps_todo_list
)
else:
print(f"PRCXI9300Backend using predefined worktable {self.matrix_id}, skipping matrix creation.")
solution_id = self.api_client.add_solution(f"protocol_{run_time}", self.matrix_id, self.steps_todo_list)
print(f"PRCXI9300Backend created solution with ID: {solution_id}")
self.api_client.load_solution(solution_id)
print(json.dumps(self.steps_todo_list, indent=2))
@@ -1099,6 +1226,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
else:
await asyncio.sleep(1)
print("PRCXI9300 reset successfully.")
self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions)
except ConnectionRefusedError as e:
raise RuntimeError(
f"Failed to connect to PRCXI9300 API at {self.host}:{self.port}. "
@@ -1147,7 +1277,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
hole_row = 1
if self._num_channels == 1:
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
step = self.api_client.Load(
@@ -1160,7 +1290,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers=f"{(hole_col - 1) * 8 + hole_row}" if self._num_channels == 1 else "1,2,3,4,5",
hole_numbers=f"{(hole_col - 1) * 8 + hole_row}" if self._num_channels != 8 else "1,2,3,4,5",
)
self.steps_todo_list.append(step)
@@ -1221,7 +1351,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
if self.channel_num == 1:
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
step = self.api_client.UnLoad(
@@ -1273,7 +1403,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels == 1:
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
assert mix_time > 0
@@ -1330,7 +1460,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels == 1:
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
step = self.api_client.Imbibing(
@@ -1388,7 +1518,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels == 1:
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
step = self.api_client.Tapping(
@@ -1597,6 +1727,13 @@ class PRCXI9300Api:
"""GetWorkTabletMatrixById"""
return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])
def update_clamp_jaw_position(self, target_matrix_id: str, plate_positions: List[Dict[str, Any]]):
position_params = {
"MatrixId": target_matrix_id,
"WorkTablets": plate_positions
}
return self.call("IMatrix", "UpdateClampJawPosition", [position_params])
def add_WorkTablet_Matrix(self, matrix: MatrixInfo):
return self.call("IMatrix", "AddWorkTabletMatrix2" if self.is_9320 else "AddWorkTabletMatrix", [matrix])
@@ -1870,82 +2007,82 @@ class DefaultLayout:
{
"Number": 1,
"Code": "T1",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 2,
"Code": "T2",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 3,
"Code": "T3",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 4,
"Code": "T4",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 5,
"Code": "T5",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 6,
"Code": "T6",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 7,
"Code": "T7",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 8,
"Code": "T8",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 9,
"Code": "T9",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 10,
"Code": "T10",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 11,
"Code": "T11",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 12,
"Code": "T12",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9"},
}, # 这个设置成废液槽,用储液槽表示
{
"Number": 13,
"Code": "T13",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 14,
"Code": "T14",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 15,
"Code": "T15",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
},
{
"Number": 16,
"Code": "T16",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9"},
}, # 这个设置成垃圾桶,用储液槽表示
],
}

View File

@@ -79,6 +79,7 @@ class ItemizedCarrier(ResourcePLR):
category: Optional[str] = "carrier",
model: Optional[str] = None,
invisible_slots: Optional[str] = None,
content_type: Optional[List[str]] = ["bottle", "container", "tube", "bottle_carrier", "tip_rack"],
):
super().__init__(
name=name,
@@ -92,6 +93,7 @@ class ItemizedCarrier(ResourcePLR):
self.num_items_x, self.num_items_y, self.num_items_z = num_items_x, num_items_y, num_items_z
self.invisible_slots = [] if invisible_slots is None else invisible_slots
self.layout = "z-y" if self.num_items_z > 1 and self.num_items_x == 1 else "x-z" if self.num_items_z > 1 and self.num_items_y == 1 else "x-y"
self.content_type = content_type
if isinstance(sites, dict):
sites = sites or {}
@@ -419,7 +421,7 @@ class ItemizedCarrier(ResourcePLR):
self[identifier] if isinstance(self[identifier], str) else None,
"position": {"x": location.x, "y": location.y, "z": location.z},
"size": self.child_size[identifier],
"content_type": ["bottle", "container", "tube", "bottle_carrier", "tip_rack"]
"content_type": self.content_type
} for identifier, location in self.child_locations.items()]
}

File diff suppressed because it is too large Load Diff