1 Commits

Author SHA1 Message Date
calvincao
e60bf29a7f feat(workstation): 实现奔曜与扣电池装配工作流统一配置执行接口
- 新增 `run_bioyond_cell_workflow` 函数以支持通过配置驱动奔曜配液与转运流程
- 新增 `run_coin_cell_packaging_workflow` 函数以支持通过配置驱动扣电池装配流程
- 两个函数均接受字典配置参数,实现初始化、操作调用及日志记录等功能的灵活控制- 提供 keep_alive机制用于持续运行场景
- 更新主程序入口逻辑,使用新工作流函数替代原有手动调用方式
- 支持从配置中读取实验样本、调度器设置以及各项操作开关和日志选项- 添加对 Excel 订单创建路径的配置化支持- 引入路径对象处理文件输入,提升跨平台兼容性- 增强错误提示信息,确保必要字段如 create_orders 的 excel_path 存在
- 封装所有设备动作至标准化函数调用结构,便于维护和扩展
2025-11-19 09:51:24 +08:00
2 changed files with 209 additions and 27 deletions

View File

@@ -1168,23 +1168,129 @@ class BioyondCellWorkstation(BioyondWorkstation):
if __name__ == "__main__":
lab_registry.setup()
deck = BIOYOND_YB_Deck(setup=True)
ws = BioyondCellWorkstation(deck=deck)
# ws.update_push_ip() #直接修改奔耀端的报送ip地址
# ws.create_sample(name="配液瓶", board_type="配液瓶(小)板", bottle_type="配液瓶(小)", location_code="E01")
# ws.create_sample(name="分液瓶", board_type="5ml分液瓶板", bottle_type="5ml分液瓶", location_code="D01")
def run_bioyond_cell_workflow(config: Dict[str, Any]) -> BioyondCellWorkstation:
"""按照统一配置执行奔曜配液与转运工作流。
# # logger.info(ws.scheduler_stop())
# logger.info(ws.scheduler_start())
# logger.info(ws.auto_feeding4to3()) #搬运物料到3号箱
# 使用正斜杠或 Path 对象来指定文件路径
# excel_path = Path("/Users/calvincao/Desktop/work/uni-lab-all/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx")
# logger.info(ws.create_orders(excel_path))
# logger.info(ws.transfer_3_to_2_to_1())
# logger.info(ws.transfer_1_to_2())
Args:
config: 统一的工作流配置。字段示例:
{
"lab_registry": {"setup": True},
"deck": {"setup": True},
"workstation": {"config": {...}},
"update_push_ip": True,
"samples": [
{"name": "...", "board_type": "...", "bottle_type": "...", "location_code": "...", "warehouse_name": "..."}
],
"scheduler": {"start": True, "log": True},
"operations": {
"auto_feeding4to3": {"enabled": True},
"create_orders": {"excel_path": "...", "log": True},
"transfer_3_to_2_to_1": {"enabled": True, "log": True},
"transfer_1_to_2": {"enabled": True, "log": True}
},
"keep_alive": False,
"keep_alive_interval": 1
}
Returns:
执行完毕的 `BioyondCellWorkstation` 实例。
"""
if config.get("lab_registry", {}).get("setup", True):
lab_registry.setup()
deck_config = config.get("deck")
if isinstance(deck_config, dict):
deck = BIOYOND_YB_Deck(**deck_config)
elif deck_config is None:
deck = BIOYOND_YB_Deck(setup=True)
else:
deck = deck_config
workstation_kwargs = dict(config.get("workstation", {}))
if "deck" not in workstation_kwargs:
workstation_kwargs["deck"] = deck
ws = BioyondCellWorkstation(**workstation_kwargs)
if config.get("update_push_ip", True):
ws.update_push_ip()
for sample_cfg in config.get("samples", []):
ws.create_sample(**sample_cfg)
scheduler_cfg = config.get("scheduler", {})
if scheduler_cfg.get("start", True):
result = ws.scheduler_start()
if scheduler_cfg.get("log", True):
logger.info(result)
operations_cfg = config.get("operations", {})
auto_feeding_cfg = operations_cfg.get("auto_feeding4to3", {})
if auto_feeding_cfg.get("enabled", True):
result = ws.auto_feeding4to3()
if auto_feeding_cfg.get("log", True):
logger.info(result)
create_orders_cfg = operations_cfg.get("create_orders")
if create_orders_cfg:
excel_path = create_orders_cfg.get("excel_path")
if not excel_path:
raise ValueError("create_orders 需要提供 excel_path。")
result = ws.create_orders(Path(excel_path))
if create_orders_cfg.get("log", True):
logger.info(result)
transfer_321_cfg = operations_cfg.get("transfer_3_to_2_to_1", {})
if transfer_321_cfg.get("enabled", True):
result = ws.transfer_3_to_2_to_1()
if transfer_321_cfg.get("log", True):
logger.info(result)
transfer_12_cfg = operations_cfg.get("transfer_1_to_2", {})
if transfer_12_cfg.get("enabled", True):
result = ws.transfer_1_to_2()
if transfer_12_cfg.get("log", True):
logger.info(result)
if config.get("keep_alive", False):
interval = config.get("keep_alive_interval", 1)
while True:
time.sleep(interval)
return ws
if __name__ == "__main__":
workflow_config = {
"deck": {"setup": True},
"update_push_ip": True,
"samples": [
{
"name": "配液瓶",
"board_type": "配液瓶(小)板",
"bottle_type": "配液瓶(小)",
"location_code": "E01",
},
{
"name": "分液瓶",
"board_type": "5ml分液瓶板",
"bottle_type": "5ml分液瓶",
"location_code": "D01",
},
],
"operations": {
"auto_feeding4to3": {"enabled": True, "log": True},
"create_orders": {
"excel_path": "/Users/calvincao/Desktop/work/uni-lab-all/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx",
"log": True,
},
"transfer_3_to_2_to_1": {"enabled": True, "log": True},
"transfer_1_to_2": {"enabled": True, "log": True},
},
"keep_alive": True,
}
run_bioyond_cell_workflow(workflow_config)
# 1. location code
# 2. 实验文件

View File

@@ -1202,17 +1202,93 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
'''
def run_coin_cell_packaging_workflow(config: Dict[str, Any]) -> CoinCellAssemblyWorkstation:
"""根据统一配置顺序执行扣电池装配工作流。
Args:
config: 统一的工作流配置。字段示例:
{
"deck": {"setup": True, "name": "coin_cell_deck"},
"workstation": {"address": "...", "port": "...", "debug_mode": False},
"qiming": {...},
"init": True,
"auto": True,
"start": True,
"packaging": {
"bottle_num": 16,
"command": {...}
}
}
Returns:
执行完毕的 `CoinCellAssemblyWorkstation` 实例。
"""
deck_config = config.get("deck")
if isinstance(deck_config, Deck):
deck = deck_config
elif isinstance(deck_config, dict):
deck = CoincellDeck(**deck_config)
elif deck_config is None:
deck = CoincellDeck(setup=True, name="coin_cell_deck")
else:
raise ValueError("deck 配置需为 Deck 实例或 dict。")
workstation_config = dict(config.get("workstation", {}))
workstation_config.setdefault("deck", deck)
workstation = CoinCellAssemblyWorkstation(**workstation_config)
qiming_params = config.get("qiming", {})
if qiming_params:
workstation.qiming_coin_cell_code(**qiming_params)
if config.get("init", True):
workstation.func_pack_device_init()
if config.get("auto", True):
workstation.func_pack_device_auto()
if config.get("start", True):
workstation.func_pack_device_start()
packaging_config = config.get("packaging", {})
bottle_num = packaging_config.get("bottle_num")
if bottle_num is not None:
workstation.func_pack_send_bottle_num(bottle_num)
allpack_params = packaging_config.get("command", {})
if allpack_params:
workstation.func_allpack_cmd(**allpack_params)
return workstation
if __name__ == "__main__":
# 简单测试
workstation = CoinCellAssemblyWorkstation(deck=CoincellDeck(setup=True, name="coin_cell_deck"))
# workstation.qiming_coin_cell_code(fujipian_panshu=1, fujipian_juzhendianwei=2, gemopanshu=3, gemo_juzhendianwei=4, lvbodian=False, battery_pressure_mode=False, battery_pressure=4200, battery_clean_ignore=False)
# print(f"工作站创建成功: {workstation.deck.name}")
# print(f"料盘数量: {len(workstation.deck.children)}")
workstation.func_pack_device_init()
workstation.func_pack_device_auto()
workstation.func_pack_device_start()
workstation.func_pack_send_bottle_num(16)
workstation.func_allpack_cmd(elec_num=16, elec_use_num=16, elec_vol=50, assembly_type=7, assembly_pressure=4200, file_path="/Users/calvincao/Desktop/work/Uni-Lab-OS-hhm")
workflow_config = {
"deck": {"setup": True, "name": "coin_cell_deck"},
"workstation": {
"address": "172.16.28.102",
"port": "502",
"debug_mode": False,
},
"qiming": {
"fujipian_panshu": 1,
"fujipian_juzhendianwei": 2,
"gemopanshu": 3,
"gemo_juzhendianwei": 4,
"lvbodian": False,
"battery_pressure_mode": False,
"battery_pressure": 4200,
"battery_clean_ignore": False,
},
"packaging": {
"bottle_num": 16,
"command": {
"elec_num": 16,
"elec_use_num": 16,
"elec_vol": 50,
"assembly_type": 7,
"assembly_pressure": 4200,
"file_path": "/Users/calvincao/Desktop/work/Uni-Lab-OS-hhm",
},
},
}
run_coin_cell_packaging_workflow(workflow_config)