From beaa1d72133e1e1af076d154c96271b52253db9b Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Thu, 25 Dec 2025 09:58:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E4=BA=8B=E4=BB=B6=E5=8F=91=E5=B8=83=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E7=9B=91=E6=8E=A7=E5=B9=B6=E6=8A=A5=E5=91=8A?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=BF=90=E8=A1=8C=E3=80=81=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E3=80=81=E5=AE=8C=E6=88=90=E5=92=8C=E9=94=99=E8=AF=AF=E7=8A=B6?= =?UTF-8?q?=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bioyond_studio/reaction_station.py | 36 ++++ .../workstation/bioyond_studio/station.py | 188 +++++++++++++++++- 2 files changed, 223 insertions(+), 1 deletion(-) diff --git a/unilabos/devices/workstation/bioyond_studio/reaction_station.py b/unilabos/devices/workstation/bioyond_studio/reaction_station.py index 5fc6a46..6ea76df 100644 --- a/unilabos/devices/workstation/bioyond_studio/reaction_station.py +++ b/unilabos/devices/workstation/bioyond_studio/reaction_station.py @@ -837,6 +837,17 @@ class BioyondReactionStation(BioyondWorkstation): raise ValueError("order_codes与order_ids数量不匹配") total = len(order_codes) pending = {c: {"order_id": order_ids[i], "completed": False} for i, c in enumerate(order_codes)} + + # 发布初始状态事件 + for i, oc in enumerate(order_codes): + self._publish_task_status( + task_id=order_ids[i], + task_code=oc, + task_type="bioyond_workflow", + status="running", + progress=0.0 + ) + reports = [] start_time = time.time() while pending: @@ -852,6 +863,14 @@ class BioyondReactionStation(BioyondWorkstation): "extracted": None, "elapsed_time": elapsed_time }) + # 发布超时事件 + self._publish_task_status( + task_id=pending[oc]["order_id"], + task_code=oc, + task_type="bioyond_workflow", + status="timeout", + result={"elapsed_time": elapsed_time} + ) break completed_round = [] for oc in list(pending.keys()): @@ -874,6 +893,15 @@ class BioyondReactionStation(BioyondWorkstation): "extracted": self._extract_actuals_from_report(rep), "elapsed_time": elapsed_time }) + # 发布完成事件 + self._publish_task_status( + task_id=oid, + task_code=oc, + task_type="bioyond_workflow", + status="completed", + progress=1.0, + result=rep + ) completed_round.append(oc) del self.order_completion_status[oc] except Exception as e: @@ -887,6 +915,14 @@ class BioyondReactionStation(BioyondWorkstation): "error": str(e), "elapsed_time": elapsed_time }) + # 发布错误事件 + self._publish_task_status( + task_id=oid, + task_code=oc, + task_type="bioyond_workflow", + status="error", + result={"error": str(e)} + ) completed_round.append(oc) for oc in completed_round: del pending[oc] diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 2800e5b..5025f79 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -6,6 +6,7 @@ Bioyond Workstation Implementation """ import time import traceback +import threading from datetime import datetime from typing import Dict, Any, List, Optional, Union import json @@ -29,6 +30,90 @@ from unilabos.devices.workstation.bioyond_studio.config import ( from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService +class ConnectionMonitor: + """Bioyond连接监控器""" + def __init__(self, workstation, check_interval=30): + self.workstation = workstation + self.check_interval = check_interval + self._running = False + self._thread = None + self._last_status = "unknown" + + def start(self): + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._monitor_loop, daemon=True, name="BioyondConnectionMonitor") + self._thread.start() + logger.info("Bioyond连接监控器已启动") + + def stop(self): + self._running = False + if self._thread: + self._thread.join(timeout=2) + logger.info("Bioyond连接监控器已停止") + + def _monitor_loop(self): + while self._running: + try: + # 使用 lightweight API 检查连接 + # query_matial_type_list 是比较快的查询 + start_time = time.time() + result = self.workstation.hardware_interface.material_type_list() + + status = "online" if result else "offline" + msg = "Connection established" if status == "online" else "Failed to get material type list" + + if status != self._last_status: + logger.info(f"Bioyond连接状态变更: {self._last_status} -> {status}") + self._publish_event(status, msg) + self._last_status = status + + # 发布心跳 (可选,或者只在状态变更时发布) + # self._publish_event(status, msg) + + except Exception as e: + logger.error(f"Bioyond连接检查异常: {e}") + if self._last_status != "error": + self._publish_event("error", str(e)) + self._last_status = "error" + + time.sleep(self.check_interval) + + def _publish_event(self, status, message): + try: + if hasattr(self.workstation, "_ros_node") and self.workstation._ros_node: + event_data = { + "status": status, + "message": message, + "timestamp": datetime.now().isoformat() + } + + # 动态发布消息,需要在 ROS2DeviceNode 中有对应支持 + # 这里假设通用事件发布机制,使用 String 类型的 topic + # 话题: //events/device_status + ns = self.workstation._ros_node.namespace + topic = f"{ns}/events/device_status" + + # 使用 ROS2DeviceNode 的发布功能 + # 如果没有预定义的 publisher,需要动态创建 + # 注意:workstation base node 可能没有自动创建 arbitrary publishers 的机制 + # 这里我们先尝试用 String json 发布 + + # 在 ROS2DeviceNode 中通常需要先 create_publisher + # 为了简单起见,我们检查是否已有 publisher,没有则创建 + if not hasattr(self.workstation, "_device_status_pub"): + self.workstation._device_status_pub = self.workstation._ros_node.create_publisher( + String, topic, 10 + ) + + self.workstation._device_status_pub.publish( + convert_to_ros_msg(String, json.dumps(event_data, ensure_ascii=False)) + ) + except Exception as e: + logger.error(f"发布设备状态事件失败: {e}") + + class BioyondResourceSynchronizer(ResourceSynchronizer): """Bioyond资源同步器 @@ -594,6 +679,44 @@ class BioyondWorkstation(WorkstationBase): 集成Bioyond物料管理的工作站实现 """ + def _publish_task_status( + self, + task_id: str, + task_type: str, + status: str, + result: dict = None, + progress: float = 0.0, + task_code: str = None + ): + """发布任务状态事件""" + try: + if not getattr(self, "_ros_node", None): + return + + event_data = { + "task_id": task_id, + "task_code": task_code, + "task_type": task_type, + "status": status, + "progress": progress, + "timestamp": datetime.now().isoformat() + } + if result: + event_data["result"] = result + + topic = f"{self._ros_node.namespace}/events/task_status" + + if not hasattr(self, "_task_status_pub"): + self._task_status_pub = self._ros_node.create_publisher( + String, topic, 10 + ) + + self._task_status_pub.publish( + convert_to_ros_msg(String, json.dumps(event_data, ensure_ascii=False)) + ) + except Exception as e: + logger.error(f"发布任务状态事件失败: {e}") + def __init__( self, bioyond_config: Optional[Dict[str, Any]] = None, @@ -642,13 +765,16 @@ class BioyondWorkstation(WorkstationBase): "host": bioyond_config.get("http_service_host", HTTP_SERVICE_CONFIG["http_service_host"]), "port": bioyond_config.get("http_service_port", HTTP_SERVICE_CONFIG["http_service_port"]) } - self.http_service = None # 将在 post_init 中启动 + self.http_service = None # 将在 post_init 启动 + self.connection_monitor = None # 将在 post_init 启动 logger.info(f"Bioyond工作站初始化完成") def __del__(self): """析构函数:清理资源,停止 HTTP 服务""" try: + if hasattr(self, 'connection_monitor') and self.connection_monitor: + self.connection_monitor.stop() if hasattr(self, 'http_service') and self.http_service is not None: logger.info("正在停止 HTTP 报送服务...") self.http_service.stop() @@ -658,6 +784,13 @@ class BioyondWorkstation(WorkstationBase): def post_init(self, ros_node: ROS2WorkstationNode): self._ros_node = ros_node + # 启动连接监控 + try: + self.connection_monitor = ConnectionMonitor(self) + self.connection_monitor.start() + except Exception as e: + logger.error(f"启动连接监控失败: {e}") + # 启动 HTTP 报送接收服务(现在 device_id 已可用) if hasattr(self, '_http_service_config'): try: @@ -1233,6 +1366,22 @@ class BioyondWorkstation(WorkstationBase): # TODO: 根据实际业务需求处理步骤完成逻辑 # 例如:更新数据库、触发后续流程等 + # 发布任务状态事件 (running/progress update) + self._publish_task_status( + task_id=data.get('orderCode'), # 使用 OrderCode 作为关联 ID + task_code=data.get('orderCode'), + task_type="bioyond_step", + status="running", + progress=0.5, # 步骤完成视为任务进行中 + result={"step_name": data.get('stepName'), "step_id": data.get('stepId')} + ) + + # 更新物料信息 + # 步骤完成后,物料状态可能发生变化(如位置、用量等),触发同步 + logger.info(f"[步骤完成报送] 触发物料同步...") + self.resource_synchronizer.sync_from_external() + + return { "processed": True, "step_id": data.get('stepId'), @@ -1267,6 +1416,17 @@ class BioyondWorkstation(WorkstationBase): # TODO: 根据实际业务需求处理通量完成逻辑 + # 发布任务状态事件 + self._publish_task_status( + task_id=data.get('orderCode'), + task_code=data.get('orderCode'), + task_type="bioyond_sample", + status="running", + progress=0.7, + result={"sample_id": data.get('sampleId'), "status": status_desc} + ) + + return { "processed": True, "sample_id": data.get('sampleId'), @@ -1306,6 +1466,32 @@ class BioyondWorkstation(WorkstationBase): # TODO: 根据实际业务需求处理任务完成逻辑 # 例如:更新物料库存、生成报表等 + # 映射状态到事件状态 + event_status = "completed" + if str(data.get('status')) in ["-11", "-12"]: + event_status = "error" + elif str(data.get('status')) == "30": + event_status = "completed" + else: + event_status = "running" # 其他状态视为运行中(或根据实际定义) + + # 发布任务状态事件 + self._publish_task_status( + task_id=data.get('orderCode'), + task_code=data.get('orderCode'), + task_type="bioyond_order", + status=event_status, + progress=1.0 if event_status in ["completed", "error"] else 0.9, + result={"order_name": data.get('orderName'), "status": status_desc, "materials_count": len(used_materials)} + ) + + # 更新物料信息 + # 任务完成后,且状态为完成时,触发同步以更新最终物料状态 + if event_status == "completed": + logger.info(f"[任务完成报送] 触发物料同步...") + self.resource_synchronizer.sync_from_external() + + return { "processed": True, "order_code": data.get('orderCode'),