Compare commits

...

11 Commits

Author SHA1 Message Date
ZiWei
4ddabdcb65 Refactor Bioyond workstation and experiment workflow (#105)
Refactored the Bioyond workstation classes to improve parameter handling and workflow management. Updated experiment.py to use BioyondReactionStation with deck and material mappings, and enhanced workflow step parameter mapping and execution logic. Adjusted JSON experiment configs, improved workflow sequence handling, and added UUID assignment to PLR materials. Removed unused station_config and material cache logic, and added detailed docstrings and debug output for workflow methods.
2025-10-14 02:46:31 +08:00
Xuwznln
a5b0325301 Tip more error log 2025-10-14 02:29:14 +08:00
Xuwznln
50b44938c7 Force confirm uuid 2025-10-14 02:22:39 +08:00
Xuwznln
df0d2235b0 Fix resource tree update 2025-10-14 01:55:29 +08:00
Xuwznln
4e434eeb97 Fix resource tree update 2025-10-14 01:53:04 +08:00
Xuwznln
ca027bf0eb Fix multiple resource error 2025-10-14 01:45:08 +08:00
Xuwznln
635a332b4e Fix workstation deck & children resource dupe 2025-10-14 00:21:37 +08:00
Xuwznln
edf7a117ca Fix workstation deck & children resource dupe 2025-10-14 00:21:16 +08:00
Xuwznln
70b2715996 Fix workstation resource not tracking 2025-10-14 00:05:41 +08:00
Xuwznln
7e8dfc2dc5 Fix children key error 2025-10-13 23:34:17 +08:00
Xuwznln
9b626489a8 Fix children key error 2025-10-13 21:20:42 +08:00
11 changed files with 643 additions and 176 deletions

View File

@@ -8,7 +8,7 @@
],
"parent": null,
"type": "device",
"class": "dispensing_station.bioyond",
"class": "workstation.bioyond_dispensing_station",
"config": {
"config": {
"api_key": "DE9BDDA0",
@@ -20,13 +20,6 @@
"_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_PolymerPreparationStation_Deck"
}
},
"station_config": {
"station_type": "dispensing_station",
"enable_dispensing_station": true,
"enable_reaction_station": false,
"station_name": "DispensingStation_001",
"description": "Bioyond配液工作站"
},
"protocol_type": []
},
"data": {}
@@ -57,4 +50,4 @@
"data": {}
}
]
}
}

View File

@@ -24,9 +24,13 @@
"Drip_back": "3a162cf9-6aac-565a-ddd7-682ba1796a4a"
},
"material_type_mappings": {
"烧杯": "BIOYOND_PolymerStation_1FlaskCarrier",
"试剂瓶": "BIOYOND_PolymerStation_1BottleCarrier",
"样品板": "BIOYOND_PolymerStation_6VialCarrier"
"烧杯": ["BIOYOND_PolymerStation_1FlaskCarrier", "3a14196b-24f2-ca49-9081-0cab8021bf1a"],
"试剂瓶": ["BIOYOND_PolymerStation_1BottleCarrier", ""],
"样品板": ["BIOYOND_PolymerStation_6StockCarrier", "3a14196e-b7a0-a5da-1931-35f3000281e9"],
"分装板": ["BIOYOND_PolymerStation_6VialCarrier", "3a14196e-5dfe-6e21-0c79-fe2036d052c4"],
"样品瓶": ["BIOYOND_PolymerStation_Solid_Stock", "3a14196a-cf7d-8aea-48d8-b9662c7dba94"],
"90%分装小瓶": ["BIOYOND_PolymerStation_Solid_Vial", "3a14196c-cdcf-088d-dc7d-5cf38f0ad9ea"],
"10%分装小瓶": ["BIOYOND_PolymerStation_Liquid_Vial", "3a14196c-76be-2279-4e22-7310d69aed68"]
}
},
"deck": {
@@ -42,7 +46,6 @@
{
"id": "Bioyond_Deck",
"name": "Bioyond_Deck",
"sample_id": null,
"children": [
],
"parent": "reaction_station_bioyond",

View File

@@ -6,8 +6,15 @@ from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstati
class BioyondDispensingStation(BioyondWorkstation):
def __init__(self, config):
super().__init__(config)
def __init__(
self,
config,
# 桌子
deck,
*args,
**kwargs,
):
super().__init__(config, deck, *args, **kwargs)
# self.config = config
# self.api_key = config["api_key"]
# self.host = config["api_host"]

View File

@@ -1,203 +1,205 @@
# experiment_workflow.py
"""
实验流程主程序
"""
import json
from bioyond_rpc import BioyondV1RPC
from config import API_CONFIG, WORKFLOW_MAPPINGS
from reaction_station import BioyondReactionStation
from config import API_CONFIG, WORKFLOW_MAPPINGS, DECK_CONFIG, MATERIAL_TYPE_MAPPINGS
def run_experiment():
"""运行实验流程"""
# 初始化Bioyond客户端
config = {
**API_CONFIG,
"workflow_mappings": WORKFLOW_MAPPINGS
"workflow_mappings": WORKFLOW_MAPPINGS,
"material_type_mappings": MATERIAL_TYPE_MAPPINGS
}
Bioyond = BioyondV1RPC(config)
# 创建BioyondReactionStation实例传入deck配置
Bioyond = BioyondReactionStation(
config=config,
deck=DECK_CONFIG
)
print("\n============= 多工作流参数测试(简化接口+材料缓存)=============")
# 显示可用的材料名称前20个
available_materials = Bioyond.get_available_materials()
available_materials = Bioyond.hardware_interface.get_available_materials()
print(f"可用材料名称前20个: {available_materials[:20]}")
print(f"总共有 {len(available_materials)} 个材料可用\n")
# 1. 反应器放入
print("1. 添加反应器放入工作流,带参数...")
Bioyond.reactor_taken_in(
assign_material_name="BTDA-DD",
cutoff="10000",
assign_material_name="BTDA-DD",
cutoff="10000",
temperature="-10"
)
# 2. 液体投料-烧杯 (第一个)
print("2. 添加液体投料-烧杯,带参数...")
Bioyond.liquid_feeding_beaker(
volume="34768.7",
volume="34768.7",
assign_material_name="ODA",
time="0",
torque_variation="1",
titrationType="1",
time="0",
torque_variation="1",
titrationType="1",
temperature=-10
)
# 3. 液体投料-烧杯 (第二个)
print("3. 添加液体投料-烧杯,带参数...")
Bioyond.liquid_feeding_beaker(
volume="34080.9",
volume="34080.9",
assign_material_name="MPDA",
time="5",
torque_variation="2",
titrationType="1",
time="5",
torque_variation="2",
titrationType="1",
temperature=0
)
# 4. 液体投料-小瓶非滴定
print("4. 添加液体投料-小瓶非滴定,带参数...")
Bioyond.liquid_feeding_vials_non_titration(
volumeFormula="639.5",
assign_material_name="SIDA",
titration_type="1",
time="0",
torque_variation="1",
volumeFormula="639.5",
assign_material_name="SIDA",
titration_type="1",
time="0",
torque_variation="1",
temperature=-10
)
# 5. 液体投料溶剂
print("5. 添加液体投料溶剂,带参数...")
Bioyond.liquid_feeding_solvents(
assign_material_name="NMP",
volume="19000",
titration_type="1",
time="5",
torque_variation="2",
volume="19000",
titration_type="1",
time="5",
torque_variation="2",
temperature=-10
)
# 6-8. 固体进料小瓶 (三个)
print("6. 添加固体进料小瓶,带参数...")
Bioyond.solid_feeding_vials(
material_id="3",
time="180",
material_id="3",
time="180",
torque_variation="2",
assign_material_name="BTDA-1",
assign_material_name="BTDA1",
temperature=-10.00
)
#二杆样品版90
print("7. 添加固体进料小瓶,带参数...")
Bioyond.solid_feeding_vials(
material_id="3",
time="180",
material_id="3",
time="180",
torque_variation="2",
assign_material_name="BTDA-2",
assign_material_name="BTDA2",
temperature=25.00
)
#二杆样品版90
print("8. 添加固体进料小瓶,带参数...")
Bioyond.solid_feeding_vials(
material_id="3",
time="480",
material_id="3",
time="480",
torque_variation="2",
assign_material_name="BTDA-3",
assign_material_name="BTDA3",
temperature=25.00
)
# 液体投料滴定(第一个)
print("9. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="1000",
volume_formula="{{6-0-5}}+{{7-0-5}}+{{8-0-5}}",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
# 液体投料滴定(第二个)
print("10. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="500",
volume_formula="500",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
# 液体投料滴定(第三个)
print("11. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="500",
volume_formula="500",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
print("12. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="500",
volume_formula="500",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
print("13. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="500",
volume_formula="500",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
print("14. 添加液体投料滴定,带参数...") # ODPA
Bioyond.liquid_feeding_titration(
volume_formula="500",
volume_formula="500",
assign_material_name="BTDA-DD",
titration_type="1",
time="360",
torque_variation="2",
titration_type="1",
time="360",
torque_variation="2",
temperature="25.00"
)
print("15. 添加液体投料溶剂,带参数...")
Bioyond.liquid_feeding_solvents(
assign_material_name="PGME",
volume="16894.6",
titration_type="1",
time="360",
torque_variation="2",
volume="16894.6",
titration_type="1",
time="360",
torque_variation="2",
temperature=25.00
)
# 16. 反应器取出
print("16. 添加反应器取出工作流...")
Bioyond.reactor_taken_out()
# 显示当前工作流序列
sequence = Bioyond.get_workflow_sequence()
print("\n当前工作流执行顺序:")
print(sequence)
# 执行process_and_execute_workflow合并工作流并创建任务
print("\n4. 执行process_and_execute_workflow...")
result = Bioyond.process_and_execute_workflow(
workflow_name="test3_86",
task_name="实验3_86"
workflow_name="test3_8",
task_name="实验3_8"
)
# 显示执行结果
print("\n5. 执行结果:")
if isinstance(result, str):
@@ -220,16 +222,16 @@ def run_experiment():
print(f"- 任务结果: {result.get('task')}")
else:
print(f"任务创建失败: {result.get('error')}")
# 可选:启动调度器
# Bioyond.scheduler_start()
return Bioyond
def prepare_materials(bioyond):
"""准备实验材料(可选)"""
# 样品板材料数据定义
material_data_yp_1 = {
"typeId": "3a142339-80de-8f25-6093-1b1b1b6c322e",
@@ -288,7 +290,7 @@ def prepare_materials(bioyond):
],
"Parameters": "{}"
}
material_data_yp_2 = {
"typeId": "3a142339-80de-8f25-6093-1b1b1b6c322e",
"name": "样品板-2",
@@ -338,7 +340,7 @@ def prepare_materials(bioyond):
],
"Parameters": "{}"
}
# 烧杯材料数据定义
beaker_materials = [
{
@@ -377,12 +379,12 @@ def prepare_materials(bioyond):
"parameters": "{\"DeviceMaterialType\":\"NMP\"}"
}
]
# 如果需要可以在这里调用add_material方法添加材料
# 例如:
# result = bioyond.add_material(json.dumps(material_data_yp_1))
# print(f"添加材料结果: {result}")
return {
"sample_plates": [material_data_yp_1, material_data_yp_2],
"beakers": beaker_materials
@@ -392,7 +394,7 @@ def prepare_materials(bioyond):
if __name__ == "__main__":
# 运行主实验流程
bioyond_client = run_experiment()
# 可选:准备材料数据
# materials = prepare_materials(bioyond_client)
# print(f"\n准备的材料数据: {materials}")

View File

@@ -1,30 +1,67 @@
import json
from typing import List, Dict, Any
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.devices.workstation.bioyond_studio.config import (
API_CONFIG, WORKFLOW_MAPPINGS, WORKFLOW_STEP_IDS, MATERIAL_TYPE_MAPPINGS,
STATION_TYPES, DEFAULT_STATION_CONFIG
WORKFLOW_STEP_IDS,
WORKFLOW_TO_SECTION_MAP
)
class BioyondReactionStation(BioyondWorkstation):
def __init__(self, config: dict = None):
super().__init__(config)
"""Bioyond反应站类
继承自BioyondWorkstation,提供反应站特定的业务方法
"""
def __init__(self, config: dict = None, deck=None):
"""初始化反应站
Args:
config: 配置字典,应包含workflow_mappings等配置
deck: Deck对象
"""
# 如果 deck 作为独立参数传入,使用它;否则尝试从 config 中提取
if deck is None and config:
deck = config.get('deck')
# 调试信息:检查传入的config
print(f"BioyondReactionStation初始化 - config包含workflow_mappings: {'workflow_mappings' in (config or {})}")
if config and 'workflow_mappings' in config:
print(f"workflow_mappings内容: {config['workflow_mappings']}")
# 将 config 作为 bioyond_config 传递给父类
super().__init__(bioyond_config=config, deck=deck)
# 调试信息:检查初始化后的workflow_mappings
print(f"BioyondReactionStation初始化完成 - workflow_mappings: {self.workflow_mappings}")
print(f"workflow_mappings长度: {len(self.workflow_mappings)}")
# ==================== 工作流方法 ====================
# 工作流方法
def reactor_taken_out(self):
"""反应器取出"""
self.hardware_interface.append_to_workflow_sequence('{"web_workflow_name": "reactor_taken_out"}')
self.append_to_workflow_sequence('{"web_workflow_name": "reactor_taken_out"}')
reactor_taken_out_params = {"param_values": {}}
self.hardware_interface.pending_task_params.append(reactor_taken_out_params)
self.pending_task_params.append(reactor_taken_out_params)
print(f"成功添加反应器取出工作流")
print(f"当前队列长度: {len(self.hardware_interface.pending_task_params)}")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def reactor_taken_in(self, assign_material_name: str, cutoff: str = "900000", temperature: float = -10.00):
"""反应器放入"""
def reactor_taken_in(
self,
assign_material_name: str,
cutoff: str = "900000",
temperature: float = -10.00
):
"""反应器放入
Args:
assign_material_name: 物料名称
cutoff: 截止参数
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "reactor_taken_in"}')
material_id = self._get_material_id_by_name(assign_material_name)
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -45,11 +82,25 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def solid_feeding_vials(self, material_id: str, time: str = "0", torque_variation: str = "1",
assign_material_name: str = None, temperature: float = 25.00):
"""固体进料小瓶"""
def solid_feeding_vials(
self,
material_id: str,
time: str = "0",
torque_variation: str = "1",
assign_material_name: str = None,
temperature: float = 25.00
):
"""固体进料小瓶
Args:
material_id: 物料ID
time: 时间
torque_variation: 扭矩变化
assign_material_name: 物料名称
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "Solid_feeding_vials"}')
material_id_m = self._get_material_id_by_name(assign_material_name)
material_id_m = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -76,12 +127,27 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def liquid_feeding_vials_non_titration(self, volumeFormula: str, assign_material_name: str,
titration_type: str = "1", time: str = "0",
torque_variation: str = "1", temperature: float = 25.00):
"""液体进料小瓶(非滴定)"""
def liquid_feeding_vials_non_titration(
self,
volumeFormula: str,
assign_material_name: str,
titration_type: str = "1",
time: str = "0",
torque_variation: str = "1",
temperature: float = 25.00
):
"""液体进料小瓶(非滴定)
Args:
volumeFormula: 体积公式
assign_material_name: 物料名称
titration_type: 滴定类型
time: 时间
torque_variation: 扭矩变化
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding_vials(non-titration)"}')
material_id = self._get_material_id_by_name(assign_material_name)
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -109,11 +175,27 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def liquid_feeding_solvents(self, assign_material_name: str, volume: str, titration_type: str = "1",
time: str = "360", torque_variation: str = "2", temperature: float = 25.00):
"""液体进料溶剂"""
def liquid_feeding_solvents(
self,
assign_material_name: str,
volume: str,
titration_type: str = "1",
time: str = "360",
torque_variation: str = "2",
temperature: float = 25.00
):
"""液体进料溶剂
Args:
assign_material_name: 物料名称
volume: 体积
titration_type: 滴定类型
time: 时间
torque_variation: 扭矩变化
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding_solvents"}')
material_id = self._get_material_id_by_name(assign_material_name)
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -141,11 +223,27 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def liquid_feeding_titration(self, volume_formula: str, assign_material_name: str, titration_type: str = "1",
time: str = "90", torque_variation: int = 2, temperature: float = 25.00):
"""液体进料(滴定)"""
def liquid_feeding_titration(
self,
volume_formula: str,
assign_material_name: str,
titration_type: str = "1",
time: str = "90",
torque_variation: int = 2,
temperature: float = 25.00
):
"""液体进料(滴定)
Args:
volume_formula: 体积公式
assign_material_name: 物料名称
titration_type: 滴定类型
time: 时间
torque_variation: 扭矩变化
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding(titration)"}')
material_id = self._get_material_id_by_name(assign_material_name)
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -173,12 +271,27 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def liquid_feeding_beaker(self, volume: str = "35000", assign_material_name: str = "BAPP",
time: str = "0", torque_variation: str = "1", titrationType: str = "1",
temperature: float = 25.00):
"""液体进料烧杯"""
def liquid_feeding_beaker(
self,
volume: str = "35000",
assign_material_name: str = "BAPP",
time: str = "0",
torque_variation: str = "1",
titrationType: str = "1",
temperature: float = 25.00
):
"""液体进料烧杯
Args:
volume: 体积
assign_material_name: 物料名称
time: 时间
torque_variation: 扭矩变化
titrationType: 滴定类型
temperature: 温度
"""
self.append_to_workflow_sequence('{"web_workflow_name": "liquid_feeding_beaker"}')
material_id = self._get_material_id_by_name(assign_material_name)
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if isinstance(temperature, str):
temperature = float(temperature)
@@ -204,4 +317,323 @@ class BioyondReactionStation(BioyondWorkstation):
self.pending_task_params.append(params)
print(f"成功添加液体进料烧杯参数: volume={volume}μL, material={assign_material_name}->ID:{material_id}")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
return json.dumps({"suc": True})
# ==================== 工作流管理方法 ====================
def get_workflow_sequence(self) -> List[str]:
"""获取当前工作流执行顺序
Returns:
工作流名称列表
"""
id_to_name = {workflow_id: name for name, workflow_id in self.workflow_mappings.items()}
workflow_names = []
for workflow_id in self.workflow_sequence:
workflow_names.append(id_to_name.get(workflow_id, workflow_id))
return workflow_names
def workflow_step_query(self, workflow_id: str) -> dict:
"""查询工作流步骤参数
Args:
workflow_id: 工作流ID
Returns:
工作流步骤参数字典
"""
return self.hardware_interface.workflow_step_query(workflow_id)
def create_order(self, json_str: str) -> dict:
"""创建订单
Args:
json_str: 订单参数的JSON字符串
Returns:
创建结果
"""
return self.hardware_interface.create_order(json_str)
# ==================== 工作流执行核心方法 ====================
# 发布任务
def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
web_workflow_list = self.get_workflow_sequence()
workflow_name = workflow_name
pending_params_backup = self.pending_task_params.copy()
print(f"保存pending_task_params副本{len(pending_params_backup)}个参数")
# 1. 处理网页工作流列表
print(f"处理网页工作流列表: {web_workflow_list}")
web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
workflows_result = self.process_web_workflows(web_workflow_json)
if not workflows_result:
error_msg = "处理网页工作流列表失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "process_web_workflows"})
return result
# 2. 合并工作流序列
print(f"合并工作流序列,名称: {workflow_name}")
merge_json = json.dumps({"name": workflow_name})
merged_workflow = self.merge_sequence_workflow(merge_json)
print(f"合并工作流序列结果: {merged_workflow}")
if not merged_workflow:
error_msg = "合并工作流序列失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "merge_sequence_workflow"})
return result
# 3. 合并所有参数并创建任务
# 新API只返回状态信息需要适配处理
if isinstance(merged_workflow, dict) and merged_workflow.get("code") == 1:
# 新API返回格式{code: 1, message: "", timestamp: 0}
# 使用传入的工作流名称和生成的临时ID
final_workflow_name = workflow_name
workflow_id = f"merged_{workflow_name}_{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}"
print(f"新API合并成功使用工作流创建任务: {final_workflow_name} (临时ID: {workflow_id})")
else:
# 旧API返回格式包含详细工作流信息
final_workflow_name = merged_workflow.get("name", workflow_name)
workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
print(f"旧API格式使用工作流创建任务: {final_workflow_name} (ID: {workflow_id})")
if not workflow_id:
error_msg = "无法获取工作流ID"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "get_workflow_id"})
return result
workflow_query_json = json.dumps({"workflow_id": workflow_id})
workflow_params_structure = self.workflow_step_query(workflow_query_json)
self.pending_task_params = pending_params_backup
print(f"恢复pending_task_params{len(self.pending_task_params)}个参数")
param_values = self.generate_task_param_values(workflow_params_structure)
task_params = [{
"orderCode": f"BSO{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}",
"orderName": f"实验-{self.hardware_interface.get_current_time_iso8601()[:10].replace('-', '')}",
"workFlowId": workflow_id,
"borderNumber": 1,
"paramValues": param_values,
"extendProperties": ""
}]
task_json = json.dumps(task_params)
print(f"创建任务参数: {type(task_json)}")
result = self.create_order(task_json)
if not result:
error_msg = "创建任务失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "create_order"})
return result
print(f"任务创建成功: {result}")
self.pending_task_params.clear()
print("已清空pending_task_params")
return {
"success": True,
"workflow": {"name": final_workflow_name, "id": workflow_id},
"task": result,
"method": "process_and_execute_workflow"
}
def merge_sequence_workflow(self, json_str: str) -> dict:
"""合并当前工作流序列
Args:
json_str: 包含name等参数的JSON字符串
Returns:
合并结果
"""
try:
data = json.loads(json_str)
name = data.get("name", "合并工作流")
step_parameters = data.get("stepParameters", {})
variables = data.get("variables", {})
except json.JSONDecodeError:
return {}
if not self.workflow_sequence:
print("工作流序列为空,无法合并")
return {}
# 将工作流ID列表转换为新API要求的格式
workflows = [{"id": workflow_id} for workflow_id in self.workflow_sequence]
# 构建新的API参数格式
params = {
"name": name,
"workflows": workflows,
"stepParameters": step_parameters,
"variables": variables
}
# 使用新的API接口
response = self.hardware_interface.post(
url=f'{self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters',
params={
"apiKey": self.hardware_interface.api_key,
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": params,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def generate_task_param_values(self, workflow_params_structure: dict) -> dict:
"""生成任务参数值
根据工作流参数结构和待处理的任务参数,生成最终的任务参数值
Args:
workflow_params_structure: 工作流参数结构
Returns:
任务参数值字典
"""
if not workflow_params_structure:
print("workflow_params_structure为空")
return {}
data = workflow_params_structure
# 从pending_task_params中提取实际参数值,按DisplaySectionName和Key组织
pending_params_by_section = {}
print(f"开始处理pending_task_params,共{len(self.pending_task_params)}个任务参数组")
# 获取工作流执行顺序,用于按顺序匹配参数
workflow_sequence = self.get_workflow_sequence()
print(f"工作流执行顺序: {workflow_sequence}")
workflow_index = 0
# 遍历所有待处理的任务参数
for i, task_param in enumerate(self.pending_task_params):
if 'param_values' in task_param:
print(f"处理第{i+1}个任务参数组,包含{len(task_param['param_values'])}个步骤")
if workflow_index < len(workflow_sequence):
current_workflow = workflow_sequence[workflow_index]
section_name = WORKFLOW_TO_SECTION_MAP.get(current_workflow)
print(f" 匹配到工作流: {current_workflow} -> {section_name}")
workflow_index += 1
else:
print(f" 警告: 参数组{i+1}超出了工作流序列范围")
continue
if not section_name:
print(f" 警告: 工作流{current_workflow}没有对应的DisplaySectionName")
continue
if section_name not in pending_params_by_section:
pending_params_by_section[section_name] = {}
# 处理每个步骤的参数
for step_id, param_list in task_param['param_values'].items():
print(f" 步骤ID: {step_id},参数数量: {len(param_list)}")
for param_item in param_list:
key = param_item.get('Key', '')
value = param_item.get('Value', '')
m = param_item.get('m', 0)
n = param_item.get('n', 0)
print(f" 参数: {key} = {value} (m={m}, n={n}) -> 分组到{section_name}")
param_key = f"{section_name}.{key}"
if param_key not in pending_params_by_section[section_name]:
pending_params_by_section[section_name][param_key] = []
pending_params_by_section[section_name][param_key].append({
'value': value,
'm': m,
'n': n
})
print(f"pending_params_by_section构建完成,包含{len(pending_params_by_section)}个分组")
# 收集所有参数,过滤TaskDisplayable为0的项
filtered_params = []
for step_id, step_info in data.items():
if isinstance(step_info, list):
for step_item in step_info:
param_list = step_item.get("parameterList", [])
for param in param_list:
if param.get("TaskDisplayable") == 0:
continue
param_with_step = param.copy()
param_with_step['step_id'] = step_id
param_with_step['step_name'] = step_item.get("name", "")
param_with_step['step_m'] = step_item.get("m", 0)
param_with_step['step_n'] = step_item.get("n", 0)
filtered_params.append(param_with_step)
# 按DisplaySectionIndex排序
filtered_params.sort(key=lambda x: x.get('DisplaySectionIndex', 0))
# 生成参数映射
param_mapping = {}
step_params = {}
for param in filtered_params:
step_id = param['step_id']
if step_id not in step_params:
step_params[step_id] = []
step_params[step_id].append(param)
# 为每个步骤生成参数
for step_id, params in step_params.items():
param_list = []
for param in params:
key = param.get('Key', '')
display_section_index = param.get('DisplaySectionIndex', 0)
step_m = param.get('step_m', 0)
step_n = param.get('step_n', 0)
section_name = param.get('DisplaySectionName', '')
param_key = f"{section_name}.{key}"
if section_name in pending_params_by_section and param_key in pending_params_by_section[section_name]:
pending_param_list = pending_params_by_section[section_name][param_key]
if pending_param_list:
pending_param = pending_param_list[0]
value = pending_param['value']
m = step_m
n = step_n
print(f" 匹配成功: {section_name}.{key} = {value} (m={m}, n={n})")
pending_param_list.pop(0)
else:
value = "1"
m = step_m
n = step_n
print(f" 匹配失败: {section_name}.{key},参数列表为空,使用默认值 = {value}")
else:
value = "1"
m = display_section_index
n = step_n
print(f" 匹配失败: {section_name}.{key},使用默认值 = {value} (m={m}, n={n})")
param_item = {
"m": m,
"n": n,
"key": key,
"value": str(value).strip()
}
param_list.append(param_item)
if param_list:
param_mapping[step_id] = param_list
print(f"生成任务参数值,包含 {len(param_mapping)} 个步骤")
return param_mapping

View File

@@ -129,7 +129,6 @@ class BioyondWorkstation(WorkstationBase):
self,
bioyond_config: Optional[Dict[str, Any]] = None,
deck: Optional[Any] = None,
station_config: Optional[Dict[str, Any]] = None,
*args,
**kwargs,
):
@@ -152,9 +151,6 @@ class BioyondWorkstation(WorkstationBase):
if isinstance(resource, WareHouse):
self.deck.warehouses[resource.name] = resource
# 配置站点类型
self._configure_station_type(station_config)
# 创建通信模块
self._create_communication_module(bioyond_config)
self.resource_synchronizer = BioyondResourceSynchronizer(self)
@@ -167,8 +163,6 @@ class BioyondWorkstation(WorkstationBase):
self.workflow_mappings = {}
self.workflow_sequence = []
self.pending_task_params = []
self.material_cache = {}
self._load_material_cache()
if "workflow_mappings" in bioyond_config:
self._set_workflow_mappings(bioyond_config["workflow_mappings"])
@@ -325,10 +319,22 @@ class BioyondWorkstation(WorkstationBase):
}
def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
workflow_id = self._get_workflow(web_workflow_name)
# 检查是否为JSON格式的字符串
actual_workflow_name = web_workflow_name
if web_workflow_name.startswith('{') and web_workflow_name.endswith('}'):
try:
data = json.loads(web_workflow_name)
actual_workflow_name = data.get("web_workflow_name", web_workflow_name)
print(f"解析JSON格式工作流名称: {web_workflow_name} -> {actual_workflow_name}")
except json.JSONDecodeError:
print(f"JSON解析失败使用原始字符串: {web_workflow_name}")
workflow_id = self._get_workflow(actual_workflow_name)
if workflow_id:
self.workflow_sequence.append(workflow_id)
print(f"添加工作流到执行顺序: {web_workflow_name} -> {workflow_id}")
print(f"添加工作流到执行顺序: {actual_workflow_name} -> {workflow_id}")
return True
return False
def set_workflow_sequence(self, json_str: str) -> List[str]:
try:

View File

@@ -171,7 +171,6 @@ class WorkstationBase(ABC):
def post_init(self, ros_node: ROS2WorkstationNode) -> None:
# 初始化物料系统
self._ros_node = ros_node
self._ros_node.update_resource([self.deck])
def _build_resource_mappings(self, deck: Deck):
"""递归构建资源映射"""

View File

@@ -4,6 +4,7 @@ import json
import os.path
import traceback
from typing import Union, Any, Dict, List, Tuple
import uuid
import networkx as nx
from pylabrobot.resources import ResourceHolder
from unilabos_msgs.msg import Resource
@@ -52,7 +53,7 @@ def canonicalize_nodes_data(
if not node.get("type"):
node["type"] = "device"
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
if not node.get("name"):
if node.get("name", None) is None:
node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
if not isinstance(node.get("position"), dict):
@@ -67,7 +68,7 @@ def canonicalize_nodes_data(
if z is not None:
node["position"]["position"]["z"] = z
for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data"]:
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children"]:
v = node.pop(k)
node["config"][k] = v
@@ -629,6 +630,7 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
{"name": material["name"], "class": className}, resource_type=ResourcePLR
)
plr_material.code = material.get("code", "") and material.get("barCode", "") or ""
plr_material.unilabos_uuid = str(uuid.uuid4())
# 处理子物料detail
if material.get("detail") and len(material["detail"]) > 0:

View File

@@ -266,7 +266,7 @@ class HostNode(BaseROS2DeviceNode):
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
if old_uuid:
# 找到旧UUID使用UUID查找
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
resource_instance = device_tracker.uuid_to_resources.get(old_uuid)
else:
# 未找到旧UUID使用name查找
resource_instance = device_tracker.figure_resource(
@@ -932,18 +932,25 @@ class HostNode(BaseROS2DeviceNode):
from unilabos.app.web.client import http_client
resource_start_time = time.time()
uuid_mapping = http_client.resource_tree_update(resource_tree_set, "", False)
success = bool(uuid_mapping)
resource_end_time = time.time()
self.lab_logger().info(
f"[Host Node-Resource] 物料更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
if uuid_mapping:
self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点")
# 还需要加入到资源图中,暂不实现,考虑资源图新的获取方式
response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
uuid_to_trees: Dict[str, List[ResourceTreeInstance]] = collections.defaultdict(list)
for tree in resource_tree_set.trees:
uuid_to_trees[tree.root_node.res_content.uuid].append(tree)
for uid, trees in uuid_to_trees.items():
new_tree_set = ResourceTreeSet(trees)
resource_start_time = time.time()
uuid_mapping = http_client.resource_tree_add(new_tree_set, uid, False)
success = bool(uuid_mapping)
resource_end_time = time.time()
self.lab_logger().info(
f"[Host Node-Resource] 物料 {[root_node.res_content.id for root_node in new_tree_set.root_nodes]} 挂载 {uid} P{trees[0].root_node.res_content.parent} 更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
if uuid_mapping:
self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点")
# 还需要加入到资源图中,暂不实现,考虑资源图新的获取方式
response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response):
"""

View File

@@ -1,3 +1,4 @@
import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field
@@ -140,7 +141,7 @@ class ResourceDictInstance(object):
def get_nested_dict(self) -> Dict[str, Any]:
"""获取资源实例的嵌套字典表示"""
res_dict = self.res_content.model_dump(by_alias=True)
res_dict["children"] = {child.res_content.name: child.get_nested_dict() for child in self.children}
res_dict["children"] = {child.res_content.id: child.get_nested_dict() for child in self.children}
res_dict["parent"] = self.res_content.parent_instance_name
res_dict["position"] = self.res_content.position.position.model_dump()
return res_dict
@@ -213,7 +214,7 @@ class ResourceTreeInstance(object):
if node.res_content.uuid:
known_uuids.add(node.res_content.uuid)
else:
print(f"警告: 资源 {node.res_content.id} 没有uuid")
logger.warning(f"警告: 资源 {node.res_content.id} 没有uuid")
# 验证并递归处理子节点
for child in node.children:
@@ -318,7 +319,12 @@ class ResourceTreeSet(object):
def build_uuid_mapping(res: "PLRResource", uuid_list: list):
"""递归构建uuid映射字典"""
uuid_list.append(getattr(res, "unilabos_uuid", ""))
uid = getattr(res, "unilabos_uuid", "")
if not uid:
uid = str(uuid.uuid4())
res.unilabos_uuid = uid
logger.warning(f"{res}没有uuid请设置后再传入默认填充{uid}\n{traceback.format_exc()}")
uuid_list.append(uid)
for child in res.children:
build_uuid_mapping(child, uuid_list)
@@ -868,8 +874,9 @@ class DeviceNodeResourceTracker(object):
def process(res):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid:
old = self.uuid_to_resources.get(current_uuid)
self.uuid_to_resources[current_uuid] = res
logger.debug(f"收集资源UUID映射: {current_uuid} -> {res}")
logger.debug(f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}")
return 0
self._traverse_and_process(resource, process)
@@ -1037,13 +1044,19 @@ class DeviceNodeResourceTracker(object):
) -> List[Tuple[Any, Any]]:
res_list = []
# print(resource, target_resource_cls_type, identifier_key, compare_value)
children = getattr(resource, "children", [])
children = []
if not isinstance(resource, dict):
children = getattr(resource, "children", [])
else:
children = resource.get("children")
if children is not None:
children = list(children.values()) if isinstance(children, dict) else children
for child in children:
res_list.extend(
self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource)
)
if issubclass(type(resource), target_resource_cls_type):
if target_resource_cls_type == dict:
if type(resource) == dict:
# 对于字典类型,直接检查 identifier_key
if identifier_key in resource:
if resource[identifier_key] == compare_value:

View File

@@ -336,6 +336,9 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children
for material_id, child in self.children.items():
if child["type"] != "device":
self.resource_tracker.add_resource(self.children[material_id])
deck_dict = data.get("deck")
if deck_dict:
from pylabrobot.resources import Deck, Resource