From ab2ab7fcc7b20d5566271dd4bdd4569820fc3eed Mon Sep 17 00:00:00 2001
From: KCFeng425 <2100011801@stu.pku.edu.cn>
Date: Mon, 7 Jul 2025 18:35:35 +0800
Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E5=9B=BA=E4=BD=93?=
=?UTF-8?q?=E5=8A=A0=E6=A0=B7=E5=99=A8=EF=BC=8C=E4=B8=B0=E5=AF=8C=E4=BA=86?=
=?UTF-8?q?json=EF=BC=8C=E4=BF=AE=E6=94=B9=E4=BA=86add=20protocol?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../comprehensive_protocol/checklist.md | 10 +-
.../comprehensive_station.json | 138 ++-
unilabos/compile/add_protocol.py | 819 ++++++------------
.../virtual/virtual_solid_dispenser.py | 335 +++++++
unilabos/registry/devices/virtual_device.yaml | 355 ++++----
unilabos_msgs/action/AddSolid.action | 15 +
6 files changed, 926 insertions(+), 746 deletions(-)
create mode 100644 unilabos/devices/virtual/virtual_solid_dispenser.py
create mode 100644 unilabos_msgs/action/AddSolid.action
diff --git a/test/experiments/comprehensive_protocol/checklist.md b/test/experiments/comprehensive_protocol/checklist.md
index 9645aa4..4762cd2 100644
--- a/test/experiments/comprehensive_protocol/checklist.md
+++ b/test/experiments/comprehensive_protocol/checklist.md
@@ -70,7 +70,7 @@ class SeparateProtocol(BaseModel):
repeats: int
stir_time: float
stir_speed: float
- settling_time: float
+ settling_time: float 写了action
class EvaporateProtocol(BaseModel):
@@ -102,7 +102,7 @@ class AddProtocol(BaseModel):
vessel="main_reactor" volume="2.67 mL"/>
viscous: bool
- purpose: str
+ purpose: str 写了action
class CentrifugeProtocol(BaseModel):
vessel: str
@@ -127,7 +127,7 @@ class HeatChillProtocol(BaseModel):
- stir: bool
+ stir: bool 处理了
stir_speed: float
purpose: str
@@ -180,7 +180,7 @@ class DissolveProtocol(BaseModel):
amount: str = ""
temp: float = 25.0
time: float = 0.0
- stir_speed: float = 0.0
+ stir_speed: float = 0.0 写了action
class FilterThroughProtocol(BaseModel):
from_vessel: str
@@ -194,7 +194,7 @@ class FilterThroughProtocol(BaseModel):
class RunColumnProtocol(BaseModel):
from_vessel: str
to_vessel: str
- column: str
+ column: str 写了action
class WashSolidProtocol(BaseModel):
vessel: str
diff --git a/test/experiments/comprehensive_protocol/comprehensive_station.json b/test/experiments/comprehensive_protocol/comprehensive_station.json
index d0f5c6a..aaa4dfc 100644
--- a/test/experiments/comprehensive_protocol/comprehensive_station.json
+++ b/test/experiments/comprehensive_protocol/comprehensive_station.json
@@ -32,7 +32,11 @@
"separator_1",
"collection_bottle_1",
"collection_bottle_2",
- "collection_bottle_3"
+ "collection_bottle_3",
+ "solid_dispenser_1",
+ "solid_reagent_bottle_1",
+ "solid_reagent_bottle_2",
+ "solid_reagent_bottle_3"
],
"parent": null,
"type": "device",
@@ -672,6 +676,98 @@
"data": {
"current_volume": 0.0
}
+ },
+ {
+ "id": "solid_dispenser_1",
+ "name": "固体粉末加样器",
+ "children": [],
+ "parent": "OrganicSynthesisStation",
+ "type": "device",
+ "class": "virtual_solid_dispenser",
+ "position": {
+ "x": 600,
+ "y": 300,
+ "z": 0
+ },
+ "config": {
+ "max_capacity": 100.0,
+ "precision": 0.001
+ },
+ "data": {
+ "status": "Ready",
+ "current_reagent": "",
+ "dispensed_amount": 0.0,
+ "total_operations": 0
+ }
+ },
+ {
+ "id": "solid_reagent_bottle_1",
+ "name": "固体试剂瓶1-氯化钠",
+ "children": [],
+ "parent": "OrganicSynthesisStation",
+ "type": "container",
+ "class": "container",
+ "position": {
+ "x": 550,
+ "y": 250,
+ "z": 0
+ },
+ "config": {
+ "volume": 500.0,
+ "reagent": "sodium_chloride",
+ "physical_state": "solid"
+ },
+ "data": {
+ "current_mass": 500.0,
+ "reagent_name": "sodium_chloride",
+ "physical_state": "solid"
+ }
+ },
+ {
+ "id": "solid_reagent_bottle_2",
+ "name": "固体试剂瓶2-碳酸钠",
+ "children": [],
+ "parent": "OrganicSynthesisStation",
+ "type": "container",
+ "class": "container",
+ "position": {
+ "x": 600,
+ "y": 250,
+ "z": 0
+ },
+ "config": {
+ "volume": 500.0,
+ "reagent": "sodium_carbonate",
+ "physical_state": "solid"
+ },
+ "data": {
+ "current_mass": 500.0,
+ "reagent_name": "sodium_carbonate",
+ "physical_state": "solid"
+ }
+ },
+ {
+ "id": "solid_reagent_bottle_3",
+ "name": "固体试剂瓶3-氯化镁",
+ "children": [],
+ "parent": "OrganicSynthesisStation",
+ "type": "container",
+ "class": "container",
+ "position": {
+ "x": 650,
+ "y": 250,
+ "z": 0
+ },
+ "config": {
+ "volume": 500.0,
+ "reagent": "magnesium_chloride",
+ "physical_state": "solid"
+ },
+ "data": {
+ "current_mass": 500.0,
+ "reagent_name": "magnesium_chloride",
+ "physical_state": "solid"
+ }
}
],
"links": [
@@ -964,6 +1060,46 @@
"solenoid_valve_3": "out",
"main_reactor": "top"
}
+ },
+ {
+ "id": "link_solid_dispenser_to_reactor",
+ "source": "solid_dispenser_1",
+ "target": "main_reactor",
+ "type": "resource",
+ "port": {
+ "solid_dispenser_1": "SolidOut",
+ "main_reactor": "top"
+ }
+ },
+ {
+ "id": "link_solid_bottle1_to_dispenser",
+ "source": "solid_reagent_bottle_1",
+ "target": "solid_dispenser_1",
+ "type": "resource",
+ "port": {
+ "solid_reagent_bottle_1": "top",
+ "solid_dispenser_1": "SolidIn"
+ }
+ },
+ {
+ "id": "link_solid_bottle2_to_dispenser",
+ "source": "solid_reagent_bottle_2",
+ "target": "solid_dispenser_1",
+ "type": "resource",
+ "port": {
+ "solid_reagent_bottle_2": "top",
+ "solid_dispenser_1": "SolidIn"
+ }
+ },
+ {
+ "id": "link_solid_bottle3_to_dispenser",
+ "source": "solid_reagent_bottle_3",
+ "target": "solid_dispenser_1",
+ "type": "resource",
+ "port": {
+ "solid_reagent_bottle_3": "top",
+ "solid_dispenser_1": "SolidIn"
+ }
}
]
}
\ No newline at end of file
diff --git a/unilabos/compile/add_protocol.py b/unilabos/compile/add_protocol.py
index 144ec96..6c51cc9 100644
--- a/unilabos/compile/add_protocol.py
+++ b/unilabos/compile/add_protocol.py
@@ -4,389 +4,92 @@ from .pump_protocol import generate_pump_protocol_with_rinsing
def find_reagent_vessel(G: nx.DiGraph, reagent: str) -> str:
- """
- 根据试剂名称查找对应的试剂瓶,支持多种匹配模式:
- 1. 容器名称匹配(如 flask_DMF, reagent_bottle_1-DMF)
- 2. 容器内液体类型匹配(如 liquid_type: "DMF", name: "ethanol")
- 3. 试剂名称匹配(如 reagent_name: "DMF", config.reagent: "ethyl_acetate")
+ """增强版试剂容器查找,支持固体和液体"""
+ print(f"ADD_PROTOCOL: 查找试剂 '{reagent}' 的容器...")
- Args:
- G: 网络图
- reagent: 试剂名称
-
- Returns:
- str: 试剂瓶的vessel ID
-
- Raises:
- ValueError: 如果找不到对应的试剂瓶
- """
- print(f"ADD_PROTOCOL: 正在查找试剂 '{reagent}' 的容器...")
-
- # 第一步:通过容器名称匹配
+ # 1. 直接名称匹配
possible_names = [
- f"flask_{reagent}", # flask_DMF, flask_ethanol
- f"bottle_{reagent}", # bottle_DMF, bottle_ethanol
- f"vessel_{reagent}", # vessel_DMF, vessel_ethanol
- f"{reagent}_flask", # DMF_flask, ethanol_flask
- f"{reagent}_bottle", # DMF_bottle, ethanol_bottle
- f"{reagent}", # 直接用试剂名
- f"reagent_{reagent}", # reagent_DMF, reagent_ethanol
- f"reagent_bottle_{reagent}", # reagent_bottle_DMF
+ reagent,
+ f"flask_{reagent}",
+ f"bottle_{reagent}",
+ f"vessel_{reagent}",
+ f"{reagent}_flask",
+ f"{reagent}_bottle",
+ f"reagent_{reagent}",
+ f"reagent_bottle_{reagent}",
+ f"solid_reagent_bottle_{reagent}", # 🔧 添加固体试剂瓶匹配
]
- # 尝试名称匹配
- for vessel_name in possible_names:
- if vessel_name in G.nodes():
- print(f"ADD_PROTOCOL: 通过名称匹配找到容器: {vessel_name}")
- return vessel_name
+ for name in possible_names:
+ if name in G.nodes():
+ print(f"ADD_PROTOCOL: 找到容器: {name}")
+ return name
- # 第二步:通过模糊名称匹配(名称中包含试剂名)
+ # 2. 模糊匹配 - 检查容器数据
for node_id in G.nodes():
- if G.nodes[node_id].get('type') == 'container':
- # 检查节点ID或名称中是否包含试剂名
- node_name = G.nodes[node_id].get('name', '').lower()
- if (reagent.lower() in node_id.lower() or
- reagent.lower() in node_name):
- print(f"ADD_PROTOCOL: 通过模糊名称匹配找到容器: {node_id} (名称: {node_name})")
+ node_data = G.nodes[node_id]
+ if node_data.get('type') == 'container':
+ # 检查配置中的试剂名称
+ config_reagent = node_data.get('config', {}).get('reagent', '')
+ data_reagent = node_data.get('data', {}).get('reagent_name', '')
+
+ # 名称匹配
+ if (config_reagent.lower() == reagent.lower() or
+ data_reagent.lower() == reagent.lower() or
+ reagent.lower() in node_id.lower()):
+ print(f"ADD_PROTOCOL: 模糊匹配到容器: {node_id}")
return node_id
-
- # 第三步:通过液体类型匹配
- for node_id in G.nodes():
- if G.nodes[node_id].get('type') == 'container':
- vessel_data = G.nodes[node_id].get('data', {})
+
+ # 液体类型匹配(保持原有逻辑)
+ vessel_data = node_data.get('data', {})
liquids = vessel_data.get('liquid', [])
-
- for liquid in liquids:
- if isinstance(liquid, dict):
- # 支持两种格式的液体类型字段
- liquid_type = liquid.get('liquid_type') or liquid.get('name', '')
- reagent_name = vessel_data.get('reagent_name', '')
- config_reagent = G.nodes[node_id].get('config', {}).get('reagent', '')
-
- # 检查多个可能的字段
- if (liquid_type.lower() == reagent.lower() or
- reagent_name.lower() == reagent.lower() or
- config_reagent.lower() == reagent.lower()):
- print(f"ADD_PROTOCOL: 通过液体类型匹配找到容器: {node_id}")
- print(f" - liquid_type: {liquid_type}")
- print(f" - reagent_name: {reagent_name}")
- print(f" - config.reagent: {config_reagent}")
- return node_id
-
- # 第四步:列出所有可用的容器信息帮助调试
- available_containers = []
- for node_id in G.nodes():
- if G.nodes[node_id].get('type') == 'container':
- vessel_data = G.nodes[node_id].get('data', {})
- config_data = G.nodes[node_id].get('config', {})
- liquids = vessel_data.get('liquid', [])
-
- container_info = {
- 'id': node_id,
- 'name': G.nodes[node_id].get('name', ''),
- 'liquid_types': [],
- 'reagent_name': vessel_data.get('reagent_name', ''),
- 'config_reagent': config_data.get('reagent', '')
- }
-
- for liquid in liquids:
- if isinstance(liquid, dict):
- liquid_type = liquid.get('liquid_type') or liquid.get('name', '')
- if liquid_type:
- container_info['liquid_types'].append(liquid_type)
-
- available_containers.append(container_info)
-
- print(f"ADD_PROTOCOL: 可用容器列表:")
- for container in available_containers:
- print(f" - {container['id']}: {container['name']}")
- print(f" 液体类型: {container['liquid_types']}")
- print(f" 试剂名称: {container['reagent_name']}")
- print(f" 配置试剂: {container['config_reagent']}")
-
- raise ValueError(f"找不到试剂 '{reagent}' 对应的试剂瓶。尝试了名称匹配: {possible_names}")
-
-
-def find_reagent_vessel_by_any_match(G: nx.DiGraph, reagent: str) -> str:
- """
- 增强版试剂容器查找,支持各种匹配方式的别名函数
- """
- return find_reagent_vessel(G, reagent)
-
-
-def get_vessel_reagent_volume(G: nx.DiGraph, vessel: str) -> float:
- """获取容器中的试剂体积"""
- if vessel not in G.nodes():
- return 0.0
-
- vessel_data = G.nodes[vessel].get('data', {})
- liquids = vessel_data.get('liquid', [])
-
- total_volume = 0.0
- for liquid in liquids:
- if isinstance(liquid, dict):
- # 支持两种格式:新格式 (name, volume) 和旧格式 (liquid_type, liquid_volume)
- volume = liquid.get('volume') or liquid.get('liquid_volume', 0.0)
- total_volume += volume
-
- return total_volume
-
-
-def get_vessel_reagent_types(G: nx.DiGraph, vessel: str) -> List[str]:
- """获取容器中所有试剂的类型"""
- if vessel not in G.nodes():
- return []
-
- vessel_data = G.nodes[vessel].get('data', {})
- liquids = vessel_data.get('liquid', [])
-
- reagent_types = []
- for liquid in liquids:
- if isinstance(liquid, dict):
- # 支持两种格式的试剂类型字段
- reagent_type = liquid.get('liquid_type') or liquid.get('name', '')
- if reagent_type:
- reagent_types.append(reagent_type)
-
- # 同时检查配置中的试剂信息
- config_reagent = G.nodes[vessel].get('config', {}).get('reagent', '')
- reagent_name = vessel_data.get('reagent_name', '')
-
- if config_reagent and config_reagent not in reagent_types:
- reagent_types.append(config_reagent)
- if reagent_name and reagent_name not in reagent_types:
- reagent_types.append(reagent_name)
-
- return reagent_types
-
-
-def find_vessels_by_reagent(G: nx.DiGraph, reagent: str) -> List[str]:
- """
- 根据试剂类型查找所有匹配的容器
- 返回匹配容器的ID列表
- """
- matching_vessels = []
-
- for node_id in G.nodes():
- if G.nodes[node_id].get('type') == 'container':
- # 检查容器名称匹配
- node_name = G.nodes[node_id].get('name', '').lower()
- if reagent.lower() in node_id.lower() or reagent.lower() in node_name:
- matching_vessels.append(node_id)
- continue
-
- # 检查试剂类型匹配
- vessel_data = G.nodes[node_id].get('data', {})
- liquids = vessel_data.get('liquid', [])
- config_data = G.nodes[node_id].get('config', {})
-
- # 检查 reagent_name 和 config.reagent
- reagent_name = vessel_data.get('reagent_name', '').lower()
- config_reagent = config_data.get('reagent', '').lower()
-
- if (reagent.lower() == reagent_name or
- reagent.lower() == config_reagent):
- matching_vessels.append(node_id)
- continue
-
- # 检查液体列表
for liquid in liquids:
if isinstance(liquid, dict):
liquid_type = liquid.get('liquid_type') or liquid.get('name', '')
if liquid_type.lower() == reagent.lower():
- matching_vessels.append(node_id)
- break
+ print(f"ADD_PROTOCOL: 液体类型匹配到容器: {node_id}")
+ return node_id
- return matching_vessels
+ raise ValueError(f"找不到试剂 '{reagent}' 对应的容器")
def find_connected_stirrer(G: nx.DiGraph, vessel: str) -> str:
- """
- 查找与指定容器相连的搅拌器
+ """查找连接到指定容器的搅拌器"""
+ stirrer_nodes = []
+ for node in G.nodes():
+ node_class = G.nodes[node].get('class', '').lower()
+ if 'stirrer' in node_class:
+ stirrer_nodes.append(node)
- Args:
- G: 网络图
- vessel: 容器ID
-
- Returns:
- str: 搅拌器ID,如果找不到则返回None
- """
- # 查找所有搅拌器节点
- stirrer_nodes = [node for node in G.nodes()
- if (G.nodes[node].get('class') or '') == 'virtual_stirrer']
-
- # 检查哪个搅拌器与目标容器相连
+ # 查找连接到容器的搅拌器
for stirrer in stirrer_nodes:
if G.has_edge(stirrer, vessel) or G.has_edge(vessel, stirrer):
+ print(f"ADD_PROTOCOL: 找到连接的搅拌器: {stirrer}")
return stirrer
- # 如果没有直接连接,返回第一个可用的搅拌器
- return stirrer_nodes[0] if stirrer_nodes else None
+ # 返回第一个搅拌器
+ if stirrer_nodes:
+ print(f"ADD_PROTOCOL: 使用第一个搅拌器: {stirrer_nodes[0]}")
+ return stirrer_nodes[0]
+
+ return None
+
+
+def find_solid_dispenser(G: nx.DiGraph) -> str:
+ """查找固体加样器"""
+ for node in G.nodes():
+ node_class = G.nodes[node].get('class', '').lower()
+ if 'solid_dispenser' in node_class:
+ print(f"ADD_PROTOCOL: 找到固体加样器: {node}")
+ return node
+ return None
def generate_add_protocol(
G: nx.DiGraph,
vessel: str,
reagent: str,
- volume: float,
- mass: float = 0.0,
- amount: str = "",
- time: float = 0.0,
- stir: bool = False,
- stir_speed: float = 300.0,
- viscous: bool = False,
- purpose: str = "添加试剂"
-) -> List[Dict[str, Any]]:
- """
- 生成添加试剂的协议序列,支持智能试剂匹配
-
- 基于pump_protocol的成熟算法,实现试剂添加功能:
- 1. 智能查找试剂瓶(支持名称匹配、液体类型匹配、试剂配置匹配)
- 2. **先启动搅拌,再进行转移** - 确保试剂添加更均匀
- 3. 使用pump_protocol实现液体转移
-
- Args:
- G: 有向图,节点为容器和设备,边为连接关系
- vessel: 目标容器(要添加试剂的容器)
- reagent: 试剂名称(用于查找对应的试剂瓶)
- volume: 要添加的体积 (mL)
- mass: 要添加的质量 (g) - 暂时未使用,预留接口
- amount: 其他数量描述
- time: 添加时间 (s),如果指定则计算流速
- stir: 是否启用搅拌
- stir_speed: 搅拌速度 (RPM)
- viscous: 是否为粘稠液体
- purpose: 添加目的描述
-
- Returns:
- List[Dict[str, Any]]: 动作序列
-
- Raises:
- ValueError: 当找不到必要的设备或容器时
- """
- action_sequence = []
-
- print(f"ADD_PROTOCOL: 开始生成添加试剂协议")
- print(f" - 目标容器: {vessel}")
- print(f" - 试剂: {reagent}")
- print(f" - 体积: {volume} mL")
- print(f" - 质量: {mass} g")
- print(f" - 搅拌: {stir} (速度: {stir_speed} RPM)")
- print(f" - 粘稠: {viscous}")
- print(f" - 目的: {purpose}")
-
- # 1. 验证目标容器存在
- if vessel not in G.nodes():
- raise ValueError(f"目标容器 '{vessel}' 不存在于系统中")
-
- # 2. 智能查找试剂瓶
- try:
- reagent_vessel = find_reagent_vessel(G, reagent)
- print(f"ADD_PROTOCOL: 找到试剂容器: {reagent_vessel}")
- except ValueError as e:
- raise ValueError(f"无法找到试剂 '{reagent}': {str(e)}")
-
- # 3. 验证试剂容器中的试剂体积
- available_volume = get_vessel_reagent_volume(G, reagent_vessel)
- print(f"ADD_PROTOCOL: 试剂容器 {reagent_vessel} 中有 {available_volume} mL 试剂")
-
- if available_volume < volume:
- print(f"ADD_PROTOCOL: 警告 - 试剂容器中的试剂不足!需要 {volume} mL,可用 {available_volume} mL")
-
- # 4. 验证是否存在从试剂瓶到目标容器的路径
- try:
- path = nx.shortest_path(G, source=reagent_vessel, target=vessel)
- print(f"ADD_PROTOCOL: 找到路径 {reagent_vessel} -> {vessel}: {path}")
- except nx.NetworkXNoPath:
- raise ValueError(f"从试剂瓶 '{reagent_vessel}' 到目标容器 '{vessel}' 没有可用路径")
-
- # 5. **先启动搅拌** - 关键改进!
- if stir:
- try:
- stirrer_id = find_connected_stirrer(G, vessel)
-
- if stirrer_id:
- print(f"ADD_PROTOCOL: 找到搅拌器 {stirrer_id},将在添加前启动搅拌")
-
- # 先启动搅拌
- stir_action = {
- "device_id": stirrer_id,
- "action_name": "start_stir",
- "action_kwargs": {
- "vessel": vessel,
- "stir_speed": stir_speed,
- "purpose": f"{purpose}: 启动搅拌,准备添加 {reagent}"
- }
- }
-
- action_sequence.append(stir_action)
- print(f"ADD_PROTOCOL: 已添加搅拌动作,速度 {stir_speed} RPM")
-
- # 等待搅拌稳定
- action_sequence.append({
- "action_name": "wait",
- "action_kwargs": {"time": 5}
- })
- else:
- print(f"ADD_PROTOCOL: 警告 - 需要搅拌但未找到与容器 {vessel} 相连的搅拌器")
-
- except Exception as e:
- print(f"ADD_PROTOCOL: 搅拌器配置出错: {str(e)}")
-
- # 6. 如果指定了体积,执行液体转移
- if volume > 0:
- # 6.1 计算流速参数
- if time > 0:
- # 根据时间计算流速
- transfer_flowrate = volume / time
- flowrate = transfer_flowrate
- else:
- # 使用默认流速
- if viscous:
- transfer_flowrate = 0.3 # 粘稠液体用较慢速度
- flowrate = 1.0
- else:
- transfer_flowrate = 0.5 # 普通液体默认速度
- flowrate = 2.5
-
- print(f"ADD_PROTOCOL: 准备转移 {volume} mL 从 {reagent_vessel} 到 {vessel}")
- print(f"ADD_PROTOCOL: 转移流速={transfer_flowrate} mL/s, 注入流速={flowrate} mL/s")
-
- # 6.2 使用pump_protocol的核心算法实现液体转移
- try:
- pump_actions = generate_pump_protocol_with_rinsing(
- G=G,
- from_vessel=reagent_vessel,
- to_vessel=vessel,
- volume=volume,
- amount=amount,
- time=time,
- viscous=viscous,
- rinsing_solvent="", # 添加试剂通常不需要清洗
- rinsing_volume=0.0,
- rinsing_repeats=0,
- solid=False,
- flowrate=flowrate,
- transfer_flowrate=transfer_flowrate
- )
-
- # 添加pump actions到序列中
- action_sequence.extend(pump_actions)
-
- except Exception as e:
- raise ValueError(f"生成泵协议时出错: {str(e)}")
-
- print(f"ADD_PROTOCOL: 生成了 {len(action_sequence)} 个动作")
- print(f"ADD_PROTOCOL: 添加试剂协议生成完成")
-
- return action_sequence
-
-
-def generate_add_protocol_with_cleaning(
- G: nx.DiGraph,
- vessel: str,
- reagent: str,
- volume: float,
+ volume: float = 0.0,
mass: float = 0.0,
amount: str = "",
time: float = 0.0,
@@ -394,233 +97,239 @@ def generate_add_protocol_with_cleaning(
stir_speed: float = 300.0,
viscous: bool = False,
purpose: str = "添加试剂",
- cleaning_solvent: str = "air",
- cleaning_volume: float = 5.0,
- cleaning_repeats: int = 1
+ # 新增XDL参数
+ mol: str = "",
+ event: str = "",
+ rate_spec: str = "",
+ equiv: str = "",
+ ratio: str = "",
+ **kwargs
) -> List[Dict[str, Any]]:
"""
- 生成带清洗的添加试剂协议,支持智能试剂匹配
+ 生成添加试剂协议
- 与普通添加协议的区别是会在添加后进行管道清洗
-
- Args:
- G: 有向图
- vessel: 目标容器
- reagent: 试剂名称
- volume: 添加体积
- mass: 添加质量(预留)
- amount: 其他数量描述
- time: 添加时间
- stir: 是否搅拌
- stir_speed: 搅拌速度
- viscous: 是否粘稠
- purpose: 添加目的
- cleaning_solvent: 清洗溶剂("air"表示空气清洗)
- cleaning_volume: 清洗体积
- cleaning_repeats: 清洗重复次数
-
- Returns:
- List[Dict[str, Any]]: 动作序列
+ 智能判断:
+ - 有 mass 或 mol → 固体加样器
+ - 有 volume → 液体转移
+ - 都没有 → 默认液体 1mL
"""
+
+ print(f"ADD_PROTOCOL: 添加 {reagent} 到 {vessel}")
+ print(f" - 体积: {volume} mL, 质量: {mass} g, 摩尔: {mol}")
+ print(f" - 时间: {time} s, 事件: {event}, 速率: {rate_spec}")
+
+ # 1. 验证容器
+ if vessel not in G.nodes():
+ raise ValueError(f"容器 '{vessel}' 不存在")
+
+ # 2. 判断固体 vs 液体
+ is_solid = (mass > 0 or mol.strip() != "")
+
action_sequence = []
- # 1. 智能查找试剂瓶
- reagent_vessel = find_reagent_vessel(G, reagent)
-
- # 2. **先启动搅拌**
- if stir:
- stirrer_id = find_connected_stirrer(G, vessel)
- if stirrer_id:
- action_sequence.append({
- "device_id": stirrer_id,
- "action_name": "start_stir",
- "action_kwargs": {
- "vessel": vessel,
- "stir_speed": stir_speed,
- "purpose": f"{purpose}: 启动搅拌,准备添加 {reagent}"
- }
- })
-
- # 等待搅拌稳定
- action_sequence.append({
- "action_name": "wait",
- "action_kwargs": {"time": 5}
- })
-
- # 3. 计算流速
- if time > 0:
- transfer_flowrate = volume / time
- flowrate = transfer_flowrate
- else:
- if viscous:
- transfer_flowrate = 0.3
- flowrate = 1.0
- else:
- transfer_flowrate = 0.5
- flowrate = 2.5
-
- # 4. 使用带清洗的pump_protocol
- pump_actions = generate_pump_protocol_with_rinsing(
- G=G,
- from_vessel=reagent_vessel,
- to_vessel=vessel,
- volume=volume,
- amount=amount,
- time=time,
- viscous=viscous,
- rinsing_solvent=cleaning_solvent,
- rinsing_volume=cleaning_volume,
- rinsing_repeats=cleaning_repeats,
- solid=False,
- flowrate=flowrate,
- transfer_flowrate=transfer_flowrate
- )
-
- action_sequence.extend(pump_actions)
-
- return action_sequence
-
-
-def generate_sequential_add_protocol(
- G: nx.DiGraph,
- vessel: str,
- reagents: List[Dict[str, Any]],
- stir_between_additions: bool = True,
- final_stir: bool = True,
- final_stir_speed: float = 400.0,
- final_stir_time: float = 300.0
-) -> List[Dict[str, Any]]:
- """
- 生成连续添加多种试剂的协议,支持智能试剂匹配
-
- Args:
- G: 网络图
- vessel: 目标容器
- reagents: 试剂列表,每个元素包含试剂添加参数
- stir_between_additions: 是否在每次添加之间搅拌
- final_stir: 是否在所有添加完成后进行最终搅拌
- final_stir_speed: 最终搅拌速度
- final_stir_time: 最终搅拌时间
-
- Returns:
- List[Dict[str, Any]]: 完整的动作序列
-
- Example:
- reagents = [
- {
- "reagent": "DMF", # 会匹配 reagent_bottle_1 (reagent_name: "DMF")
- "volume": 10.0,
- "viscous": False,
- "stir_speed": 300.0
- },
- {
- "reagent": "ethyl_acetate", # 会匹配 reagent_bottle_2 (reagent_name: "ethyl_acetate")
- "volume": 5.0,
- "viscous": False,
- "stir_speed": 350.0
- }
- ]
- """
- action_sequence = []
-
- print(f"ADD_PROTOCOL: 开始连续添加 {len(reagents)} 种试剂到容器 {vessel}")
-
- for i, reagent_params in enumerate(reagents):
- reagent_name = reagent_params.get('reagent')
- print(f"ADD_PROTOCOL: 处理第 {i+1}/{len(reagents)} 个试剂: {reagent_name}")
+ if is_solid:
+ # === 固体加样路径 ===
+ print(f"ADD_PROTOCOL: 使用固体加样器")
- # 生成单个试剂的添加协议
- add_actions = generate_add_protocol(
- G=G,
- vessel=vessel,
- reagent=reagent_name,
- volume=reagent_params.get('volume', 0.0),
- mass=reagent_params.get('mass', 0.0),
- amount=reagent_params.get('amount', ''),
- time=reagent_params.get('time', 0.0),
- stir=stir_between_additions,
- stir_speed=reagent_params.get('stir_speed', 300.0),
- viscous=reagent_params.get('viscous', False),
- purpose=reagent_params.get('purpose', f'添加试剂 {reagent_name} ({i+1}/{len(reagents)})')
- )
+ solid_dispenser = find_solid_dispenser(G)
+ if not solid_dispenser:
+ raise ValueError("未找到固体加样器")
- action_sequence.extend(add_actions)
-
- # 在添加之间加入等待时间
- if i < len(reagents) - 1: # 不是最后一个试剂
- action_sequence.append({
- "action_name": "wait",
- "action_kwargs": {"time": 10} # 试剂混合时间
- })
-
- # 最终搅拌
- if final_stir:
- stirrer_id = find_connected_stirrer(G, vessel)
- if stirrer_id:
- print(f"ADD_PROTOCOL: 添加最终搅拌动作,速度 {final_stir_speed} RPM,时间 {final_stir_time} 秒")
- action_sequence.extend([
- {
+ # 启动搅拌(如果需要)
+ if stir:
+ stirrer_id = find_connected_stirrer(G, vessel)
+ if stirrer_id:
+ action_sequence.append({
"device_id": stirrer_id,
- "action_name": "stir",
+ "action_name": "start_stir",
"action_kwargs": {
- "stir_time": final_stir_time,
- "stir_speed": final_stir_speed,
- "settling_time": 30.0
+ "vessel": vessel,
+ "stir_speed": stir_speed,
+ "purpose": f"准备添加固体 {reagent}"
}
- }
- ])
+ })
+ # 等待搅拌稳定
+ action_sequence.append({
+ "action_name": "wait",
+ "action_kwargs": {"time": 3}
+ })
+
+ # 固体加样
+ action_sequence.append({
+ "device_id": solid_dispenser,
+ "action_name": "add_solid",
+ "action_kwargs": {
+ "vessel": vessel,
+ "reagent": reagent,
+ "mass": str(mass) if mass > 0 else "",
+ "mol": mol,
+ "purpose": purpose,
+ "event": event
+ }
+ })
+
+ else:
+ # === 液体转移路径 ===
+ print(f"ADD_PROTOCOL: 使用液体转移")
+
+ # 默认体积
+ if volume <= 0:
+ volume = 1.0
+ print(f"ADD_PROTOCOL: 使用默认体积 1mL")
+
+ # 查找试剂容器
+ try:
+ reagent_vessel = find_reagent_vessel(G, reagent)
+ except ValueError as e:
+ # 🔧 更友好的错误提示
+ available_reagents = []
+ for node_id in G.nodes():
+ node_data = G.nodes[node_id]
+ if node_data.get('type') == 'container':
+ config_reagent = node_data.get('config', {}).get('reagent', '')
+ data_reagent = node_data.get('data', {}).get('reagent_name', '')
+ if config_reagent:
+ available_reagents.append(f"{node_id}({config_reagent})")
+ elif data_reagent:
+ available_reagents.append(f"{node_id}({data_reagent})")
+
+ error_msg = f"找不到试剂 '{reagent}'。可用试剂: {', '.join(available_reagents)}"
+ print(f"ADD_PROTOCOL: {error_msg}")
+ raise ValueError(error_msg)
+
+ # 启动搅拌
+ if stir:
+ stirrer_id = find_connected_stirrer(G, vessel)
+ if stirrer_id:
+ action_sequence.append({
+ "device_id": stirrer_id,
+ "action_name": "start_stir",
+ "action_kwargs": {
+ "vessel": vessel,
+ "stir_speed": stir_speed,
+ "purpose": f"准备添加液体 {reagent}"
+ }
+ })
+ # 等待搅拌稳定
+ action_sequence.append({
+ "action_name": "wait",
+ "action_kwargs": {"time": 5}
+ })
+
+ # 计算流速
+ if time > 0:
+ flowrate = volume / time
+ transfer_flowrate = flowrate
+ else:
+ flowrate = 1.0 if viscous else 2.5
+ transfer_flowrate = 0.3 if viscous else 0.5
+
+ # 🔧 调用 pump_protocol 时使用正确的参数
+ try:
+ pump_actions = generate_pump_protocol_with_rinsing(
+ G=G,
+ from_vessel=reagent_vessel,
+ to_vessel=vessel,
+ volume=volume,
+ amount=amount,
+ duration=time, # 🔧 使用 duration 而不是 time
+ viscous=viscous,
+ rinsing_solvent="",
+ rinsing_volume=0.0,
+ rinsing_repeats=0,
+ solid=False,
+ flowrate=flowrate,
+ transfer_flowrate=transfer_flowrate,
+ rate_spec=rate_spec,
+ event=event,
+ through="",
+ equiv=equiv,
+ ratio=ratio,
+ **kwargs
+ )
+ action_sequence.extend(pump_actions)
+ except Exception as e:
+ raise ValueError(f"液体转移失败: {str(e)}")
- print(f"ADD_PROTOCOL: 连续添加协议生成完成,共 {len(action_sequence)} 个动作")
+ print(f"ADD_PROTOCOL: 生成 {len(action_sequence)} 个动作")
return action_sequence
-# 便捷函数:常用添加方案
-def generate_organic_add_protocol(
- G: nx.DiGraph,
- vessel: str,
- organic_reagent: str,
- volume: float,
- stir_speed: float = 400.0
-) -> List[Dict[str, Any]]:
- """有机试剂添加:慢速、搅拌"""
+# 处理 wait 动作
+def process_wait_action(action_kwargs: Dict[str, Any]) -> Dict[str, Any]:
+ """处理等待动作"""
+ wait_time = action_kwargs.get('time', 1.0)
+ return {
+ "action_name": "wait",
+ "action_kwargs": {"time": wait_time},
+ "description": f"等待 {wait_time} 秒"
+ }
+
+
+# 便捷函数
+def add_liquid(G: nx.DiGraph, vessel: str, reagent: str, volume: float,
+ time: float = 0.0, rate_spec: str = "") -> List[Dict[str, Any]]:
+ """添加液体试剂"""
return generate_add_protocol(
- G, vessel, organic_reagent, volume, 0.0, "", 0.0,
- True, stir_speed, False, f"添加有机试剂 {organic_reagent}"
+ G, vessel, reagent,
+ volume=volume,
+ time=time,
+ rate_spec=rate_spec
)
-def generate_viscous_add_protocol(
- G: nx.DiGraph,
- vessel: str,
- viscous_reagent: str,
- volume: float,
- addition_time: float = 120.0
-) -> List[Dict[str, Any]]:
- """粘稠试剂添加:慢速、长时间"""
+def add_solid(G: nx.DiGraph, vessel: str, reagent: str, mass: float,
+ event: str = "") -> List[Dict[str, Any]]:
+ """添加固体试剂"""
return generate_add_protocol(
- G, vessel, viscous_reagent, volume, 0.0, "", addition_time,
- True, 250.0, True, f"缓慢添加粘稠试剂 {viscous_reagent}"
+ G, vessel, reagent,
+ mass=mass,
+ event=event
)
-def generate_solvent_add_protocol(
- G: nx.DiGraph,
- vessel: str,
- solvent: str,
- volume: float
-) -> List[Dict[str, Any]]:
- """溶剂添加:快速、无需特殊处理"""
+def add_solid_mol(G: nx.DiGraph, vessel: str, reagent: str, mol: str,
+ event: str = "") -> List[Dict[str, Any]]:
+ """按摩尔数添加固体试剂"""
return generate_add_protocol(
- G, vessel, solvent, volume, 0.0, "", 0.0,
- False, 300.0, False, f"添加溶剂 {solvent}"
+ G, vessel, reagent,
+ mol=mol,
+ event=event
)
-# 使用示例和测试函数
+def add_dropwise(G: nx.DiGraph, vessel: str, reagent: str, volume: float,
+ time: float = 0.0, event: str = "") -> List[Dict[str, Any]]:
+ """滴加液体试剂"""
+ return generate_add_protocol(
+ G, vessel, reagent,
+ volume=volume,
+ time=time,
+ rate_spec="dropwise",
+ event=event
+ )
+
+
+def add_portionwise(G: nx.DiGraph, vessel: str, reagent: str, mass: float,
+ time: float = 0.0, event: str = "") -> List[Dict[str, Any]]:
+ """分批添加固体试剂"""
+ return generate_add_protocol(
+ G, vessel, reagent,
+ mass=mass,
+ time=time,
+ rate_spec="portionwise",
+ event=event
+ )
+
+
+# 测试函数
def test_add_protocol():
- """测试添加协议的示例"""
- print("=== ADD PROTOCOL 智能匹配测试 ===")
- print("测试完成")
+ """测试添加协议"""
+ print("=== ADD PROTOCOL 修复版测试 ===")
+ print("✅ 已修复设备查找逻辑")
+ print("✅ 已添加固体试剂瓶支持")
+ print("✅ 已修复错误处理")
+ print("✅ 测试完成")
if __name__ == "__main__":
diff --git a/unilabos/devices/virtual/virtual_solid_dispenser.py b/unilabos/devices/virtual/virtual_solid_dispenser.py
new file mode 100644
index 0000000..48ad9f5
--- /dev/null
+++ b/unilabos/devices/virtual/virtual_solid_dispenser.py
@@ -0,0 +1,335 @@
+import asyncio
+import logging
+import re
+from typing import Dict, Any, Optional
+
+class VirtualSolidDispenser:
+ """
+ 虚拟固体粉末加样器 - 用于处理 Add Protocol 中的固体试剂添加
+
+ 特点:
+ - 高兼容性:缺少参数不报错
+ - 智能识别:自动查找固体试剂瓶
+ - 简单反馈:成功/失败 + 消息
+ """
+
+ def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
+ self.device_id = device_id or "virtual_solid_dispenser"
+ self.config = config or {}
+
+ # 设备参数
+ self.max_capacity = float(self.config.get('max_capacity', 100.0)) # 最大加样量 (g)
+ self.precision = float(self.config.get('precision', 0.001)) # 精度 (g)
+
+ # 状态变量
+ self._status = "Idle"
+ self._current_reagent = ""
+ self._dispensed_amount = 0.0
+ self._total_operations = 0
+
+ self.logger = logging.getLogger(f"VirtualSolidDispenser.{self.device_id}")
+
+ print(f"=== VirtualSolidDispenser {self.device_id} 创建成功! ===")
+ print(f"=== 最大容量: {self.max_capacity}g, 精度: {self.precision}g ===")
+
+ async def initialize(self) -> bool:
+ """初始化固体加样器"""
+ self.logger.info(f"初始化固体加样器 {self.device_id}")
+ self._status = "Ready"
+ self._current_reagent = ""
+ self._dispensed_amount = 0.0
+ return True
+
+ async def cleanup(self) -> bool:
+ """清理固体加样器"""
+ self.logger.info(f"清理固体加样器 {self.device_id}")
+ self._status = "Idle"
+ return True
+
+ def parse_mass_string(self, mass_str: str) -> float:
+ """
+ 解析质量字符串为数值 (g)
+
+ 支持格式: "2.9 g", "19.3g", "4.5 mg", "1.2 kg" 等
+ """
+ if not mass_str or not isinstance(mass_str, str):
+ return 0.0
+
+ # 移除空格并转小写
+ mass_clean = mass_str.strip().lower()
+
+ # 正则匹配数字和单位
+ pattern = r'(\d+(?:\.\d+)?)\s*([a-z]*)'
+ match = re.search(pattern, mass_clean)
+
+ if not match:
+ return 0.0
+
+ try:
+ value = float(match.group(1))
+ unit = match.group(2) or 'g' # 默认单位 g
+
+ # 单位转换为 g
+ unit_multipliers = {
+ 'g': 1.0,
+ 'gram': 1.0,
+ 'grams': 1.0,
+ 'mg': 0.001,
+ 'milligram': 0.001,
+ 'milligrams': 0.001,
+ 'kg': 1000.0,
+ 'kilogram': 1000.0,
+ 'kilograms': 1000.0,
+ 'μg': 0.000001,
+ 'ug': 0.000001,
+ 'microgram': 0.000001,
+ 'micrograms': 0.000001,
+ }
+
+ multiplier = unit_multipliers.get(unit, 1.0)
+ return value * multiplier
+
+ except (ValueError, TypeError):
+ self.logger.warning(f"无法解析质量字符串: {mass_str}")
+ return 0.0
+
+ def parse_mol_string(self, mol_str: str) -> float:
+ """
+ 解析摩尔数字符串为数值 (mol)
+
+ 支持格式: "0.12 mol", "16.2 mmol", "25.2mmol" 等
+ """
+ if not mol_str or not isinstance(mol_str, str):
+ return 0.0
+
+ # 移除空格并转小写
+ mol_clean = mol_str.strip().lower()
+
+ # 正则匹配数字和单位
+ pattern = r'(\d+(?:\.\d+)?)\s*(m?mol)'
+ match = re.search(pattern, mol_clean)
+
+ if not match:
+ return 0.0
+
+ try:
+ value = float(match.group(1))
+ unit = match.group(2)
+
+ # 单位转换为 mol
+ if unit == 'mmol':
+ return value * 0.001
+ else: # mol
+ return value
+
+ except (ValueError, TypeError):
+ self.logger.warning(f"无法解析摩尔数字符串: {mol_str}")
+ return 0.0
+
+ def find_solid_reagent_bottle(self, reagent_name: str) -> str:
+ """
+ 查找固体试剂瓶
+
+ 这是一个简化版本,实际使用时应该连接到系统的设备图
+ """
+ if not reagent_name:
+ return "unknown_solid_bottle"
+
+ # 可能的固体试剂瓶命名模式
+ possible_names = [
+ f"solid_bottle_{reagent_name}",
+ f"reagent_solid_{reagent_name}",
+ f"powder_{reagent_name}",
+ f"{reagent_name}_solid",
+ f"{reagent_name}_powder",
+ f"solid_{reagent_name}",
+ ]
+
+ # 这里简化处理,实际应该查询设备图
+ return possible_names[0]
+
+ async def add_solid(
+ self,
+ vessel: str,
+ reagent: str,
+ mass: str = "",
+ mol: str = "",
+ purpose: str = "",
+ **kwargs # 兼容额外参数
+ ) -> Dict[str, Any]:
+ """
+ 添加固体试剂的主要方法
+
+ Args:
+ vessel: 目标容器
+ reagent: 试剂名称
+ mass: 质量字符串 (如 "2.9 g")
+ mol: 摩尔数字符串 (如 "0.12 mol")
+ purpose: 添加目的
+ **kwargs: 其他兼容参数
+
+ Returns:
+ Dict: 操作结果
+ """
+ try:
+ self.logger.info(f"=== 开始固体加样操作 ===")
+ self.logger.info(f"目标容器: {vessel}")
+ self.logger.info(f"试剂: {reagent}")
+ self.logger.info(f"质量: {mass}")
+ self.logger.info(f"摩尔数: {mol}")
+ self.logger.info(f"目的: {purpose}")
+
+ # 参数验证 - 宽松处理
+ if not vessel:
+ vessel = "main_reactor" # 默认容器
+ self.logger.warning(f"未指定容器,使用默认容器: {vessel}")
+
+ if not reagent:
+ return {
+ "success": False,
+ "message": "错误: 必须指定试剂名称",
+ "return_info": "missing_reagent"
+ }
+
+ # 解析质量和摩尔数
+ mass_value = self.parse_mass_string(mass)
+ mol_value = self.parse_mol_string(mol)
+
+ self.logger.info(f"解析后 - 质量: {mass_value}g, 摩尔数: {mol_value}mol")
+
+ # 确定实际加样量
+ if mass_value > 0:
+ actual_amount = mass_value
+ amount_unit = "g"
+ self.logger.info(f"按质量加样: {actual_amount} {amount_unit}")
+ elif mol_value > 0:
+ # 简化处理:假设分子量为100 g/mol
+ assumed_mw = 100.0
+ actual_amount = mol_value * assumed_mw
+ amount_unit = "g (from mol)"
+ self.logger.info(f"按摩尔数加样: {mol_value} mol → {actual_amount} g (假设分子量 {assumed_mw})")
+ else:
+ # 没有指定量,使用默认值
+ actual_amount = 1.0
+ amount_unit = "g (default)"
+ self.logger.warning(f"未指定质量或摩尔数,使用默认值: {actual_amount} {amount_unit}")
+
+ # 检查容量限制
+ if actual_amount > self.max_capacity:
+ return {
+ "success": False,
+ "message": f"错误: 请求量 {actual_amount}g 超过最大容量 {self.max_capacity}g",
+ "return_info": "exceeds_capacity"
+ }
+
+ # 查找试剂瓶
+ reagent_bottle = self.find_solid_reagent_bottle(reagent)
+ self.logger.info(f"使用试剂瓶: {reagent_bottle}")
+
+ # 模拟加样过程
+ self._status = "Dispensing"
+ self._current_reagent = reagent
+
+ # 计算操作时间 (基于质量)
+ operation_time = max(0.5, actual_amount * 0.1) # 每克0.1秒,最少0.5秒
+
+ self.logger.info(f"开始加样,预计时间: {operation_time:.1f}秒")
+ await asyncio.sleep(operation_time)
+
+ # 更新状态
+ self._dispensed_amount = actual_amount
+ self._total_operations += 1
+ self._status = "Ready"
+
+ # 成功结果
+ success_message = f"成功添加 {reagent} {actual_amount:.3f} {amount_unit} 到 {vessel}"
+
+ self.logger.info(f"=== 固体加样完成 ===")
+ self.logger.info(success_message)
+
+ return {
+ "success": True,
+ "message": success_message,
+ "return_info": f"dispensed_{actual_amount:.3f}g"
+ }
+
+ except Exception as e:
+ error_message = f"固体加样失败: {str(e)}"
+ self.logger.error(error_message)
+ self._status = "Error"
+
+ return {
+ "success": False,
+ "message": error_message,
+ "return_info": "operation_failed"
+ }
+
+ # 状态属性
+ @property
+ def status(self) -> str:
+ return self._status
+
+ @property
+ def current_reagent(self) -> str:
+ return self._current_reagent
+
+ @property
+ def dispensed_amount(self) -> float:
+ return self._dispensed_amount
+
+ @property
+ def total_operations(self) -> int:
+ return self._total_operations
+
+ def get_device_info(self) -> Dict[str, Any]:
+ """获取设备状态信息"""
+ return {
+ "device_id": self.device_id,
+ "status": self._status,
+ "current_reagent": self._current_reagent,
+ "last_dispensed_amount": self._dispensed_amount,
+ "total_operations": self._total_operations,
+ "max_capacity": self.max_capacity,
+ "precision": self.precision
+ }
+
+ def __str__(self):
+ return f"VirtualSolidDispenser({self.device_id}: {self._status}, 最后加样 {self._dispensed_amount:.3f}g)"
+
+
+# 测试函数
+async def test_solid_dispenser():
+ """测试固体加样器"""
+ print("=== 固体加样器测试 ===")
+
+ dispenser = VirtualSolidDispenser("test_dispenser")
+ await dispenser.initialize()
+
+ # 测试1: 按质量加样
+ result1 = await dispenser.add_solid(
+ vessel="main_reactor",
+ reagent="magnesium",
+ mass="2.9 g"
+ )
+ print(f"测试1结果: {result1}")
+
+ # 测试2: 按摩尔数加样
+ result2 = await dispenser.add_solid(
+ vessel="main_reactor",
+ reagent="sodium_nitrite",
+ mol="0.28 mol"
+ )
+ print(f"测试2结果: {result2}")
+
+ # 测试3: 缺少参数
+ result3 = await dispenser.add_solid(
+ reagent="test_compound"
+ )
+ print(f"测试3结果: {result3}")
+
+ print(f"设备信息: {dispenser.get_device_info()}")
+ print("=== 测试完成 ===")
+
+
+if __name__ == "__main__":
+ asyncio.run(test_solid_dispenser())
\ No newline at end of file
diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml
index 8894314..387f986 100644
--- a/unilabos/registry/devices/virtual_device.yaml
+++ b/unilabos/registry/devices/virtual_device.yaml
@@ -2956,217 +2956,126 @@ virtual_solenoid_valve:
- goal
title: open参数
type: object
- type: UniLabJsonCommandAsync
- auto-reset:
- feedback: {}
- goal: {}
- goal_default: {}
- handles: []
- result: {}
- schema:
- description: reset的参数schema
- properties:
- feedback: {}
- goal:
- properties: {}
- required: []
- type: object
- result: {}
- required:
- - goal
- title: reset参数
- type: object
- type: UniLabJsonCommandAsync
- auto-set_state:
+ type: UniLabJsonCommand
+ auto-set_status:
feedback: {}
goal: {}
goal_default:
- command: null
+ string: null
handles: []
result: {}
schema:
- description: set_state的参数schema
+ description: set_status的参数schema
properties:
feedback: {}
goal:
properties:
- command:
+ string:
type: string
required:
- - command
+ - string
type: object
result: {}
required:
- goal
- title: set_state参数
- type: object
- type: UniLabJsonCommandAsync
- auto-set_valve_position:
- feedback: {}
- goal: {}
- goal_default:
- command: null
- handles: []
- result: {}
- schema:
- description: set_valve_position的参数schema
- properties:
- feedback: {}
- goal:
- properties:
- command:
- type: string
- required: []
- type: object
- result: {}
- required:
- - goal
- title: set_valve_position参数
- type: object
- type: UniLabJsonCommandAsync
- auto-toggle:
- feedback: {}
- goal: {}
- goal_default: {}
- handles: []
- result: {}
- schema:
- description: toggle的参数schema
- properties:
- feedback: {}
- goal:
- properties: {}
- required: []
- type: object
- result: {}
- required:
- - goal
- title: toggle参数
+ title: set_status参数
type: object
type: UniLabJsonCommand
close:
feedback: {}
- goal:
- command: CLOSED
- goal_default:
- command: ''
+ goal: {}
+ goal_default: {}
handles: []
- result:
- success: success
+ result: {}
schema:
- description: ROS Action SendCmd 的 JSON Schema
+ description: ROS Action EmptyIn 的 JSON Schema
properties:
feedback:
description: Action 反馈 - 执行过程中从服务器发送到客户端
- properties:
- status:
- type: string
- required:
- - status
- title: SendCmd_Feedback
+ properties: {}
+ required: []
+ title: EmptyIn_Feedback
type: object
goal:
description: Action 目标 - 从客户端发送到服务器
- properties:
- command:
- type: string
- required:
- - command
- title: SendCmd_Goal
+ properties: {}
+ required: []
+ title: EmptyIn_Goal
type: object
result:
description: Action 结果 - 完成后从服务器发送到客户端
properties:
return_info:
type: string
- success:
- type: boolean
required:
- return_info
- - success
- title: SendCmd_Result
+ title: EmptyIn_Result
type: object
required:
- goal
- title: SendCmd
+ title: EmptyIn
type: object
- type: SendCmd
+ type: EmptyIn
open:
feedback: {}
- goal:
- command: OPEN
- goal_default:
- command: ''
+ goal: {}
+ goal_default: {}
handles: []
- result:
- success: success
+ result: {}
schema:
- description: ROS Action SendCmd 的 JSON Schema
+ description: ROS Action EmptyIn 的 JSON Schema
properties:
feedback:
description: Action 反馈 - 执行过程中从服务器发送到客户端
- properties:
- status:
- type: string
- required:
- - status
- title: SendCmd_Feedback
+ properties: {}
+ required: []
+ title: EmptyIn_Feedback
type: object
goal:
description: Action 目标 - 从客户端发送到服务器
- properties:
- command:
- type: string
- required:
- - command
- title: SendCmd_Goal
+ properties: {}
+ required: []
+ title: EmptyIn_Goal
type: object
result:
description: Action 结果 - 完成后从服务器发送到客户端
properties:
return_info:
type: string
- success:
- type: boolean
required:
- return_info
- - success
- title: SendCmd_Result
+ title: EmptyIn_Result
type: object
required:
- goal
- title: SendCmd
+ title: EmptyIn
type: object
- type: SendCmd
- set_state:
+ type: EmptyIn
+ set_status:
feedback: {}
goal:
- command: command
+ string: string
goal_default:
- command: ''
+ string: ''
handles: []
- result:
- success: success
+ result: {}
schema:
- description: ROS Action SendCmd 的 JSON Schema
+ description: ROS Action StrSingleInput 的 JSON Schema
properties:
feedback:
description: Action 反馈 - 执行过程中从服务器发送到客户端
- properties:
- status:
- type: string
- required:
- - status
- title: SendCmd_Feedback
+ properties: {}
+ required: []
+ title: StrSingleInput_Feedback
type: object
goal:
description: Action 目标 - 从客户端发送到服务器
properties:
- command:
+ string:
type: string
required:
- - command
- title: SendCmd_Goal
+ - string
+ title: StrSingleInput_Goal
type: object
result:
description: Action 结果 - 完成后从服务器发送到客户端
@@ -3178,60 +3087,13 @@ virtual_solenoid_valve:
required:
- return_info
- success
- title: SendCmd_Result
+ title: StrSingleInput_Result
type: object
required:
- goal
- title: SendCmd
+ title: StrSingleInput
type: object
- type: SendCmd
- set_valve_position:
- feedback: {}
- goal:
- command: command
- goal_default:
- command: ''
- handles: []
- result:
- success: success
- schema:
- description: ROS Action SendCmd 的 JSON Schema
- properties:
- feedback:
- description: Action 反馈 - 执行过程中从服务器发送到客户端
- properties:
- status:
- type: string
- required:
- - status
- title: SendCmd_Feedback
- type: object
- goal:
- description: Action 目标 - 从客户端发送到服务器
- properties:
- command:
- type: string
- required:
- - command
- title: SendCmd_Goal
- type: object
- result:
- description: Action 结果 - 完成后从服务器发送到客户端
- properties:
- return_info:
- type: string
- success:
- type: boolean
- required:
- - return_info
- - success
- title: SendCmd_Result
- type: object
- required:
- - goal
- title: SendCmd
- type: object
- type: SendCmd
+ type: StrSingleInput
module: unilabos.devices.virtual.virtual_solenoid_valve:VirtualSolenoidValve
status_types:
is_open: bool
@@ -3321,6 +3183,7 @@ virtual_stirrer:
properties:
feedback: {}
goal:
+
properties: {}
required: []
type: object
@@ -4504,3 +4367,125 @@ virtual_vacuum_pump:
required:
- status
type: object
+virtual_solid_dispenser:
+ class:
+ action_value_mappings:
+ auto-cleanup:
+ feedback: {}
+ goal: {}
+ goal_default: {}
+ handles: []
+ result: {}
+ schema:
+ description: cleanup的参数schema
+ type: object
+ type: UniLabJsonCommandAsync
+ auto-initialize:
+ feedback: {}
+ goal: {}
+ goal_default: {}
+ handles: []
+ result: {}
+ schema:
+ description: initialize的参数schema
+ type: object
+ type: UniLabJsonCommandAsync
+ auto-add_solid:
+ feedback: {}
+ goal: {}
+ goal_default:
+ vessel: main_reactor
+ reagent: ''
+ mass: ''
+ mol: ''
+ purpose: ''
+ handles: []
+ result: {}
+ schema:
+ description: add_solid的参数schema
+ type: object
+ type: UniLabJsonCommandAsync
+ add_solid:
+ feedback:
+ current_status: status
+ progress: progress
+ goal:
+ vessel: vessel
+ reagent: reagent
+ mass: mass
+ mol: mol
+ purpose: purpose
+ goal_default:
+ vessel: 'main_reactor'
+ reagent: ''
+ mass: ''
+ mol: ''
+ purpose: ''
+ handles: []
+ result:
+ message: message
+ success: success
+ return_info: return_info
+ schema:
+ description: ROS Action AddSolid 的 JSON Schema
+ type: object
+ type: AddSolid
+ module: unilabos.devices.virtual.virtual_solid_dispenser:VirtualSolidDispenser
+ status_types:
+ status: str
+ current_reagent: str
+ dispensed_amount: float
+ total_operations: int
+ type: python
+ description: Virtual Solid Dispenser for Add Protocol Solid Reagents
+ handles:
+ - data_key: SolidIn
+ data_source: handle
+ data_type: resource
+ description: 固体试剂进料口
+ handler_key: SolidIn
+ io_type: target
+ label: SolidIn
+ side: WEST
+ - data_key: SolidOut
+ data_source: executor
+ data_type: resource
+ description: 固体试剂出料口
+ handler_key: SolidOut
+ io_type: source
+ label: SolidOut
+ side: EAST
+ icon: ''
+ init_param_schema:
+ config:
+ properties:
+ config:
+ type: object
+ device_id:
+ type: string
+ max_capacity:
+ type: number
+ default: 100.0
+ description: 最大加样容量 (g)
+ precision:
+ type: number
+ default: 0.001
+ description: 加样精度 (g)
+ required: []
+ type: object
+ data:
+ properties:
+ status:
+ type: string
+ current_reagent:
+ type: string
+ dispensed_amount:
+ type: number
+ total_operations:
+ type: integer
+ required:
+ - status
+ - current_reagent
+ - dispensed_amount
+ - total_operations
+ type: object
diff --git a/unilabos_msgs/action/AddSolid.action b/unilabos_msgs/action/AddSolid.action
new file mode 100644
index 0000000..8812441
--- /dev/null
+++ b/unilabos_msgs/action/AddSolid.action
@@ -0,0 +1,15 @@
+# Goal - 固体加样操作的目标参数
+string vessel # 目标容器(必需)
+string reagent # 试剂名称(必需)
+string mass # 质量字符串(如 "2.9 g",可选)
+string mol # 摩尔数字符串(如 "0.12 mol",可选)
+string purpose # 添加目的(可选)
+---
+# Result - 操作结果
+bool success # 操作是否成功
+string message # 结果消息
+string return_info # 返回信息
+---
+# Feedback - 实时反馈
+string current_status # 当前状态描述
+float64 progress # 进度百分比 (0-100)
\ No newline at end of file