mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 04:51:10 +00:00
Compare commits
1 Commits
workstatio
...
e60bf29a7f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e60bf29a7f |
@@ -1168,23 +1168,129 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
lab_registry.setup()
|
||||
deck = BIOYOND_YB_Deck(setup=True)
|
||||
ws = BioyondCellWorkstation(deck=deck)
|
||||
# ws.update_push_ip() #直接修改奔耀端的报送ip地址
|
||||
# ws.create_sample(name="配液瓶", board_type="配液瓶(小)板", bottle_type="配液瓶(小)", location_code="E01")
|
||||
# ws.create_sample(name="分液瓶", board_type="5ml分液瓶板", bottle_type="5ml分液瓶", location_code="D01")
|
||||
def run_bioyond_cell_workflow(config: Dict[str, Any]) -> BioyondCellWorkstation:
|
||||
"""按照统一配置执行奔曜配液与转运工作流。
|
||||
|
||||
# # logger.info(ws.scheduler_stop())
|
||||
# logger.info(ws.scheduler_start())
|
||||
|
||||
# logger.info(ws.auto_feeding4to3()) #搬运物料到3号箱
|
||||
# 使用正斜杠或 Path 对象来指定文件路径
|
||||
# excel_path = Path("/Users/calvincao/Desktop/work/uni-lab-all/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx")
|
||||
# logger.info(ws.create_orders(excel_path))
|
||||
# logger.info(ws.transfer_3_to_2_to_1())
|
||||
# logger.info(ws.transfer_1_to_2())
|
||||
Args:
|
||||
config: 统一的工作流配置。字段示例:
|
||||
{
|
||||
"lab_registry": {"setup": True},
|
||||
"deck": {"setup": True},
|
||||
"workstation": {"config": {...}},
|
||||
"update_push_ip": True,
|
||||
"samples": [
|
||||
{"name": "...", "board_type": "...", "bottle_type": "...", "location_code": "...", "warehouse_name": "..."}
|
||||
],
|
||||
"scheduler": {"start": True, "log": True},
|
||||
"operations": {
|
||||
"auto_feeding4to3": {"enabled": True},
|
||||
"create_orders": {"excel_path": "...", "log": True},
|
||||
"transfer_3_to_2_to_1": {"enabled": True, "log": True},
|
||||
"transfer_1_to_2": {"enabled": True, "log": True}
|
||||
},
|
||||
"keep_alive": False,
|
||||
"keep_alive_interval": 1
|
||||
}
|
||||
|
||||
Returns:
|
||||
执行完毕的 `BioyondCellWorkstation` 实例。
|
||||
"""
|
||||
|
||||
if config.get("lab_registry", {}).get("setup", True):
|
||||
lab_registry.setup()
|
||||
|
||||
deck_config = config.get("deck")
|
||||
if isinstance(deck_config, dict):
|
||||
deck = BIOYOND_YB_Deck(**deck_config)
|
||||
elif deck_config is None:
|
||||
deck = BIOYOND_YB_Deck(setup=True)
|
||||
else:
|
||||
deck = deck_config
|
||||
|
||||
workstation_kwargs = dict(config.get("workstation", {}))
|
||||
if "deck" not in workstation_kwargs:
|
||||
workstation_kwargs["deck"] = deck
|
||||
ws = BioyondCellWorkstation(**workstation_kwargs)
|
||||
|
||||
if config.get("update_push_ip", True):
|
||||
ws.update_push_ip()
|
||||
|
||||
for sample_cfg in config.get("samples", []):
|
||||
ws.create_sample(**sample_cfg)
|
||||
|
||||
scheduler_cfg = config.get("scheduler", {})
|
||||
if scheduler_cfg.get("start", True):
|
||||
result = ws.scheduler_start()
|
||||
if scheduler_cfg.get("log", True):
|
||||
logger.info(result)
|
||||
|
||||
operations_cfg = config.get("operations", {})
|
||||
|
||||
auto_feeding_cfg = operations_cfg.get("auto_feeding4to3", {})
|
||||
if auto_feeding_cfg.get("enabled", True):
|
||||
result = ws.auto_feeding4to3()
|
||||
if auto_feeding_cfg.get("log", True):
|
||||
logger.info(result)
|
||||
|
||||
create_orders_cfg = operations_cfg.get("create_orders")
|
||||
if create_orders_cfg:
|
||||
excel_path = create_orders_cfg.get("excel_path")
|
||||
if not excel_path:
|
||||
raise ValueError("create_orders 需要提供 excel_path。")
|
||||
result = ws.create_orders(Path(excel_path))
|
||||
if create_orders_cfg.get("log", True):
|
||||
logger.info(result)
|
||||
|
||||
transfer_321_cfg = operations_cfg.get("transfer_3_to_2_to_1", {})
|
||||
if transfer_321_cfg.get("enabled", True):
|
||||
result = ws.transfer_3_to_2_to_1()
|
||||
if transfer_321_cfg.get("log", True):
|
||||
logger.info(result)
|
||||
|
||||
transfer_12_cfg = operations_cfg.get("transfer_1_to_2", {})
|
||||
if transfer_12_cfg.get("enabled", True):
|
||||
result = ws.transfer_1_to_2()
|
||||
if transfer_12_cfg.get("log", True):
|
||||
logger.info(result)
|
||||
|
||||
if config.get("keep_alive", False):
|
||||
interval = config.get("keep_alive_interval", 1)
|
||||
while True:
|
||||
time.sleep(interval)
|
||||
|
||||
return ws
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
workflow_config = {
|
||||
"deck": {"setup": True},
|
||||
"update_push_ip": True,
|
||||
"samples": [
|
||||
{
|
||||
"name": "配液瓶",
|
||||
"board_type": "配液瓶(小)板",
|
||||
"bottle_type": "配液瓶(小)",
|
||||
"location_code": "E01",
|
||||
},
|
||||
{
|
||||
"name": "分液瓶",
|
||||
"board_type": "5ml分液瓶板",
|
||||
"bottle_type": "5ml分液瓶",
|
||||
"location_code": "D01",
|
||||
},
|
||||
],
|
||||
"operations": {
|
||||
"auto_feeding4to3": {"enabled": True, "log": True},
|
||||
"create_orders": {
|
||||
"excel_path": "/Users/calvincao/Desktop/work/uni-lab-all/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx",
|
||||
"log": True,
|
||||
},
|
||||
"transfer_3_to_2_to_1": {"enabled": True, "log": True},
|
||||
"transfer_1_to_2": {"enabled": True, "log": True},
|
||||
},
|
||||
"keep_alive": True,
|
||||
}
|
||||
run_bioyond_cell_workflow(workflow_config)
|
||||
|
||||
# 1. location code
|
||||
# 2. 实验文件
|
||||
|
||||
@@ -1202,17 +1202,93 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
|
||||
|
||||
'''
|
||||
|
||||
def run_coin_cell_packaging_workflow(config: Dict[str, Any]) -> CoinCellAssemblyWorkstation:
|
||||
"""根据统一配置顺序执行扣电池装配工作流。
|
||||
|
||||
Args:
|
||||
config: 统一的工作流配置。字段示例:
|
||||
{
|
||||
"deck": {"setup": True, "name": "coin_cell_deck"},
|
||||
"workstation": {"address": "...", "port": "...", "debug_mode": False},
|
||||
"qiming": {...},
|
||||
"init": True,
|
||||
"auto": True,
|
||||
"start": True,
|
||||
"packaging": {
|
||||
"bottle_num": 16,
|
||||
"command": {...}
|
||||
}
|
||||
}
|
||||
|
||||
Returns:
|
||||
执行完毕的 `CoinCellAssemblyWorkstation` 实例。
|
||||
"""
|
||||
|
||||
deck_config = config.get("deck")
|
||||
if isinstance(deck_config, Deck):
|
||||
deck = deck_config
|
||||
elif isinstance(deck_config, dict):
|
||||
deck = CoincellDeck(**deck_config)
|
||||
elif deck_config is None:
|
||||
deck = CoincellDeck(setup=True, name="coin_cell_deck")
|
||||
else:
|
||||
raise ValueError("deck 配置需为 Deck 实例或 dict。")
|
||||
|
||||
workstation_config = dict(config.get("workstation", {}))
|
||||
workstation_config.setdefault("deck", deck)
|
||||
workstation = CoinCellAssemblyWorkstation(**workstation_config)
|
||||
|
||||
qiming_params = config.get("qiming", {})
|
||||
if qiming_params:
|
||||
workstation.qiming_coin_cell_code(**qiming_params)
|
||||
|
||||
if config.get("init", True):
|
||||
workstation.func_pack_device_init()
|
||||
if config.get("auto", True):
|
||||
workstation.func_pack_device_auto()
|
||||
if config.get("start", True):
|
||||
workstation.func_pack_device_start()
|
||||
|
||||
packaging_config = config.get("packaging", {})
|
||||
bottle_num = packaging_config.get("bottle_num")
|
||||
if bottle_num is not None:
|
||||
workstation.func_pack_send_bottle_num(bottle_num)
|
||||
|
||||
allpack_params = packaging_config.get("command", {})
|
||||
if allpack_params:
|
||||
workstation.func_allpack_cmd(**allpack_params)
|
||||
|
||||
return workstation
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 简单测试
|
||||
workstation = CoinCellAssemblyWorkstation(deck=CoincellDeck(setup=True, name="coin_cell_deck"))
|
||||
# workstation.qiming_coin_cell_code(fujipian_panshu=1, fujipian_juzhendianwei=2, gemopanshu=3, gemo_juzhendianwei=4, lvbodian=False, battery_pressure_mode=False, battery_pressure=4200, battery_clean_ignore=False)
|
||||
# print(f"工作站创建成功: {workstation.deck.name}")
|
||||
# print(f"料盘数量: {len(workstation.deck.children)}")
|
||||
workstation.func_pack_device_init()
|
||||
workstation.func_pack_device_auto()
|
||||
workstation.func_pack_device_start()
|
||||
workstation.func_pack_send_bottle_num(16)
|
||||
workstation.func_allpack_cmd(elec_num=16, elec_use_num=16, elec_vol=50, assembly_type=7, assembly_pressure=4200, file_path="/Users/calvincao/Desktop/work/Uni-Lab-OS-hhm")
|
||||
|
||||
workflow_config = {
|
||||
"deck": {"setup": True, "name": "coin_cell_deck"},
|
||||
"workstation": {
|
||||
"address": "172.16.28.102",
|
||||
"port": "502",
|
||||
"debug_mode": False,
|
||||
},
|
||||
"qiming": {
|
||||
"fujipian_panshu": 1,
|
||||
"fujipian_juzhendianwei": 2,
|
||||
"gemopanshu": 3,
|
||||
"gemo_juzhendianwei": 4,
|
||||
"lvbodian": False,
|
||||
"battery_pressure_mode": False,
|
||||
"battery_pressure": 4200,
|
||||
"battery_clean_ignore": False,
|
||||
},
|
||||
"packaging": {
|
||||
"bottle_num": 16,
|
||||
"command": {
|
||||
"elec_num": 16,
|
||||
"elec_use_num": 16,
|
||||
"elec_vol": 50,
|
||||
"assembly_type": 7,
|
||||
"assembly_pressure": 4200,
|
||||
"file_path": "/Users/calvincao/Desktop/work/Uni-Lab-OS-hhm",
|
||||
},
|
||||
},
|
||||
}
|
||||
run_coin_cell_packaging_workflow(workflow_config)
|
||||
Reference in New Issue
Block a user