@@ -2,17 +2,42 @@ import numpy as np
import networkx as nx
def is_integrated_pump ( node_name ) :
return " pump " in node_name and " valve " in node_name
def find_connected_pump ( G , valve_node ) :
for neighbor in G . neighbors ( valve_node ) :
if " pump " in G . nodes [ neighbor ] [ " class " ] :
return neighbor
raise ValueError ( f " 未找到与阀 { valve_node } 唯一相连的泵节点 " )
def build_pump_valve_maps ( G , pump_backbone ) :
pumps_from_node = { }
valve_from_node = { }
for node in pump_backbone :
if is_integrated_pump ( node ) :
pumps_from_node [ node ] = node
valve_from_node [ node ] = node
else :
pump_node = find_connected_pump ( G , node )
pumps_from_node [ node ] = pump_node
valve_from_node [ node ] = node
return pumps_from_node , valve_from_node
def generate_pump_protocol (
G : nx . DiGraph ,
from_vessel : str ,
to_vessel : str ,
volume : float ,
flowrate : float = 0.5 ,
transfer_flowrate : float = 0 ,
G : nx . DiGraph ,
from_vessel : str ,
to_vessel : str ,
volume : float ,
flowrate : float = 0.5 ,
transfer_flowrate : float = 0 ,
) - > list [ dict ] :
"""
生成泵操作的动作序列。
:param G: 有向图, 节点为容器和注射泵, 边为流体管道, A→B边的属性为管道接A端的阀门位置
:param from_vessel: 容器A
:param to_vessel: 容器B
@@ -21,194 +46,137 @@ def generate_pump_protocol(
:param transfer_flowrate: 泵骨架中转移流速(若不指定,默认与注入流速相同)
:return: 泵操作的动作序列
"""
# 生成泵操作的动作序列
pump_action_sequence = [ ]
# 检查节点是否存在
if from_vessel not in G . nodes :
print ( f " Warning: Source vessel ' { from_vessel } ' not found in graph. Skipping. " )
return [ ]
if to_vessel not in G . nodes :
print ( f " Warning: Target vessel ' { to_vessel } ' not found in graph. Skipping. " )
return [ ]
# 检查是否存在路径
try :
shortest_path = nx . shortest_path ( G , source = from_vessel , target = to_vessel )
except nx . NetworkXNoPath :
print ( f " Warning: No path from ' { from_vessel } ' to ' { to_vessel } ' . Skipping. " )
return [ ]
except nx . NodeNotFound as e :
print ( f " Warning: Node not found: { e } . Skipping. " )
return [ ]
print ( f " Shortest path: { shortest_path } " )
nodes = G . nodes ( data = True )
# 从from_vessel到to_vessel的最短路径
shortest_path = nx . shortest_path ( G , source = from_vessel , target = to_vessel )
print ( shortest_path )
pump_backbone = shortest_path
if not from_vessel . startswith ( " pump " ) :
pump_backbone = pump_backbone [ 1 : ]
if not to_vessel . startswith ( " pump " ) :
pump_backbone = pump_backbone [ : - 1 ]
print ( f " Pump backbone: { pump_backbone } " )
# 修复: 检查pump_backbone是否为空
if not pump_backbone :
print ( f " Warning: No pumps found in path from ' { from_vessel } ' to ' { to_vessel } ' . Skipping. " )
return [ ]
if transfer_flowrate == 0 :
transfer_flowrate = flowrate
# 修复:正确访问节点数据
pump_max_volumes = [ ]
for pump in pump_backbone :
# 直接使用 G.nodes[pump] 来访问节点数据
pump_data = G . nodes [ pump ] if pump in G . nodes else { }
# 尝试多种可能的键名,并提供默认值
max_vol = pump_data . get ( ' max_volume ' ) or pump_data . get ( ' max_vol ' ) or pump_data . get ( ' volume ' )
if max_vol is None :
# 如果是设备节点, 尝试从config中获取
config = pump_data . get ( ' config ' , { } )
max_vol = config . get ( ' max_volume ' , 25.0 )
pump_max_volumes . append ( float ( max_vol ) )
if pump_max_volumes :
min_transfer_volume = min ( pump_max_volumes )
else :
min_transfer_volume = 25.0 # 默认值
pumps_from_node , valve_from_node = build_pump_valve_maps ( G , pump_backbone )
min_transfer_volume = min ( [ nodes [ pumps_from_node [ node ] ] [ " config " ] [ " max_volume " ] for node in pump_backbone ] )
repeats = int ( np . ceil ( volume / min_transfer_volume ) )
if repeats > 1 and ( from_vessel . startswith ( " pump " ) or to_vessel . startswith ( " pump " ) ) :
raise ValueError ( " Cannot transfer volume larger than min_transfer_volume between two pumps. " )
volume_left = volume
# 生成泵操作的动作序列
for i in range ( repeats ) :
# 单泵依次执行阀指令、活塞指令,将液体吸入与之相连的第一台泵
if not from_vessel . startswith ( " pump " ) and pump_backbone :
# 修复:添加边缘数据检查
edge_data = G . get_edge_data ( pump_backbone [ 0 ] , from_vessel )
if edge_data and " port " in edge_data :
pump_action_sequence . extend ( [
{
" device_i d" : pump_backbone [ 0 ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : edge_data [ " port " ] [ pump_backbone [ 0 ] ]
}
} ,
{
" device_id " : pump_backbone [ 0 ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : float ( min ( volume_left , min_transfer_volume ) ) ,
" max_velocity " : transfer_flowrate
}
if not from_vessel . startswith ( " pump " ) :
pump_action_sequence . extend ( [
{
" device_id " : valve_from_node [ pump_backbone [ 0 ] ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" comman d" : G . get_edge_data ( pump_backbone[ 0 ] , from_vessel ) [ " port " ] [ pump_backbone [ 0 ] ]
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
else :
print ( f " Warning: No edge data found between { pump_backbone [ 0 ] } and { from_vessel } " )
# 修复: 检查pump_backbone长度, 避免多泵操作时出错
if len ( pump_backbone ) > 1 :
for pumpA , pumpB in zip ( pump_backbone [ : - 1 ] , pump_backbone [ 1 : ] ) :
# 相邻两泵同时切换阀门至连通位置
edge_AB = G . get_edge_data ( pumpA , pumpB )
edge_BA = G . get_edge_data ( pumpB , pumpA )
if edge_AB and " port " in edge_AB and edge_BA and " port " in edge_BA :
pump_action_sequence . append ( [
{
" device_id " : pumpA ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : edge_AB [ " port " ] [ pumpA ]
}
} ,
{
" device_id " : pumpB ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : edge_BA [ " port " ] [ pumpB ] ,
}
} ,
{
" device_id " : pumps_from_node [ pump_backbone [ 0 ] ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : float ( min ( volume_left , min_transfer_volume ) ) ,
" max_velocity " : transfer_flowrate
}
] )
# 相邻两泵液体转移: 泵A排出液体, 泵B吸入液体
pump_action_sequence . append ( [
{
" device_id " : pumpA ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : 0.0 ,
" max_velocity " : transfer_flowrate
}
} ,
{
" device_id " : pumpB ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : float ( min ( volume_left , min_transfer_volume ) ) ,
" max_velocity " : transfer_flowrate
}
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
for nodeA , nodeB in zip ( pump_backbone [ : - 1 ] , pump_backbone [ 1 : ] ) :
# 相邻两泵同时切换阀门至连通位置
pump_action_sequence . append ( [
{
" device_id " : valve_from_node [ nodeA ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : G . get_edge_data ( nodeA , nodeB ) [ " port " ] [ nodeA ]
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
else :
print ( f " Warning: No edge data found between { pumpA } and { pumpB } " )
if not to_vessel . startswith ( " pump " ) and pump_backbone :
} ,
{
" device_id " : valve_from_node [ nodeB ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : G . get_edge_data ( nodeB , nodeA ) [ " port " ] [ nodeB ] ,
}
}
] )
# 相邻两泵液体转移: 泵A排出液体, 泵B吸入液体
pump_action_sequence . append ( [
{
" device_id " : pumps_from_node [ nodeA ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : 0.0 ,
" max_velocity " : transfer_flowrate
}
} ,
{
" device_id " : pumps_from_node [ nodeB ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : float ( min ( volume_left , min_transfer_volume ) ) ,
" max_velocity " : transfer_flowrate
}
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
if not to_vessel . startswith ( " pump " ) :
# 单泵依次执行阀指令、活塞指令, 将最后一台泵液体缓慢加入容器B
edge_data = G . get_edge_data ( pump_b ackbone [ - 1 ] , to_vessel )
if edge_data and " port " in edge_data :
pump_action_sequence . extend ( [
{
" device_id " : pump_backbone [ - 1 ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : edge_data [ " port " ] [ pump_backbone [ - 1 ] ]
}
} ,
{
" device_id " : pump_backbone [ - 1 ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : 0.0 ,
" max_velocity " : flowrate
}
pump_action_sequence . extend ( [
{
" device_id " : valve_from_node [ pump_b ackbone [ - 1 ] ] ,
" action_name " : " set_valve_position " ,
" action_kwargs " : {
" command " : G . get_edge_data ( pump_backbone [ - 1 ] , to_vessel ) [ " port " ] [ pump_backbone [ - 1 ] ]
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
else :
print ( f " Warning: No edge data found between { pump_backbone [ - 1 ] } and { to_vessel } " )
} ,
{
" device_id " : pumps_from_node [ pump_backbone [ - 1 ] ] ,
" action_name " : " set_position " ,
" action_kwargs " : {
" position " : 0.0 ,
" max_velocity " : flowrate
}
}
] )
pump_action_sequence . append ( { " action_name " : " wait " , " action_kwargs " : { " time " : 5 } } )
volume_left - = min_transfer_volume
return pump_action_sequence
# Pump protocol compilation
def generate_pump_protocol_with_rinsing (
G : nx . DiGraph ,
from_vessel : str ,
to_vessel : str ,
volume : float ,
amount : str = " " ,
time : float = 0 ,
viscous : bool = False ,
rinsing_solvent : str = " air " ,
rinsing_volume : float = 5.0 ,
rinsing_repeats : int = 2 ,
solid : bool = False ,
flowrate : float = 2.5 ,
transfer_flowrate : float = 0.5 ,
G : nx . DiGraph ,
from_vessel : str ,
to_vessel : str ,
volume : float ,
amount : str = " " ,
time : float = 0 ,
viscous : bool = False ,
rinsing_solvent : str = " air " ,
rinsing_volume : float = 5.0 ,
rinsing_repeats : int = 2 ,
solid : bool = False ,
flowrate : float = 2.5 ,
transfer_flowrate : float = 0.5 ,
) - > list [ dict ] :
"""
Generates a pump protocol for transferring a specified volume between vessels, including rinsing steps with a chosen solvent. This function constructs a sequence of pump actions based on the provided parameters and the shortest path in a directed graph.
Args:
G (nx.DiGraph): The directed graph representing the vessels and connections. 有向图, 节点为容器和注射泵, 边为流体管道, A→B边的属性为管道接A端的阀门位置
from_vessel (str): The name of the vessel to transfer from.
@@ -223,96 +191,64 @@ def generate_pump_protocol_with_rinsing(
solid (bool, optional): Indicates if the transfer involves a solid (default is False).
flowrate (float, optional): The flow rate for the transfer (default is 2.5). 最终注入容器B时的流速
transfer_flowrate (float, optional): The flow rate for the transfer action (default is 0.5). 泵骨架中转移流速(若不指定,默认与注入流速相同)
Returns:
list[dict]: A sequence of pump actions to be executed for the transfer and rinsing process. 泵操作的动作序列.
Raises:
AssertionError: If the number of rinsing solvents does not match the number of rinsing repeats.
Examples:
pump_protocol = generate_pump_protocol_with_rinsing(G, " vessel_A " , " vessel_B " , 0.1, rinsing_solvent= " water " )
"""
# 修复:使用实际存在的节点名称
air _vessel = " flask_air " # 这个在你的配置中存在
# 寻找合适的废料容器,如果没有找到则使用空的容器作为替代
waste_vessel = None
available_vessels = [ node for node in G . nodes if node . startswith ( " flask_ " ) and node != air_vessel ]
if available_vessels :
# 使用第一个可用的容器作为废料容器
waste_vessel = available_vessels [ 0 ]
print ( f " Using { waste_vessel } as waste vessel " )
else :
waste_vessel = " flask_1 " # 备用选择
# 修复:添加路径检查
try :
shortest_path = nx . shortest_path ( G , source = from_vessel , target = to_vessel )
pump_backbone = shortest_path [ 1 : - 1 ]
except ( nx . NetworkXNoPath , nx . NodeNotFound ) as e :
print ( f " Warning: Cannot find path from { from_vessel } to { to_vessel } : { e } " )
return [ ]
# 修复:正确访问节点数据
pump_max_volumes = [ ]
for pump in pump_backbone :
# 直接使用 G.nodes[pump] 来访问节点数据
pump_data = G . nodes [ pump ] if pump in G . nodes else { }
# 尝试多种可能的键名,并提供默认值
max_vol = pump_data . get ( ' max_volume ' ) or pump_data . get ( ' max_vol ' ) or pump_data . get ( ' volume ' )
if max_vol is None :
# 如果是设备节点, 尝试从config中获取
config = pump_data . get ( ' config ' , { } )
max_vol = config . get ( ' max_volume ' , 25.0 )
pump_max_volumes . append ( float ( max_vol ) )
if pump_max_volumes :
min_transfer_volume = float ( min ( pump_max_volumes ) )
else :
min_transfer_volume = 25.0 # 默认值
air_vessel = " flask_air "
waste _vessel = f " waste_workup "
shortest_path = nx . shortest_path ( G , source = from_vessel , target = to_vessel )
pump_backbone = shortest_path [ 1 : - 1 ]
nodes = G . nodes ( data = True )
pumps_from_node , valve_from_node = build_pump_valve_maps ( G , pump_backbone )
min_transfer_volume = min ( [ nodes [ pumps_from_node [ node ] ] [ " config " ] [ " max_volume " ] for node in pump_backbone ] )
if time != 0 :
flowrate = transfer_flowrate = volume / time
pump_action_sequence = generate_pump_protocol ( G , from_vessel , to_vessel , float ( volume ) , flowrate , transfer_flowrate )
# 修复:只在需要清洗且相关节点存在时才执行清洗步骤
if rinsing_solvent != " air " and pump_backbone :
if rinsing_solvent != " air " and rinsing_solvent != " " :
if " , " in rinsing_solvent :
rinsing_solvents = rinsing_solvent . split ( " , " )
assert len ( rinsing_solvents ) == rinsing_repeats , " Number of rinsing solvents must match number of rinsing repeats. "
assert len (
rinsing_solvents ) == rinsing_repeats , " Number of rinsing solvents must match number of rinsing repeats. "
else :
rinsing_solvents = [ rinsing_solvent ] * rinsing_repeats
for rinsing_solvent in rinsing_solvents :
solvent_vessel = f " flask_ { rinsing_solvent } "
# 检查溶剂容器是否存在
if solvent_vessel not in G . nodes :
print ( f " Warning: Solvent vessel ' { solvent_vessel } ' not found in graph. Skipping rinsing step. " )
continue
# 清洗泵 - 只有当所有必需的节点都存在且pump_backbone不为空时才执行
if pump_backbone and len ( pump_backbone ) > 0 and waste_vessel in G . nodes :
# 清洗泵
pump_action_sequence . extend (
generate_pump_protocol ( G , solvent_vessel , pump_backbone [ 0 ] , min_transfer_volume , flowrate ,
transfer_flowrate ) +
generate_pump_protocol ( G , pump_backbone [ 0 ] , pump_backbone [ - 1 ] , min_transfer_volume , flowrate ,
transfer_flowrate ) +
generate_pump_protocol ( G , pump_backbone [ - 1 ] , waste_vessel , min_transfer_volume , flowrate ,
transfer_flowrate )
)
# 如果转移的是溶液,第一种冲洗溶剂请选用溶液的溶剂,稀释泵内、转移管道内的溶液。后续冲洗溶剂不需要此操作。
if rinsing_solvent == rinsing_solvents [ 0 ] :
pump_action_sequence . extend (
generate_pump_protocol ( G , solvent_vessel , pump_backbone [ 0 ] , min_transfer _volume, flowrate , transfer_flowrate ) +
generate_pump_protocol ( G , pump_backbone [ 0 ] , pump_backbone [ - 1 ] , min_transfer_volume , flowrate , transfer_flowrate ) +
generate_pump_protocol ( G , pump_backbone [ - 1 ] , waste _vessel, min_transfer _volume, flowrate , transfer_flowrate )
)
# 如果转移的是溶液,第一种冲洗溶剂请选用溶液的溶剂,稀释泵内、转移管道内的溶液。后续冲洗溶剂不需要此操作。
if rinsing_solvent == rinsing_s olvents [ 0 ] :
pump_action_sequence . extend ( generate_pump_protocol ( G , solvent_vessel , from_vessel , rinsing_volume , flowrate , transfer_flowrate ) )
pump_action_sequence . extend ( generate_pump_protocol ( G , solvent_vessel , to_vessel , rinsing_volume , flowrate , transfer_flowrate ) )
pump_action_sequence . extend ( generate_pump_protocol ( G , air_vessel , solvent_vessel , rinsing_volume , flowrate , transfer_flowrate ) )
pump_action_sequence . extend ( generate_pump_protocol ( G , air_vessel , waste _vessel, rinsing_volume , flowrate , transfer_flowrate ) )
# 最后的空气清洗 - 只有当节点存在时才执行
if air_vessel in G . nodes :
pump_action_sequence . extend ( generate_pump_protocol ( G , air_vessel , from_vessel , rinsing_volume , flowrate , transfer_flowrate ) * 2 )
pump_action_sequence . extend ( generate_pump_protocol ( G , air_vessel , to_vessel , rinsing_volume , flowrate , transfer_flowrate ) * 2 )
generate_pump_protocol ( G , solvent_vessel , from_vessel , rinsing _volume, flowrate , transfer_flowrate ) )
pump_action_sequence . extend (
generate_pump_protocol ( G , solvent_vessel , to _vessel, rinsing _volume, flowrate , transfer_flowrate ) )
pump_action_sequence . extend (
generate_pump_protocol ( G , air_vessel , solvent_vessel , rinsing_volume , flowrate , transfer_flowrate ) )
pump_action_sequence . extend (
generate_pump_protocol ( G , air_vessel , waste_vessel , rinsing_v olume , flowrate , transfer_flowrate ) )
if rinsing_solvent != " " :
pump_action_sequence . extend (
generate_pump_protocol ( G , air_vessel , from_vessel , rinsing_volume , flowrate , transfer_flowrate ) * 2 )
pump_action_sequence . extend (
generate_pump_protocol ( G , air_vessel , to _vessel, rinsing_volume , flowrate , transfer_flowrate ) * 2 )
return pump_action_sequence
# End Protocols