diff --git a/unilabos/devices/liquid_handling/biomek.py b/unilabos/devices/liquid_handling/biomek.py new file mode 100644 index 00000000..40a90a41 --- /dev/null +++ b/unilabos/devices/liquid_handling/biomek.py @@ -0,0 +1,173 @@ +import requests +from typing import List, Sequence, Optional, Union, Literal +from geometry_msgs.msg import Point +from unilabos_msgs.msg import Resource + +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker # type: ignore +from .liquid_handler_abstract import LiquidHandlerAbstract + + +class LiquidHandlerBiomek(LiquidHandlerAbstract): + """ + Biomek液体处理器的实现类,继承自LiquidHandlerAbstract。 + 该类用于处理Biomek液体处理器的特定操作。 + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._status = "Idle" # 初始状态为 Idle + self._success = False # 初始成功状态为 False + self._status_queue = kwargs.get("status_queue", None) # 状态队列 + self.temp_protocol = {} + + def create_protocol( + self, + protocol_name: str, + protocol_description: str, + protocol_version: str, + protocol_author: str, + protocol_date: str, + none_keys: List[str] = [], + ): + """ + 创建一个新的协议。 + + Args: + protocol_name (str): 协议名称 + protocol_description (str): 协议描述 + protocol_version (str): 协议版本 + protocol_author (str): 协议作者 + protocol_date (str): 协议日期 + protocol_type (str): 协议类型 + none_keys (List[str]): 需要设置为None的键列表 + + Returns: + dict: 创建的协议字典 + """ + self.temp_protocol = { + "meta": { + "name": protocol_name, + "description": protocol_description, + "version": protocol_version, + "author": protocol_author, + "date": protocol_date, + }, + "labwares": [], + "steps": [], + } + return self.temp_protocol + + def run_protocol(self): + """ + 执行创建的实验流程。 + 工作站的完整执行流程是, + 从 create_protocol 开始,创建新的 method, + 随后执行 transfer_liquid 等操作向实验流程添加步骤, + 最后 run_protocol 执行整个方法。 + + Returns: + dict: 执行结果 + """ + if not self.temp_protocol: + raise ValueError("No protocol created. Please create a protocol first.") + + # 模拟执行协议 + self._status = "Running" + self._success = True + # 在这里可以添加实际执行协议的逻辑 + + response = requests.post("localhost:5000/api/protocols", json=self.temp_protocol) + + def create_resource( + self, + resource_tracker: DeviceNodeResourceTracker, + resources: list[Resource], + bind_parent_id: str, + bind_location: dict[str, float], + liquid_input_slot: list[int], + liquid_type: list[str], + liquid_volume: list[int], + slot_on_deck: int, + ): + """ + 创建一个新的资源。 + + Args: + device_id (str): 设备ID + res_id (str): 资源ID + class_name (str): 资源类名 + parent (str): 父级ID + bind_locations (Point): 绑定位置 + liquid_input_slot (list[int]): 液体输入槽列表 + liquid_type (list[str]): 液体类型列表 + liquid_volume (list[int]): 液体体积列表 + slot_on_deck (int): 甲板上的槽位 + + Returns: + dict: 创建的资源字典 + """ + # TODO:需要对好接口,下面这个是临时的 + resource = { + "id": res_id, + "class": class_name, + "parent": parent, + "bind_locations": bind_locations.to_dict(), + "liquid_input_slot": liquid_input_slot, + "liquid_type": liquid_type, + "liquid_volume": liquid_volume, + "slot_on_deck": slot_on_deck, + } + self.temp_protocol["labwares"].append(resource) + return resource + + def transfer_liquid( + self, + sources: Sequence[Container], + targets: Sequence[Container], + tip_racks: Sequence[TipRack], + *, + use_channels: Optional[List[int]] = None, + asp_vols: Union[List[float], float], + dis_vols: Union[List[float], float], + asp_flow_rates: Optional[List[Optional[float]]] = None, + dis_flow_rates: Optional[List[Optional[float]]] = None, + offsets: Optional[List[Coordinate]] = None, + touch_tip: bool = False, + liquid_height: Optional[List[Optional[float]]] = None, + blow_out_air_volume: Optional[List[Optional[float]]] = None, + spread: Literal["wide", "tight", "custom"] = "wide", + is_96_well: bool = False, + mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none", + mix_times: Optional[List(int)] = None, + mix_vol: Optional[int] = None, + mix_rate: Optional[int] = None, + mix_liquid_height: Optional[float] = None, + delays: Optional[List[int]] = None, + none_keys: List[str] = [] + ): + # TODO:需要对好接口,下面这个是临时的 + self.temp_protocol["steps"].append( + { + "type": "transfer", + "sources": [source.to_dict() for source in sources], + "targets": [target.to_dict() for target in targets], + "tip_racks": [tip_rack.to_dict() for tip_rack in tip_racks], + "use_channels": use_channels, + "asp_vols": asp_vols, + "dis_vols": dis_vols, + "asp_flow_rates": asp_flow_rates, + "dis_flow_rates": dis_flow_rates, + "offsets": offsets, + "touch_tip": touch_tip, + "liquid_height": liquid_height, + "blow_out_air_volume": blow_out_air_volume, + "spread": spread, + "is_96_well": is_96_well, + "mix_stage": mix_stage, + "mix_times": mix_times, + "mix_vol": mix_vol, + "mix_rate": mix_rate, + "mix_liquid_height": mix_liquid_height, + "delays": delays, + } + ) diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index c349403e..4faa0427 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -6,13 +6,8 @@ import asyncio import time from pylabrobot.liquid_handling import LiquidHandler -from pylabrobot.resources import ( - Resource, - TipRack, - Container, - Coordinate, - Well -) +from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well + class LiquidHandlerAbstract(LiquidHandler): """Extended LiquidHandler with additional operations.""" @@ -21,6 +16,19 @@ class LiquidHandlerAbstract(LiquidHandler): # REMOVE LIQUID -------------------------------------------------- # --------------------------------------------------------------- + async def create_protocol( + self, + protocol_name: str, + protocol_description: str, + protocol_version: str, + protocol_author: str, + protocol_date: str, + protocol_type: str, + none_keys: List[str] = [], + ): + """Create a new protocol with the given metadata.""" + pass + async def remove_liquid( self, vols: List[float], @@ -35,26 +43,26 @@ class LiquidHandlerAbstract(LiquidHandler): spread: Optional[Literal["wide", "tight", "custom"]] = "wide", delays: Optional[List[int]] = None, is_96_well: Optional[bool] = False, - top: Optional[List(float)] = None, - none_keys: List[str] = [] + top: Optional[List[float]] = None, + none_keys: List[str] = [], ): """A complete *remove* (aspirate → waste) operation.""" trash = self.deck.get_trash_area() try: if is_96_well: - pass # This mode is not verified + pass # This mode is not verified else: if len(vols) != len(sources): raise ValueError("Length of `vols` must match `sources`.") for src, vol in zip(sources, vols): - self.move_to(src, dis_to_top=top[0] if top else 0) + await self.move_to(src, dis_to_top=top[0] if top else 0) tip = next(self.current_tip) await self.pick_up_tips(tip) await self.aspirate( resources=[src], vols=[vol], - use_channels=use_channels, # only aspirate96 used, default to None + use_channels=use_channels, # only aspirate96 used, default to None flow_rates=[flow_rates[0]] if flow_rates else None, offsets=[offsets[0]] if offsets else None, liquid_height=[liquid_height[0]] if liquid_height else None, @@ -64,15 +72,15 @@ class LiquidHandlerAbstract(LiquidHandler): await self.custom_delay(seconds=delays[0] if delays else 0) await self.dispense( resources=waste_liquid, - vols=[vol], - use_channels=use_channels, - flow_rates=[flow_rates[1]] if flow_rates else None, - offsets=[offsets[1]] if offsets else None, - liquid_height=[liquid_height[1]] if liquid_height else None, - blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None, - spread=spread, - ) - await self.discard_tips() # For now, each of tips is discarded after use + vols=[vol], + use_channels=use_channels, + flow_rates=[flow_rates[1]] if flow_rates else None, + offsets=[offsets[1]] if offsets else None, + liquid_height=[liquid_height[1]] if liquid_height else None, + blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None, + spread=spread, + ) + await self.discard_tips() # For now, each of tips is discarded after use except Exception as e: raise RuntimeError(f"Liquid removal failed: {e}") from e @@ -100,13 +108,13 @@ class LiquidHandlerAbstract(LiquidHandler): mix_vol: Optional[int] = None, mix_rate: Optional[int] = None, mix_liquid_height: Optional[float] = None, - none_keys: List[str] = [] + none_keys: List[str] = [], ): """A complete *add* (aspirate reagent → dispense into targets) operation.""" try: if is_96_well: - pass # This mode is not verified. + pass # This mode is not verified. else: if len(asp_vols) != len(targets): raise ValueError("Length of `vols` must match `targets`.") @@ -122,7 +130,7 @@ class LiquidHandlerAbstract(LiquidHandler): offsets=[offsets[0]] if offsets else None, liquid_height=[liquid_height[0]] if liquid_height else None, blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, - spread=spread + spread=spread, ) if delays is not None: await self.custom_delay(seconds=delays[0]) @@ -144,7 +152,8 @@ class LiquidHandlerAbstract(LiquidHandler): mix_vol=mix_vol, offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None) + mix_rate=mix_rate if mix_rate else None, + ) if delays is not None: await self.custom_delay(seconds=delays[1]) await self.touch_tip(targets[_]) @@ -158,13 +167,13 @@ class LiquidHandlerAbstract(LiquidHandler): # --------------------------------------------------------------- async def transfer_liquid( self, - asp_vols: Union[List[float], float], - dis_vols: Union[List[float], float], sources: Sequence[Container], targets: Sequence[Container], tip_racks: Sequence[TipRack], *, use_channels: Optional[List[int]] = None, + asp_vols: Union[List[float], float], + dis_vols: Union[List[float], float], asp_flow_rates: Optional[List[Optional[float]]] = None, dis_flow_rates: Optional[List[Optional[float]]] = None, offsets: Optional[List[Coordinate]] = None, @@ -179,7 +188,7 @@ class LiquidHandlerAbstract(LiquidHandler): mix_rate: Optional[int] = None, mix_liquid_height: Optional[float] = None, delays: Optional[List[int]] = None, - none_keys: List[str] = [] + none_keys: List[str] = [], ): """Transfer liquid from each *source* well/plate to the corresponding *target*. @@ -201,14 +210,15 @@ class LiquidHandlerAbstract(LiquidHandler): # 96‑channel head mode # ------------------------------------------------------------------ if is_96_well: - pass # This mode is not verified + pass # This mode is not verified else: if not (len(asp_vols) == len(sources) and len(dis_vols) == len(targets)): raise ValueError("`sources`, `targets`, and `vols` must have the same length.") tip_iter = self.iter_tips(tip_racks) - for src, tgt, asp_vol, asp_flow_rate, dis_vol, dis_flow_rate in ( - zip(sources, targets, asp_vols, asp_flow_rates, dis_vols, dis_flow_rates)): + for src, tgt, asp_vol, asp_flow_rate, dis_vol, dis_flow_rate in zip( + sources, targets, asp_vols, asp_flow_rates, dis_vols, dis_flow_rates + ): tip = next(tip_iter) await self.pick_up_tips(tip) # Aspirate from source @@ -247,9 +257,9 @@ class LiquidHandlerAbstract(LiquidHandler): except Exception as exc: raise RuntimeError(f"Liquid transfer failed: {exc}") from exc -# --------------------------------------------------------------- -# Helper utilities -# --------------------------------------------------------------- + # --------------------------------------------------------------- + # Helper utilities + # --------------------------------------------------------------- async def custom_delay(self, seconds=0, msg=None): """ @@ -266,28 +276,26 @@ class LiquidHandlerAbstract(LiquidHandler): print(f"Done: {msg}") print(f"Current time: {time.strftime('%H:%M:%S')}") - async def touch_tip(self, - targets: Sequence[Container], - ): + async def touch_tip(self, targets: Sequence[Container]): """Touch the tip to the side of the well.""" await self.aspirate( resources=[targets], vols=[0], use_channels=None, flow_rates=None, - offsets=[Coordinate(x=-targets.get_size_x()/2,y=0,z=0)], + offsets=[Coordinate(x=-targets.get_size_x() / 2, y=0, z=0)], liquid_height=None, - blow_out_air_volume=None + blow_out_air_volume=None, ) - #await self.custom_delay(seconds=1) # In the simulation, we do not need to wait + # await self.custom_delay(seconds=1) # In the simulation, we do not need to wait await self.aspirate( resources=[targets], vols=[0], use_channels=None, flow_rates=None, - offsets=[Coordinate(x=targets.get_size_x()/2,y=0,z=0)], + offsets=[Coordinate(x=targets.get_size_x() / 2, y=0, z=0)], liquid_height=None, - blow_out_air_volume=None + blow_out_air_volume=None, ) async def mix( @@ -298,9 +306,9 @@ class LiquidHandlerAbstract(LiquidHandler): height_to_bottom: Optional[float] = None, offsets: Optional[Coordinate] = None, mix_rate: Optional[float] = None, - none_keys: List[str] = [] + none_keys: List[str] = [], ): - if mix_time is None: # No mixing required + if mix_time is None: # No mixing required return """Mix the liquid in the target wells.""" for _ in range(mix_time): @@ -333,7 +341,7 @@ class LiquidHandlerAbstract(LiquidHandler): tip_iter = self.iter_tips(tip_racks) self.current_tip = tip_iter - async def move_to(self, well: Well, dis_to_top: float = 0 , channel: int = 0): + async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0): """ Move a single channel to a specific well with a given z-height. @@ -352,4 +360,3 @@ class LiquidHandlerAbstract(LiquidHandler): await self.move_channel_x(channel, abs_loc.x) await self.move_channel_y(channel, abs_loc.y) await self.move_channel_z(channel, abs_loc.z + well_height + dis_to_top) - diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index bcddae55..299db7fe 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -22,8 +22,8 @@ liquid_handler: is_96_well: is_96_well top: top none_keys: none_keys - feedback: { } - result: { } + feedback: {} + result: {} add_liquid: type: LiquidHandlerAdd goal: @@ -43,8 +43,8 @@ liquid_handler: mix_rate: mix_rate mix_liquid_height: mix_liquid_height none_keys: none_keys - feedback: { } - result: { } + feedback: {} + result: {} transfer_liquid: type: LiquidHandlerTransfer goal: @@ -69,8 +69,8 @@ liquid_handler: mix_liquid_height: mix_liquid_height delays: delays none_keys: none_keys - feedback: { } - result: { } + feedback: {} + result: {} mix: type: LiquidHandlerMix goal: @@ -81,16 +81,16 @@ liquid_handler: offsets: offsets mix_rate: mix_rate none_keys: none_keys - feedback: { } - result: { } + feedback: {} + result: {} move_to: type: LiquidHandlerMoveTo goal: well: well dis_to_top: dis_to_top channel: channel - feedback: { } - result: { } + feedback: {} + result: {} aspirate: type: LiquidHandlerAspirate goal: @@ -272,3 +272,62 @@ liquid_handler.revvity: status: status result: success: success + +liquid_handler.biomek: + description: Biomek液体处理器设备,基于pylabrobot控制 + icon: icon_yiyezhan.webp + class: + module: unilabos.devices.liquid_handling.biomek:LiquidHandlerBiomek + type: python + status_types: + status: String + success: Boolean + action_value_mappings: + create_protocol: + type: LiquidHandlerProtocolCreation + goal: + protocol_name: protocol_name + protocol_description: protocol_description + protocol_version: protocol_version + protocol_author: protocol_author + protocol_date: protocol_date + protocol_type: protocol_type + none_keys: none_keys + feedback: {} + result: {} + run_protocol: + type: EmptyIn + goal: {} + feedback: {} + result: {} + transfer_liquid: + type: LiquidHandlerTransfer + goal: + asp_vols: asp_vols + dis_vols: dis_vols + sources: sources + targets: targets + tip_racks: tip_racks + use_channels: use_channels + asp_flow_rates: asp_flow_rates + dis_flow_rates: dis_flow_rates + offsets: offsets + touch_tip: touch_tip + liquid_height: liquid_height + blow_out_air_volume: blow_out_air_volume + spread: spread + is_96_well: is_96_well + mix_stage: mix_stage + mix_times: mix_times + mix_vol: mix_vol + mix_rate: mix_rate + mix_liquid_height: mix_liquid_height + delays: delays + none_keys: none_keys + feedback: {} + result: {} + schema: + type: object + properties: {} + required: [] + additionalProperties: false diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 28b67aa4..daf52300 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -349,6 +349,20 @@ class BaseROS2DeviceNode(Node, Generic[T]): response = rclient.call(request) # 应该先add_resource了 res.response = "OK" + # 如果driver自己就有assign的方法,那就使用driver自己的assign方法 + if hasattr(self.driver_instance, "create_resource"): + create_resource_func = getattr(self.driver_instance, "create_resource") + create_resource_func( + resource_tracker=self.resource_tracker, + resources=request.resources, + bind_parent_id=bind_parent_id, + bind_location=location, + liquid_input_slot=LIQUID_INPUT_SLOT, + liquid_type=ADD_LIQUID_TYPE, + liquid_volume=LIQUID_VOLUME, + slot_on_deck=slot, + ) + return res # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) # request.resources = [convert_to_ros_msg(Resource, resources)] diff --git a/unilabos_msgs/CMakeLists.txt b/unilabos_msgs/CMakeLists.txt index 0cd6a1e3..135e5738 100644 --- a/unilabos_msgs/CMakeLists.txt +++ b/unilabos_msgs/CMakeLists.txt @@ -29,6 +29,8 @@ set(action_files "action/HeatChillStart.action" "action/HeatChillStop.action" + "action/LiquidHandlerProtocolCreation.action" + "action/LiquidHandlerAspirate.action" "action/LiquidHandlerDiscardTips.action" "action/LiquidHandlerDispense.action" diff --git a/unilabos_msgs/action/LiquidHandlerProtocolCreation.action b/unilabos_msgs/action/LiquidHandlerProtocolCreation.action new file mode 100644 index 00000000..de8c605a --- /dev/null +++ b/unilabos_msgs/action/LiquidHandlerProtocolCreation.action @@ -0,0 +1,9 @@ +string protocol_name +string protocol_description +string protocol_version +string protocol_author +string protocol_date +string protocol_type +string[] none_keys +--- +---