From 37641c43895a84f13830c1ef2dec32d918d648e1 Mon Sep 17 00:00:00 2001 From: lixinyu1011 <674842481@qq.com> Date: Tue, 21 Oct 2025 14:48:55 +0800 Subject: [PATCH] =?UTF-8?q?xinyu1021=E6=8E=A8=E9=80=81=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bioyond_cell/bioyond_cell_workstation.py | 120 +++++- .../bioyond_material_management.py | 374 ------------------ .../bioyond_cell/bioyond_yihua_YB.json | 8 +- .../workstation/bioyond_studio/bioyond_rpc.py | 6 +- .../workstation/workstation_http_service.py | 6 +- 5 files changed, 123 insertions(+), 391 deletions(-) delete mode 100644 unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_material_management.py diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_cell_workstation.py b/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_cell_workstation.py index 8f8f052a..be331254 100644 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_cell_workstation.py +++ b/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_cell_workstation.py @@ -42,7 +42,7 @@ class BioyondCellWorkstation(WorkstationBase): "api_key": "8A819E5C", "timeout": 30, "report_token": "CHANGE_ME_TOKEN", - "HTTP_host": "172.21.32.90", + "HTTP_host": "172.21.33.126", "HTTP_port": 8080, "debug_mode": False } # report_token :unilab自己的令牌report_token(0928未启用) @@ -51,6 +51,8 @@ class BioyondCellWorkstation(WorkstationBase): deck = kwargs.pop("deck", None) self.device_id = kwargs.pop("device_id", "bioyond_cell_workstation") super().__init__(deck=deck, station_resource=station_resource, *args, **kwargs) + # 步骤通量任务通知铃 + self._pending_events: dict[str, threading.Event] = {} logger.info(f"Bioyond工作站初始化完成 (debug_mode={self.debug_mode})") # 实例化并在后台线程启动 HTTP 报送服务 @@ -62,6 +64,98 @@ class BioyondCellWorkstation(WorkstationBase): except Exception as e: logger.error(f"unilab-HTTP后台线程启动失败: {e}") + # http报送服务 + def process_step_finish_report(self, report_request): + stepId = report_request.data.get("stepId") + logger.info(f"步骤完成: stepId: {stepId}, stepName:{report_request.data.get('stepName')}") + return report_request.data.get('executionStatus') + + def process_sample_finish_report(self, report_request): + logger.info(f"通量完成: {report_request.data.get('sampleId')}") + return {"status": "received"} + + def process_order_finish_report(self, report_request, used_materials=None): + order_code = report_request.data.get("orderCode") + + logger.info(f"任务完成: {order_code}, status={report_request.data.get('status')}") + self._set_pending_event(order_code) + return {"status": "received"} + + def _set_pending_event(self, taskname: Optional[str]) -> None: + if not taskname: + return + event = self._pending_events.get(taskname) + if event is None: + event = threading.Event() + self._pending_events[taskname] = event + event.set() + + def _wait_for_order_completion(self, order_code: Optional[str], timeout: int = 600) -> bool: + if not order_code: + logger.warning("无法等待任务完成:order_code 为空") + return False + event = self._pending_events.get(order_code) + if event is None: + event = threading.Event() + self._pending_events[order_code] = event + elif event.is_set(): + logger.info(f"任务 {order_code} 在等待之前已完成") + self._pending_events.pop(order_code, None) + return True + logger.info(f"等待任务 {order_code} 完成 (timeout={timeout}s)") + finished = event.wait(timeout) + if not finished: + logger.warning(f"等待任务 {order_code} 完成超时({timeout}s)") + self._pending_events.pop(order_code, None) + return finished + + def _wait_for_response_orders(self, response: Dict[str, Any], context: str, timeout: int = 600) -> None: + order_codes = self._extract_order_codes(response) + if not order_codes: + logger.warning(f"{context} 响应中未找到 orderCode,无法跟踪任务完成") + return + for code in order_codes: + self._wait_for_order_completion(code, timeout=timeout) + + @staticmethod + def _extract_order_codes(response: Dict[str, Any]) -> List[str]: + order_codes: List[str] = [] + if not isinstance(response, dict): + return order_codes + data = response.get("data") + keys = ["orderCode", "order_code", "orderId", "order_id"] + if isinstance(data, dict): + for key in keys: + if key in data and data[key]: + order_codes.append(str(data[key])) + if not order_codes and "orders" in data and isinstance(data["orders"], list): + for order in data["orders"]: + if isinstance(order, dict): + for key in keys: + if key in order and order[key]: + order_codes.append(str(order[key])) + elif isinstance(data, list): + for item in data: + if isinstance(item, dict): + for key in keys: + if key in item and item[key]: + order_codes.append(str(item[key])) + elif isinstance(data, str): + if data: + order_codes.append(data) + meta = response.get("orderCode") + if meta: + order_codes.append(str(meta)) + # 去重 + seen = set() + unique_codes: List[str] = [] + for code in order_codes: + if code not in seen: + seen.add(code) + unique_codes.append(code) + return unique_codes + + def _start_http_service(self, host: Optional[str] = None, port: Optional[int] = None) -> None: host = host or self.bioyond_config.get("HTTP_host", "") port = port or self.bioyond_config.get("HTTP_port", ) @@ -261,7 +355,9 @@ class BioyondCellWorkstation(WorkstationBase): logger.warning("没有有效的上料条目,已跳过提交。") return {"code": 0, "message": "no valid items", "data": []} logger.info(items) - return self._post_lims("/api/lims/order/auto-feeding4to3", items) + response = self._post_lims("/api/lims/order/auto-feeding4to3", items) + self._wait_for_response_orders(response, "auto_feeding4to3") + return response @@ -327,7 +423,9 @@ class BioyondCellWorkstation(WorkstationBase): if item["materialId"] or item["materialType"]: items.append(item) - return self._post_lims("/api/lims/order/auto-feeding4to3", items) + response = self._post_lims("/api/lims/order/auto-feeding4to3", items) + self._wait_for_response_orders(response, "auto_feeding4to3_from_xlsx") + return response def auto_batch_outbound_from_xlsx(self, xlsx_path: str) -> Dict[str, Any]: """ @@ -392,7 +490,9 @@ class BioyondCellWorkstation(WorkstationBase): "z": as_int(row[c_z]), }) - return self._post_lims("/api/lims/storage/auto-batch-out-bound", items) + response = self._post_lims("/api/lims/storage/auto-batch-out-bound", items) + self._wait_for_response_orders(response, "auto_batch_outbound_from_xlsx") + return response # 2.14 新建实验 def create_orders(self, xlsx_path: str) -> Dict[str, Any]: @@ -517,6 +617,7 @@ class BioyondCellWorkstation(WorkstationBase): # self.wait_for_transfer_task() # logger.info(f"3-2-1 转运完成,返回结果") # return r321 + self._wait_for_response_orders(response, "create_orders", timeout=1800) return response # 2.7 启动调度 @@ -641,9 +742,18 @@ class BioyondCellWorkstation(WorkstationBase): # -------------------------------- if __name__ == "__main__": ws = BioyondCellWorkstation() - # logger.info(ws.scheduler_start()) + logger.info(ws.scheduler_start()) logger.info(ws.auto_feeding4to3()) + logger.info(ws.create_orders(r"unilabos\devices\workstation\bioyond_studio\bioyond_cell\2025092701.xlsx")) + logger.info(ws.transfer_3_to_2_to_1()) + + logger.info(ws.transfer_1_to_2()) + + + + while True: + time.sleep(1) # re=ws.scheduler_stop() # re = ws.transfer_3_to_2_to_1() diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_material_management.py b/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_material_management.py deleted file mode 100644 index 245da15c..00000000 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_material_management.py +++ /dev/null @@ -1,374 +0,0 @@ -""" -Bioyond物料管理实现 -Bioyond Material Management Implementation - -基于Bioyond系统的物料管理,支持从Bioyond系统同步物料到UniLab工作站 -""" -from typing import Dict, Any, List, Optional, Union -import json -import asyncio -from abc import ABC, abstractmethod - -from pylabrobot.resources import ( - Resource as PLRResource, - Container, - Deck, - Coordinate as PLRCoordinate, -) - -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker -from unilabos.utils.log import logger -from unilabos.resources.graphio import ( - resource_plr_to_ulab, - resource_ulab_to_plr, - resource_bioyond_to_ulab, - resource_bioyond_container_to_ulab, - resource_ulab_to_bioyond -) -from .workstation_material_management import MaterialManagementBase - - -class BioyondMaterialManagement(MaterialManagementBase): - """Bioyond物料管理类 - - 实现从Bioyond系统同步物料到UniLab工作站的功能: - 1. 从Bioyond系统获取物料数据 - 2. 转换为UniLab格式 - 3. 同步到PyLabRobot Deck - 4. 支持双向同步 - """ - - def __init__( - self, - device_id: str, - deck_config: Dict[str, Any], - resource_tracker: DeviceNodeResourceTracker, - children_config: Dict[str, Dict[str, Any]] = None, - bioyond_config: Dict[str, Any] = None - ): - self.bioyond_config = bioyond_config or {} - self.bioyond_api_client = None - self.sync_interval = self.bioyond_config.get("sync_interval", 30) # 同步间隔(秒) - - # 初始化父类 - super().__init__(device_id, deck_config, resource_tracker, children_config) - - # 初始化Bioyond API客户端 - self._initialize_bioyond_client() - - # 启动同步任务 - self._start_sync_task() - - def _initialize_bioyond_client(self): - """初始化Bioyond API客户端""" - try: - # 这里应该根据实际的Bioyond API实现 - # 暂时使用模拟客户端 - self.bioyond_api_client = BioyondAPIClient(self.bioyond_config) - logger.info(f"Bioyond API客户端初始化成功") - except Exception as e: - logger.error(f"Bioyond API客户端初始化失败: {e}") - self.bioyond_api_client = None - - def _start_sync_task(self): - """启动同步任务""" - if self.bioyond_api_client: - # 创建异步同步任务 - asyncio.create_task(self._periodic_sync()) - logger.info(f"Bioyond同步任务已启动,间隔: {self.sync_interval}秒") - - async def _periodic_sync(self): - """定期同步任务""" - while True: - try: - await self.sync_from_bioyond() - await asyncio.sleep(self.sync_interval) - except Exception as e: - logger.error(f"Bioyond同步任务出错: {e}") - await asyncio.sleep(self.sync_interval) - - async def sync_from_bioyond(self) -> bool: - """从Bioyond系统同步物料""" - try: - if not self.bioyond_api_client: - logger.warning("Bioyond API客户端未初始化") - return False - - # 1. 从Bioyond获取物料数据 - bioyond_data = await self.bioyond_api_client.get_materials() - if not bioyond_data: - logger.warning("从Bioyond获取物料数据为空") - return False - - # 2. 转换为UniLab格式 - if isinstance(bioyond_data, dict) and "data" in bioyond_data: - # 容器格式数据 - unilab_resources = resource_bioyond_container_to_ulab(bioyond_data) - else: - # 物料列表格式数据 - unilab_resources = resource_bioyond_to_ulab(bioyond_data) - - # 3. 转换为PLR格式并分配到Deck - await self._assign_resources_to_deck(unilab_resources) - - logger.info(f"从Bioyond同步了 {len(unilab_resources)} 个资源") - return True - - except Exception as e: - logger.error(f"从Bioyond同步物料失败: {e}") - return False - - async def sync_to_bioyond(self, plr_resource: PLRResource) -> bool: - """将本地物料变更同步到Bioyond系统""" - try: - if not self.bioyond_api_client: - logger.warning("Bioyond API客户端未初始化") - return False - - # 1. 转换为UniLab格式 - unilab_resource = resource_plr_to_ulab(plr_resource) - - # 2. 转换为Bioyond格式 - bioyond_materials = resource_ulab_to_bioyond([unilab_resource]) - - # 3. 发送到Bioyond系统 - success = await self.bioyond_api_client.update_materials(bioyond_materials) - - if success: - logger.info(f"成功同步物料 {plr_resource.name} 到Bioyond") - else: - logger.warning(f"同步物料 {plr_resource.name} 到Bioyond失败") - - return success - - except Exception as e: - logger.error(f"同步物料到Bioyond失败: {e}") - return False - - async def _assign_resources_to_deck(self, unilab_resources: List[Dict[str, Any]]): - """将UniLab资源分配到Deck""" - try: - # 转换为PLR格式 - from unilabos.resources.graphio import list_to_nested_dict - nested_resources = list_to_nested_dict(unilab_resources) - plr_resources = resource_ulab_to_plr(nested_resources) - - # 分配资源到Deck - if hasattr(plr_resources, 'children'): - resources_to_assign = plr_resources.children - elif isinstance(plr_resources, list): - resources_to_assign = plr_resources - else: - resources_to_assign = [plr_resources] - - for resource in resources_to_assign: - try: - # 获取资源位置 - if hasattr(resource, 'location') and resource.location: - location = PLRCoordinate(resource.location.x, resource.location.y, resource.location.z) - else: - location = PLRCoordinate(0, 0, 0) - - # 分配资源到Deck - self.plr_deck.assign_child_resource(resource, location) - - # 注册到resource tracker - self.resource_tracker.add_resource(resource) - - # 保存资源引用 - self.plr_resources[resource.name] = resource - - except Exception as e: - logger.error(f"分配资源 {resource.name} 到Deck失败: {e}") - - logger.info(f"成功分配了 {len(resources_to_assign)} 个资源到Deck") - - except Exception as e: - logger.error(f"分配资源到Deck失败: {e}") - - def _create_resource_by_type( - self, - resource_id: str, - resource_type: str, - config: Dict[str, Any], - data: Dict[str, Any], - location: PLRCoordinate - ) -> Optional[PLRResource]: - """根据类型创建Bioyond相关资源""" - try: - # 这里可以根据需要实现特定的Bioyond资源类型 - # 目前使用通用的容器类型 - if resource_type in ["container", "plate", "well"]: - return self._create_generic_container(resource_id, resource_type, config, data, location) - else: - logger.warning(f"未知的Bioyond资源类型: {resource_type}") - return None - - except Exception as e: - logger.error(f"创建Bioyond资源失败 {resource_id} ({resource_type}): {e}") - return None - - def _create_generic_container( - self, - resource_id: str, - resource_type: str, - config: Dict[str, Any], - data: Dict[str, Any], - location: PLRCoordinate - ) -> Optional[PLRResource]: - """创建通用容器资源""" - try: - from pylabrobot.resources import Plate, Well - - if resource_type == "plate": - return Plate( - name=resource_id, - size_x=config.get("size_x", 127.76), - size_y=config.get("size_y", 85.48), - size_z=config.get("size_z", 14.35), - location=location, - category="plate" - ) - elif resource_type == "well": - return Well( - name=resource_id, - size_x=config.get("size_x", 9.0), - size_y=config.get("size_y", 9.0), - size_z=config.get("size_z", 10.0), - location=location, - category="well" - ) - else: - return Container( - name=resource_id, - size_x=config.get("size_x", 50.0), - size_y=config.get("size_y", 50.0), - size_z=config.get("size_z", 10.0), - location=location, - category="container" - ) - - except Exception as e: - logger.error(f"创建通用容器失败 {resource_id}: {e}") - return None - - def get_bioyond_materials(self) -> List[Dict[str, Any]]: - """获取当前Bioyond物料列表""" - try: - # 将当前PLR资源转换为Bioyond格式 - bioyond_materials = [] - for resource in self.plr_resources.values(): - unilab_resource = resource_plr_to_ulab(resource) - bioyond_materials.extend(resource_ulab_to_bioyond([unilab_resource])) - return bioyond_materials - except Exception as e: - logger.error(f"获取Bioyond物料列表失败: {e}") - return [] - - def update_material_from_bioyond(self, material_id: str, bioyond_data: Dict[str, Any]) -> bool: - """从Bioyond数据更新指定物料""" - try: - # 查找现有物料 - material = self.find_material_by_id(material_id) - if not material: - logger.warning(f"未找到物料: {material_id}") - return False - - # 转换Bioyond数据为UniLab格式 - unilab_resources = resource_bioyond_to_ulab([bioyond_data]) - if not unilab_resources: - logger.warning(f"转换Bioyond数据失败: {material_id}") - return False - - # 更新物料属性 - unilab_resource = unilab_resources[0] - material.name = unilab_resource.get("name", material.name) - - # 更新位置 - position = unilab_resource.get("position", {}) - if position: - material.location = PLRCoordinate( - position.get("x", 0), - position.get("y", 0), - position.get("z", 0) - ) - - logger.info(f"成功更新物料: {material_id}") - return True - - except Exception as e: - logger.error(f"更新物料失败 {material_id}: {e}") - return False - - -class BioyondAPIClient: - """Bioyond API客户端(模拟实现) - - 实际使用时需要根据Bioyond系统的API接口实现 - """ - - def __init__(self, config: Dict[str, Any]): - self.config = config - self.base_url = config.get("base_url", "http://localhost:8080") - self.api_key = config.get("api_key", "") - self.timeout = config.get("timeout", 30) - - async def get_materials(self) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]: - """从Bioyond系统获取物料数据""" - try: - # 这里应该实现实际的API调用 - # 暂时返回模拟数据 - logger.info("从Bioyond API获取物料数据") - - # 模拟API调用延迟 - await asyncio.sleep(0.1) - - # 返回模拟数据(实际应该从API获取) - return { - "data": [], - "code": 1, - "message": "success", - "timestamp": 1234567890 - } - - except Exception as e: - logger.error(f"Bioyond API调用失败: {e}") - return None - - async def update_materials(self, materials: List[Dict[str, Any]]) -> bool: - """更新Bioyond系统中的物料数据""" - try: - # 这里应该实现实际的API调用 - logger.info(f"更新Bioyond系统中的 {len(materials)} 个物料") - - # 模拟API调用延迟 - await asyncio.sleep(0.1) - - # 模拟成功响应 - return True - - except Exception as e: - logger.error(f"更新Bioyond物料失败: {e}") - return False - - async def get_material_by_id(self, material_id: str) -> Optional[Dict[str, Any]]: - """根据ID获取单个物料""" - try: - # 这里应该实现实际的API调用 - logger.info(f"从Bioyond API获取物料: {material_id}") - - # 模拟API调用延迟 - await asyncio.sleep(0.1) - - # 返回模拟数据 - return { - "id": material_id, - "name": f"material_{material_id}", - "type": "container", - "quantity": 1.0, - "unit": "个" - } - - except Exception as e: - logger.error(f"获取Bioyond物料失败 {material_id}: {e}") - return None diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_yihua_YB.json b/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_yihua_YB.json index 29f62547..d6a917f2 100644 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_yihua_YB.json +++ b/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_yihua_YB.json @@ -10,13 +10,9 @@ "class": "bioyond_cell", "config": { "protocol_type": [], - "station_resource": {}, + "station_resource": {} + - "bioyond_config": { - "api_key": "8A819E5C", - "api_host": "http://192.168.1.200:44388", - "debug_mode": false - } }, "data": {} }, diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py index 45d0cadb..5aef45ba 100644 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py +++ b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py @@ -47,8 +47,8 @@ class BioyondV1RPC(BaseRequest): super().__init__() print("开始初始化 BioyondV1RPC") self.config = config - self.api_key = config["api_key"] - self.host = config["api_host"] + self.api_key = config["8A819E5C"] + self.host = config["http://172.16.11.219:44388"] self._logger = SimpleLogger() self.material_cache = {} self._load_material_cache() @@ -61,7 +61,7 @@ class BioyondV1RPC(BaseRequest): :return: 当前时间的 ISO 8601 格式字符串 """ - current_time = datetime.now(timezone.utc).isoformat( + current_time = datetime.now().isoformat( timespec='milliseconds' ) # 替换时区部分为 'Z' diff --git a/unilabos/devices/workstation/workstation_http_service.py b/unilabos/devices/workstation/workstation_http_service.py index 27f869c0..12eb9262 100644 --- a/unilabos/devices/workstation/workstation_http_service.py +++ b/unilabos/devices/workstation/workstation_http_service.py @@ -71,11 +71,11 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): if content_length > 0: post_data = self.rfile.read(content_length) request_data = json.loads(post_data.decode('utf-8')) + else: request_data = {} - - logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}") - + logger.info(f"收到工作站报送: {endpoint} 收到接受数据:{request_data}") + # logger.info(f"收到的json数据: {request_data}") # 统一的报送端点路由(基于LIMS协议规范) if endpoint == '/report/step_finish': response = self._handle_step_finish_report(request_data)