修改sample_uuid的返回值

This commit is contained in:
zhangshixiang
2025-12-06 01:33:04 +08:00
parent b1cdef9185
commit 91aadba4ef
9 changed files with 44334 additions and 32 deletions

View File

@@ -6,6 +6,7 @@ import traceback
from collections import Counter
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from typing_extensions import TypedDict
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
from unilabos.devices.liquid_handling.laiyu.backend.laiyu_v_backend import UniLiquidHandlerLaiyuBackend
@@ -28,7 +29,9 @@ from pylabrobot.resources import (
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class SimpleReturn(TypedDict):
samples: list
volumes: list
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs):
@@ -564,10 +567,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
self._ros_node = ros_node
@classmethod
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn:
"""Set the liquid in a well."""
res_samples = []
res_volumes = []
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
res_samples.append({"name": well.name, "sample_uuid": well.unilabos_extra.get("sample_uuid", None)})
res_volumes.append(volume)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
# ---------------------------------------------------------------
# REMOVE LIQUID --------------------------------------------------
# ---------------------------------------------------------------

View File

@@ -29,7 +29,7 @@ from pylabrobot.liquid_handling.standard import (
)
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -176,7 +176,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
super().post_init(ros_node)
self._unilabos_backend.post_init(ros_node)
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn:
return super().set_liquid(wells, liquid_names, volumes)
def set_group(self, group_name: str, wells: List[Well], volumes: List[float]):
@@ -505,7 +505,11 @@ class PRCXI9300Backend(LiquidHandlerBackend):
print(f"PRCXI9300Backend created solution with ID: {solution_id}")
self.api_client.load_solution(solution_id)
print(json.dumps(self.steps_todo_list, indent=2))
return self.api_client.start()
if not self.api_client.start():
return False
if not self.api_client.wait_for_finish():
return False
return True
@classmethod
def check_channels(cls, use_channels: List[int]) -> List[int]:
@@ -890,6 +894,26 @@ class PRCXI9300Api:
def start(self) -> bool:
return self.call("IAutomation", "Start")
def wait_for_finish(self) -> bool:
success = False
start = False
while not success:
status = self.step_state_list()
if status is None:
break
if len(status) == 0:
break
if status[-1]["State"] == 2 and start:
success = True
elif status[-1]["State"] > 2:
break
elif status[-1]["State"] == 0:
start = True
else:
time.sleep(1)
return success
def call(self, service: str, method: str, params: Optional[list] = None) -> Any:
payload = json.dumps(
{"ServiceName": service, "MethodName": method, "Paramters": params or []}, separators=(",", ":")