action to resource & 0.9.12

This commit is contained in:
KCFeng425
2025-07-17 04:10:15 +08:00
parent f90be18926
commit 6b7564b9f9
53 changed files with 2526 additions and 761 deletions

View File

@@ -138,9 +138,50 @@ def validate_and_fix_params(stir_time: float, stir_speed: float, settling_time:
return stir_time, stir_speed, settling_time
def extract_vessel_id(vessel: Union[str, dict]) -> str:
"""
从vessel参数中提取vessel_id
Args:
vessel: vessel字典或vessel_id字符串
Returns:
str: vessel_id
"""
if isinstance(vessel, dict):
vessel_id = vessel.get("id", "")
debug_print(f"🔧 从vessel字典提取ID: {vessel_id}")
return vessel_id
elif isinstance(vessel, str):
debug_print(f"🔧 vessel参数为字符串: {vessel}")
return vessel
else:
debug_print(f"⚠️ 无效的vessel参数类型: {type(vessel)}")
return ""
def get_vessel_display_info(vessel: Union[str, dict]) -> str:
"""
获取容器的显示信息(用于日志)
Args:
vessel: vessel字典或vessel_id字符串
Returns:
str: 显示信息
"""
if isinstance(vessel, dict):
vessel_id = vessel.get("id", "unknown")
vessel_name = vessel.get("name", "")
if vessel_name:
return f"{vessel_id} ({vessel_name})"
else:
return vessel_id
else:
return str(vessel)
def generate_stir_protocol(
G: nx.DiGraph,
vessel: str,
vessel: Union[str, dict], # 🔧 修改支持vessel字典
time: Union[str, float, int] = "300",
stir_time: Union[str, float, int] = "0",
time_spec: str = "",
@@ -150,13 +191,31 @@ def generate_stir_protocol(
**kwargs
) -> List[Dict[str, Any]]:
"""
生成搅拌操作的协议序列(精简版)
生成搅拌操作的协议序列 - 支持vessel字典
Args:
G: 有向图,节点为设备和容器,边为流体管道
vessel: 搅拌容器字典从XDL传入或容器ID字符串
time: 搅拌时间(支持 "300", "5 min", "1 h" 等格式)
stir_time: 指定搅拌时间优先级比time低
time_spec: 时间规格(优先级最高)
event: 事件描述
stir_speed: 搅拌速度RPM
settling_time: 沉降时间
**kwargs: 其他可选参数
Returns:
List[Dict[str, Any]]: 搅拌操作的动作序列
"""
# 🔧 核心修改从vessel参数中提取vessel_id
vessel_id = extract_vessel_id(vessel)
vessel_display = get_vessel_display_info(vessel)
debug_print("🌪️" * 20)
debug_print("🚀 开始生成搅拌协议 ")
debug_print("🚀 开始生成搅拌协议支持vessel字典")
debug_print(f"📝 输入参数:")
debug_print(f" 🥽 vessel: {vessel}")
debug_print(f" 🥽 vessel: {vessel_display} (ID: {vessel_id})")
debug_print(f" ⏰ time: {time}")
debug_print(f" 🕐 stir_time: {stir_time}")
debug_print(f" 🎯 time_spec: {time_spec}")
@@ -166,13 +225,13 @@ def generate_stir_protocol(
# 📋 参数验证
debug_print("📍 步骤1: 参数验证... 🔧")
if not vessel:
if not vessel_id: # 🔧 使用 vessel_id
debug_print("❌ vessel 参数不能为空! 😱")
raise ValueError("vessel 参数不能为空")
if vessel not in G.nodes():
debug_print(f"❌ 容器 '{vessel}' 不存在于系统中! 😞")
raise ValueError(f"容器 '{vessel}' 不存在于系统中")
if vessel_id not in G.nodes(): # 🔧 使用 vessel_id
debug_print(f"❌ 容器 '{vessel_id}' 不存在于系统中! 😞")
raise ValueError(f"容器 '{vessel_id}' 不存在于系统中")
debug_print("✅ 基础参数验证通过 🎯")
@@ -220,7 +279,7 @@ def generate_stir_protocol(
# 🔍 查找设备
debug_print("📍 步骤3: 查找搅拌设备... 🔍")
try:
stirrer_id = find_connected_stirrer(G, vessel)
stirrer_id = find_connected_stirrer(G, vessel_id) # 🔧 使用 vessel_id
debug_print(f"🎉 使用搅拌设备: {stirrer_id}")
except Exception as e:
debug_print(f"❌ 设备查找失败: {str(e)} 😭")
@@ -234,7 +293,7 @@ def generate_stir_protocol(
"device_id": stirrer_id,
"action_name": "stir",
"action_kwargs": {
"vessel": vessel,
"vessel": vessel_id, # 🔧 使用 vessel_id
"time": str(time), # 保持原始格式
"event": event,
"time_spec": time_spec,
@@ -256,7 +315,7 @@ def generate_stir_protocol(
debug_print("🎊" * 20)
debug_print(f"🎉 搅拌协议生成完成! ✨")
debug_print(f"📊 总动作数: {len(action_sequence)}")
debug_print(f"🥽 搅拌容器: {vessel}")
debug_print(f"🥽 搅拌容器: {vessel_display}")
debug_print(f"🌪️ 搅拌参数: {stir_speed} RPM, {parsed_time}s, 沉降 {parsed_settling_time}s")
debug_print(f"⏱️ 预计总时间: {(parsed_time + parsed_settling_time)/60:.1f} 分钟 ⌛")
debug_print("🎊" * 20)
@@ -265,18 +324,36 @@ def generate_stir_protocol(
def generate_start_stir_protocol(
G: nx.DiGraph,
vessel: str,
vessel: Union[str, dict], # 🔧 修改支持vessel字典
stir_speed: float = 300.0,
purpose: str = "",
**kwargs
) -> List[Dict[str, Any]]:
"""生成开始搅拌操作的协议序列"""
"""
生成开始搅拌操作的协议序列 - 支持vessel字典
debug_print("🔄 开始生成启动搅拌协议 ✨")
debug_print(f"🥽 vessel: {vessel}, 🌪️ speed: {stir_speed} RPM")
Args:
G: 有向图
vessel: 搅拌容器字典或容器ID字符串
stir_speed: 搅拌速度RPM
purpose: 搅拌目的描述
**kwargs: 其他可选参数
Returns:
List[Dict[str, Any]]: 开始搅拌操作的动作序列
"""
# 🔧 核心修改从vessel参数中提取vessel_id
vessel_id = extract_vessel_id(vessel)
vessel_display = get_vessel_display_info(vessel)
debug_print("🔄 开始生成启动搅拌协议支持vessel字典")
debug_print(f"🥽 vessel: {vessel_display} (ID: {vessel_id})")
debug_print(f"🌪️ speed: {stir_speed} RPM")
debug_print(f"🎯 purpose: {purpose}")
# 基础验证
if not vessel or vessel not in G.nodes():
if not vessel_id or vessel_id not in G.nodes(): # 🔧 使用 vessel_id
debug_print("❌ 容器验证失败!")
raise ValueError("vessel 参数无效")
@@ -286,14 +363,14 @@ def generate_start_stir_protocol(
stir_speed = 300.0
# 查找设备
stirrer_id = find_connected_stirrer(G, vessel)
stirrer_id = find_connected_stirrer(G, vessel_id) # 🔧 使用 vessel_id
# 生成动作
action_sequence = [{
"device_id": stirrer_id,
"action_name": "start_stir",
"action_kwargs": {
"vessel": vessel,
"vessel": vessel_id, # 🔧 使用 vessel_id
"stir_speed": stir_speed,
"purpose": purpose or f"启动搅拌 {stir_speed} RPM"
}
@@ -304,38 +381,130 @@ def generate_start_stir_protocol(
def generate_stop_stir_protocol(
G: nx.DiGraph,
vessel: str,
vessel: Union[str, dict], # 🔧 修改支持vessel字典
**kwargs
) -> List[Dict[str, Any]]:
"""生成停止搅拌操作的协议序列"""
"""
生成停止搅拌操作的协议序列 - 支持vessel字典
debug_print("🛑 开始生成停止搅拌协议 ✨")
debug_print(f"🥽 vessel: {vessel}")
Args:
G: 有向图
vessel: 搅拌容器字典或容器ID字符串
**kwargs: 其他可选参数
Returns:
List[Dict[str, Any]]: 停止搅拌操作的动作序列
"""
# 🔧 核心修改从vessel参数中提取vessel_id
vessel_id = extract_vessel_id(vessel)
vessel_display = get_vessel_display_info(vessel)
debug_print("🛑 开始生成停止搅拌协议支持vessel字典")
debug_print(f"🥽 vessel: {vessel_display} (ID: {vessel_id})")
# 基础验证
if not vessel or vessel not in G.nodes():
if not vessel_id or vessel_id not in G.nodes(): # 🔧 使用 vessel_id
debug_print("❌ 容器验证失败!")
raise ValueError("vessel 参数无效")
# 查找设备
stirrer_id = find_connected_stirrer(G, vessel)
stirrer_id = find_connected_stirrer(G, vessel_id) # 🔧 使用 vessel_id
# 生成动作
action_sequence = [{
"device_id": stirrer_id,
"action_name": "stop_stir",
"action_kwargs": {
"vessel": vessel
"vessel": vessel_id # 🔧 使用 vessel_id
}
}]
debug_print(f"✅ 停止搅拌协议生成完成 🎯")
return action_sequence
# 🔧 新增:便捷函数
def stir_briefly(G: nx.DiGraph, vessel: Union[str, dict],
speed: float = 300.0) -> List[Dict[str, Any]]:
"""短时间搅拌30秒"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"⚡ 短时间搅拌: {vessel_display} @ {speed}RPM (30s)")
return generate_stir_protocol(G, vessel, time="30", stir_speed=speed)
def stir_slowly(G: nx.DiGraph, vessel: Union[str, dict],
time: Union[str, float] = "10 min") -> List[Dict[str, Any]]:
"""慢速搅拌"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🐌 慢速搅拌: {vessel_display} @ 150RPM")
return generate_stir_protocol(G, vessel, time=time, stir_speed=150.0)
def stir_vigorously(G: nx.DiGraph, vessel: Union[str, dict],
time: Union[str, float] = "5 min") -> List[Dict[str, Any]]:
"""剧烈搅拌"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"💨 剧烈搅拌: {vessel_display} @ 800RPM")
return generate_stir_protocol(G, vessel, time=time, stir_speed=800.0)
def stir_for_reaction(G: nx.DiGraph, vessel: Union[str, dict],
time: Union[str, float] = "1 h") -> List[Dict[str, Any]]:
"""反应搅拌(标准速度,长时间)"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🧪 反应搅拌: {vessel_display} @ 400RPM")
return generate_stir_protocol(G, vessel, time=time, stir_speed=400.0)
def stir_for_dissolution(G: nx.DiGraph, vessel: Union[str, dict],
time: Union[str, float] = "15 min") -> List[Dict[str, Any]]:
"""溶解搅拌(中等速度)"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"💧 溶解搅拌: {vessel_display} @ 500RPM")
return generate_stir_protocol(G, vessel, time=time, stir_speed=500.0)
def stir_gently(G: nx.DiGraph, vessel: Union[str, dict],
time: Union[str, float] = "30 min") -> List[Dict[str, Any]]:
"""温和搅拌"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🍃 温和搅拌: {vessel_display} @ 200RPM")
return generate_stir_protocol(G, vessel, time=time, stir_speed=200.0)
def stir_overnight(G: nx.DiGraph, vessel: Union[str, dict]) -> List[Dict[str, Any]]:
"""过夜搅拌模拟时缩短为2小时"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🌙 过夜搅拌模拟2小时: {vessel_display} @ 300RPM")
return generate_stir_protocol(G, vessel, time="2 h", stir_speed=300.0)
def start_continuous_stirring(G: nx.DiGraph, vessel: Union[str, dict],
speed: float = 300.0, purpose: str = "continuous stirring") -> List[Dict[str, Any]]:
"""开始连续搅拌"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🔄 开始连续搅拌: {vessel_display} @ {speed}RPM")
return generate_start_stir_protocol(G, vessel, stir_speed=speed, purpose=purpose)
def stop_all_stirring(G: nx.DiGraph, vessel: Union[str, dict]) -> List[Dict[str, Any]]:
"""停止所有搅拌"""
vessel_display = get_vessel_display_info(vessel)
debug_print(f"🛑 停止搅拌: {vessel_display}")
return generate_stop_stir_protocol(G, vessel)
# 测试函数
def test_stir_protocol():
"""测试搅拌协议"""
debug_print("🧪 === STIR PROTOCOL 测试 === ✨")
# 测试vessel参数处理
debug_print("🔧 测试vessel参数处理...")
# 测试字典格式
vessel_dict = {"id": "flask_1", "name": "反应瓶1"}
vessel_id = extract_vessel_id(vessel_dict)
vessel_display = get_vessel_display_info(vessel_dict)
debug_print(f" 字典格式: {vessel_dict} → ID: {vessel_id}, 显示: {vessel_display}")
# 测试字符串格式
vessel_str = "flask_2"
vessel_id = extract_vessel_id(vessel_str)
vessel_display = get_vessel_display_info(vessel_str)
debug_print(f" 字符串格式: {vessel_str} → ID: {vessel_id}, 显示: {vessel_display}")
debug_print("✅ 测试完成 🎉")
if __name__ == "__main__":