mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 21:41:16 +00:00
feat: enhance BioyondCellWorkstation and CoinCellAssembly workflows
- Added support for transferring resources between workstations with detailed logging. - Introduced new methods for material conversion and resource registration. - Updated YAML configurations to reflect new parameters and structures for workflows. - Enhanced error handling and logging for better debugging and operational clarity.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from cgi import print_arguments
|
||||
from doctest import debug
|
||||
from typing import Dict, Any, List, Optional
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
import requests
|
||||
from pylabrobot.resources.resource import Resource as ResourcePLR
|
||||
from pathlib import Path
|
||||
@@ -22,6 +22,7 @@ from unilabos.resources.bioyond.decks import BIOYOND_YB_Deck
|
||||
from unilabos.resources.graphio import resource_bioyond_to_plr
|
||||
from unilabos.utils.log import logger
|
||||
from unilabos.registry.registry import lab_registry
|
||||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
|
||||
|
||||
def _iso_local_now_ms() -> str:
|
||||
# 文档要求:到毫秒 + Z,例如 2025-08-15T05:43:22.814Z
|
||||
@@ -41,12 +42,14 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
def __init__(self, config: dict = None, deck=None, protocol_type=None, **kwargs):
|
||||
|
||||
# 使用统一配置,支持自定义覆盖, 从 config.py 加载完整配置
|
||||
self.bioyond_config ={
|
||||
self.bioyond_config = {
|
||||
**API_CONFIG,
|
||||
"material_type_mappings": MATERIAL_TYPE_MAPPINGS,
|
||||
"warehouse_mapping": WAREHOUSE_MAPPING,
|
||||
"debug_mode": False
|
||||
}
|
||||
"debug_mode": False,
|
||||
}
|
||||
if config:
|
||||
self.bioyond_config.update(config)
|
||||
|
||||
# "material_type_mappings": MATERIAL_TYPE_MAPPINGS
|
||||
# "warehouse_mapping": WAREHOUSE_MAPPING
|
||||
@@ -57,6 +60,12 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
self.http_service_started = self.debug_mode
|
||||
self._device_id = "bioyond_cell_workstation" # 默认值,后续会从_ros_node获取
|
||||
super().__init__(bioyond_config=config, deck=deck)
|
||||
self.transfer_target_device_id = self.bioyond_config.get("transfer_target_device_id", "BatteryStation")
|
||||
self.transfer_target_parent = self.bioyond_config.get("transfer_target_parent", "YB_YH_Deck")
|
||||
self.transfer_timeout = float(self.bioyond_config.get("transfer_timeout", 180.0))
|
||||
self.coin_cell_workflow_config = self.bioyond_config.get("coin_cell_workflow_config", {})
|
||||
self.pending_transfer_materials: List[Dict[str, Any]] = []
|
||||
self.pending_transfer_plr: List[ResourcePLR] = []
|
||||
self.update_push_ip() #直接修改奔耀端的报送ip地址
|
||||
logger.info("已更新奔耀端推送 IP 地址")
|
||||
|
||||
@@ -472,7 +481,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
return response
|
||||
|
||||
# 2.14 新建实验
|
||||
def create_orders(self, xlsx_path: str) -> Dict[str, Any]:
|
||||
def create_orders(self, xlsx_path: str, *, material_filter: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
从 Excel 解析并创建实验(2.14)
|
||||
约定:
|
||||
@@ -629,7 +638,33 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
return response
|
||||
# 等待完成报送
|
||||
result = self.wait_for_order_finish(order_code)
|
||||
return result
|
||||
report_data = result.get("report") if isinstance(result, dict) else None
|
||||
materials_from_report = (
|
||||
report_data.get("usedMaterials") if isinstance(report_data, dict) else None
|
||||
)
|
||||
if materials_from_report:
|
||||
materials = materials_from_report
|
||||
logger.info(
|
||||
"[create_orders] 使用订单完成报送中的物料信息: "
|
||||
f"{len(materials)} 条"
|
||||
)
|
||||
else:
|
||||
materials = self._fetch_bioyond_materials(filter_keyword=material_filter)
|
||||
logger.info(
|
||||
"[create_orders] 未收到订单报送物料信息,回退到实时查询"
|
||||
)
|
||||
print("materials_from_report:", materials_from_report)
|
||||
self.transfer_resource_to_another(
|
||||
plr_resources=[materials],
|
||||
target_device_id="BatteryStation",
|
||||
target_resources=["YB_YH_Deck"],
|
||||
sites=[None]
|
||||
)
|
||||
return {
|
||||
"api_response": response,
|
||||
"order_finish": result,
|
||||
"materials": materials,
|
||||
}
|
||||
|
||||
# 2.7 启动调度
|
||||
def scheduler_start(self) -> Dict[str, Any]:
|
||||
@@ -701,6 +736,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
return response
|
||||
# 等待完成报送
|
||||
result = self.wait_for_order_finish(order_code)
|
||||
|
||||
return result
|
||||
|
||||
# 2.5 批量查询实验报告(post过滤关键字查询)
|
||||
@@ -1198,6 +1234,108 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
|
||||
return raw_materials
|
||||
|
||||
def _convert_materials_to_plr(self, materials: List[Dict[str, Any]]) -> List[ResourcePLR]:
|
||||
try:
|
||||
return resource_bioyond_to_plr(
|
||||
deepcopy(materials),
|
||||
type_mapping=self.bioyond_config.get("material_type_mappings", MATERIAL_TYPE_MAPPINGS),
|
||||
deck=self.deck,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(f"物料转换为 PLR 失败: {exc}", exc_info=True)
|
||||
return []
|
||||
|
||||
def _wait_for_future(self, future, stage: str, timeout: Optional[float] = None):
|
||||
if future is None:
|
||||
return None
|
||||
timeout = timeout or self.transfer_timeout
|
||||
start = time.time()
|
||||
while not future.done():
|
||||
if (time.time() - start) > timeout:
|
||||
raise TimeoutError(f"{stage} 超时 {timeout}s")
|
||||
time.sleep(0.05)
|
||||
return future.result()
|
||||
|
||||
def _register_plr_resources(self, resources: List[ResourcePLR]) -> None:
|
||||
if not resources or not hasattr(self, "_ros_node") or self._ros_node is None:
|
||||
return
|
||||
future = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, resources=resources)
|
||||
self._wait_for_future(future, "update_resource")
|
||||
|
||||
def _get_target_resource(self, name: str) -> ResourcePLR:
|
||||
if not hasattr(self, "_ros_node") or self._ros_node is None:
|
||||
raise RuntimeError("ROS 节点未初始化,无法获取资源")
|
||||
resource = self._ros_node.resource_tracker.figure_resource({"name": name}, try_mode=False) # type: ignore
|
||||
if resource is None:
|
||||
raise ValueError(f"未找到目标资源: {name}")
|
||||
return resource
|
||||
|
||||
def _allocate_sites(self, parent_resource: ResourcePLR, count: int) -> List[str]:
|
||||
if not hasattr(parent_resource, "get_free_sites"):
|
||||
raise ValueError(f"资源 {parent_resource} 不支持自动分配站位")
|
||||
free_indices = list(parent_resource.get_free_sites())
|
||||
if len(free_indices) < count:
|
||||
raise ValueError(f"{parent_resource.name} 可用站位不足 (need {count}, have {len(free_indices)})")
|
||||
ordering = list(getattr(parent_resource, "_ordering", {}).keys())
|
||||
sites: List[str] = []
|
||||
for idx in free_indices[:count]:
|
||||
if ordering and idx < len(ordering):
|
||||
sites.append(ordering[idx])
|
||||
else:
|
||||
sites.append(str(idx))
|
||||
return sites
|
||||
|
||||
def _invoke_coin_cell_workflow(self, material_payload: List[Dict[str, Any]]) -> Any:
|
||||
timeout = float(self.bioyond_config.get("coin_cell_workflow_timeout", 300.0))
|
||||
workflow_payload: Dict[str, Any] = {}
|
||||
if isinstance(self.coin_cell_workflow_config, dict):
|
||||
workflow_payload.update(deepcopy(self.coin_cell_workflow_config))
|
||||
workflow_payload["materials"] = deepcopy(material_payload)
|
||||
return self._call_remote_device_method(
|
||||
self.transfer_target_device_id,
|
||||
"run_coin_cell_assembly_workflow",
|
||||
timeout=timeout,
|
||||
workflow_config=workflow_payload,
|
||||
)
|
||||
|
||||
def _call_remote_device_method(
|
||||
self,
|
||||
device_id: str,
|
||||
method: str,
|
||||
*,
|
||||
timeout: Optional[float] = None,
|
||||
**kwargs,
|
||||
) -> Any:
|
||||
if not hasattr(self, "_ros_node") or self._ros_node is None:
|
||||
raise RuntimeError("ROS 节点未初始化,无法调用远程设备")
|
||||
if not device_id:
|
||||
raise ValueError("device_id 不能为空")
|
||||
if not method:
|
||||
raise ValueError("method 不能为空")
|
||||
|
||||
timeout = timeout or self.transfer_timeout
|
||||
payload = json.dumps(
|
||||
{
|
||||
"function_name": method,
|
||||
"function_args": kwargs,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
future = ROS2DeviceNode.run_async_func(
|
||||
self._ros_node.execute_single_action,
|
||||
True,
|
||||
device_id=device_id,
|
||||
action_name="_execute_driver_command_async",
|
||||
action_kwargs={"string": payload},
|
||||
)
|
||||
result = self._wait_for_future(future, f"{device_id}.{method}", timeout)
|
||||
if hasattr(result, "return_info"):
|
||||
try:
|
||||
return json.loads(result.return_info)
|
||||
except Exception:
|
||||
return result.return_info
|
||||
return result
|
||||
|
||||
def run_feeding_stage(self) -> Dict[str, List[Dict[str, Any]]]:
|
||||
self.create_sample(
|
||||
board_type="配液瓶(小)板",
|
||||
@@ -1240,16 +1378,38 @@ class BioyondCellWorkstation(BioyondWorkstation):
|
||||
self,
|
||||
liquid_materials: Optional[List[Dict[str, Any]]] = None,
|
||||
) -> Dict[str, List[Dict[str, Any]]]:
|
||||
self.transfer_3_to_2_to_1(
|
||||
source_wh_id="3a19debc-84b4-0359-e2d4-b3beea49348b",
|
||||
source_x=1,
|
||||
source_y=1,
|
||||
source_z=1,
|
||||
)
|
||||
transfer_summary: Dict[str, Any] = {}
|
||||
try:
|
||||
source_materials = liquid_materials or self._fetch_bioyond_materials()
|
||||
transfer_plr = self._convert_materials_to_plr(source_materials)
|
||||
transfer_summary["plr_count"] = len(transfer_plr)
|
||||
|
||||
if transfer_plr:
|
||||
self._register_plr_resources(transfer_plr)
|
||||
target_parent = self._get_target_resource(self.transfer_target_parent)
|
||||
sites = self._allocate_sites(target_parent, len(transfer_plr))
|
||||
future = ROS2DeviceNode.run_async_func(
|
||||
self._ros_node.transfer_resource_to_another, # type: ignore[arg-type]
|
||||
True,
|
||||
plr_resources=transfer_plr,
|
||||
target_device_id=self.transfer_target_device_id,
|
||||
target_resources=[target_parent] * len(transfer_plr),
|
||||
sites=sites,
|
||||
)
|
||||
self._wait_for_future(future, "transfer_resource_to_another")
|
||||
transfer_summary["sites"] = sites
|
||||
|
||||
coin_cell_result = self._invoke_coin_cell_workflow(source_materials)
|
||||
transfer_summary["coin_cell_result"] = coin_cell_result
|
||||
except Exception as exc:
|
||||
transfer_summary["error"] = str(exc)
|
||||
logger.error(f"跨工站转运失败: {exc}", exc_info=True)
|
||||
|
||||
transfer_materials = self._fetch_bioyond_materials()
|
||||
return {
|
||||
"liquid_materials": liquid_materials or [],
|
||||
"transfer_materials": transfer_materials,
|
||||
"transfer_summary": transfer_summary,
|
||||
}
|
||||
if __name__ == "__main__":
|
||||
deck = BIOYOND_YB_Deck(setup=True)
|
||||
|
||||
Reference in New Issue
Block a user