feat: 添加任务状态事件发布功能,监控并报告任务运行、超时、完成和错误状态

This commit is contained in:
ZiWei
2025-12-25 09:58:03 +08:00
parent 1e5f6b0c04
commit beaa1d7213
2 changed files with 223 additions and 1 deletions

View File

@@ -837,6 +837,17 @@ class BioyondReactionStation(BioyondWorkstation):
raise ValueError("order_codes与order_ids数量不匹配") raise ValueError("order_codes与order_ids数量不匹配")
total = len(order_codes) total = len(order_codes)
pending = {c: {"order_id": order_ids[i], "completed": False} for i, c in enumerate(order_codes)} pending = {c: {"order_id": order_ids[i], "completed": False} for i, c in enumerate(order_codes)}
# 发布初始状态事件
for i, oc in enumerate(order_codes):
self._publish_task_status(
task_id=order_ids[i],
task_code=oc,
task_type="bioyond_workflow",
status="running",
progress=0.0
)
reports = [] reports = []
start_time = time.time() start_time = time.time()
while pending: while pending:
@@ -852,6 +863,14 @@ class BioyondReactionStation(BioyondWorkstation):
"extracted": None, "extracted": None,
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
# 发布超时事件
self._publish_task_status(
task_id=pending[oc]["order_id"],
task_code=oc,
task_type="bioyond_workflow",
status="timeout",
result={"elapsed_time": elapsed_time}
)
break break
completed_round = [] completed_round = []
for oc in list(pending.keys()): for oc in list(pending.keys()):
@@ -874,6 +893,15 @@ class BioyondReactionStation(BioyondWorkstation):
"extracted": self._extract_actuals_from_report(rep), "extracted": self._extract_actuals_from_report(rep),
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
# 发布完成事件
self._publish_task_status(
task_id=oid,
task_code=oc,
task_type="bioyond_workflow",
status="completed",
progress=1.0,
result=rep
)
completed_round.append(oc) completed_round.append(oc)
del self.order_completion_status[oc] del self.order_completion_status[oc]
except Exception as e: except Exception as e:
@@ -887,6 +915,14 @@ class BioyondReactionStation(BioyondWorkstation):
"error": str(e), "error": str(e),
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
# 发布错误事件
self._publish_task_status(
task_id=oid,
task_code=oc,
task_type="bioyond_workflow",
status="error",
result={"error": str(e)}
)
completed_round.append(oc) completed_round.append(oc)
for oc in completed_round: for oc in completed_round:
del pending[oc] del pending[oc]

View File

@@ -6,6 +6,7 @@ Bioyond Workstation Implementation
""" """
import time import time
import traceback import traceback
import threading
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional, Union from typing import Dict, Any, List, Optional, Union
import json import json
@@ -29,6 +30,90 @@ from unilabos.devices.workstation.bioyond_studio.config import (
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
class ConnectionMonitor:
"""Bioyond连接监控器"""
def __init__(self, workstation, check_interval=30):
self.workstation = workstation
self.check_interval = check_interval
self._running = False
self._thread = None
self._last_status = "unknown"
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._monitor_loop, daemon=True, name="BioyondConnectionMonitor")
self._thread.start()
logger.info("Bioyond连接监控器已启动")
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=2)
logger.info("Bioyond连接监控器已停止")
def _monitor_loop(self):
while self._running:
try:
# 使用 lightweight API 检查连接
# query_matial_type_list 是比较快的查询
start_time = time.time()
result = self.workstation.hardware_interface.material_type_list()
status = "online" if result else "offline"
msg = "Connection established" if status == "online" else "Failed to get material type list"
if status != self._last_status:
logger.info(f"Bioyond连接状态变更: {self._last_status} -> {status}")
self._publish_event(status, msg)
self._last_status = status
# 发布心跳 (可选,或者只在状态变更时发布)
# self._publish_event(status, msg)
except Exception as e:
logger.error(f"Bioyond连接检查异常: {e}")
if self._last_status != "error":
self._publish_event("error", str(e))
self._last_status = "error"
time.sleep(self.check_interval)
def _publish_event(self, status, message):
try:
if hasattr(self.workstation, "_ros_node") and self.workstation._ros_node:
event_data = {
"status": status,
"message": message,
"timestamp": datetime.now().isoformat()
}
# 动态发布消息,需要在 ROS2DeviceNode 中有对应支持
# 这里假设通用事件发布机制,使用 String 类型的 topic
# 话题: /<namespace>/events/device_status
ns = self.workstation._ros_node.namespace
topic = f"{ns}/events/device_status"
# 使用 ROS2DeviceNode 的发布功能
# 如果没有预定义的 publisher需要动态创建
# 注意workstation base node 可能没有自动创建 arbitrary publishers 的机制
# 这里我们先尝试用 String json 发布
# 在 ROS2DeviceNode 中通常需要先 create_publisher
# 为了简单起见,我们检查是否已有 publisher没有则创建
if not hasattr(self.workstation, "_device_status_pub"):
self.workstation._device_status_pub = self.workstation._ros_node.create_publisher(
String, topic, 10
)
self.workstation._device_status_pub.publish(
convert_to_ros_msg(String, json.dumps(event_data, ensure_ascii=False))
)
except Exception as e:
logger.error(f"发布设备状态事件失败: {e}")
class BioyondResourceSynchronizer(ResourceSynchronizer): class BioyondResourceSynchronizer(ResourceSynchronizer):
"""Bioyond资源同步器 """Bioyond资源同步器
@@ -594,6 +679,44 @@ class BioyondWorkstation(WorkstationBase):
集成Bioyond物料管理的工作站实现 集成Bioyond物料管理的工作站实现
""" """
def _publish_task_status(
self,
task_id: str,
task_type: str,
status: str,
result: dict = None,
progress: float = 0.0,
task_code: str = None
):
"""发布任务状态事件"""
try:
if not getattr(self, "_ros_node", None):
return
event_data = {
"task_id": task_id,
"task_code": task_code,
"task_type": task_type,
"status": status,
"progress": progress,
"timestamp": datetime.now().isoformat()
}
if result:
event_data["result"] = result
topic = f"{self._ros_node.namespace}/events/task_status"
if not hasattr(self, "_task_status_pub"):
self._task_status_pub = self._ros_node.create_publisher(
String, topic, 10
)
self._task_status_pub.publish(
convert_to_ros_msg(String, json.dumps(event_data, ensure_ascii=False))
)
except Exception as e:
logger.error(f"发布任务状态事件失败: {e}")
def __init__( def __init__(
self, self,
bioyond_config: Optional[Dict[str, Any]] = None, bioyond_config: Optional[Dict[str, Any]] = None,
@@ -642,13 +765,16 @@ class BioyondWorkstation(WorkstationBase):
"host": bioyond_config.get("http_service_host", HTTP_SERVICE_CONFIG["http_service_host"]), "host": bioyond_config.get("http_service_host", HTTP_SERVICE_CONFIG["http_service_host"]),
"port": bioyond_config.get("http_service_port", HTTP_SERVICE_CONFIG["http_service_port"]) "port": bioyond_config.get("http_service_port", HTTP_SERVICE_CONFIG["http_service_port"])
} }
self.http_service = None # 将在 post_init 启动 self.http_service = None # 将在 post_init 启动
self.connection_monitor = None # 将在 post_init 启动
logger.info(f"Bioyond工作站初始化完成") logger.info(f"Bioyond工作站初始化完成")
def __del__(self): def __del__(self):
"""析构函数:清理资源,停止 HTTP 服务""" """析构函数:清理资源,停止 HTTP 服务"""
try: try:
if hasattr(self, 'connection_monitor') and self.connection_monitor:
self.connection_monitor.stop()
if hasattr(self, 'http_service') and self.http_service is not None: if hasattr(self, 'http_service') and self.http_service is not None:
logger.info("正在停止 HTTP 报送服务...") logger.info("正在停止 HTTP 报送服务...")
self.http_service.stop() self.http_service.stop()
@@ -658,6 +784,13 @@ class BioyondWorkstation(WorkstationBase):
def post_init(self, ros_node: ROS2WorkstationNode): def post_init(self, ros_node: ROS2WorkstationNode):
self._ros_node = ros_node self._ros_node = ros_node
# 启动连接监控
try:
self.connection_monitor = ConnectionMonitor(self)
self.connection_monitor.start()
except Exception as e:
logger.error(f"启动连接监控失败: {e}")
# 启动 HTTP 报送接收服务(现在 device_id 已可用) # 启动 HTTP 报送接收服务(现在 device_id 已可用)
if hasattr(self, '_http_service_config'): if hasattr(self, '_http_service_config'):
try: try:
@@ -1233,6 +1366,22 @@ class BioyondWorkstation(WorkstationBase):
# TODO: 根据实际业务需求处理步骤完成逻辑 # TODO: 根据实际业务需求处理步骤完成逻辑
# 例如:更新数据库、触发后续流程等 # 例如:更新数据库、触发后续流程等
# 发布任务状态事件 (running/progress update)
self._publish_task_status(
task_id=data.get('orderCode'), # 使用 OrderCode 作为关联 ID
task_code=data.get('orderCode'),
task_type="bioyond_step",
status="running",
progress=0.5, # 步骤完成视为任务进行中
result={"step_name": data.get('stepName'), "step_id": data.get('stepId')}
)
# 更新物料信息
# 步骤完成后,物料状态可能发生变化(如位置、用量等),触发同步
logger.info(f"[步骤完成报送] 触发物料同步...")
self.resource_synchronizer.sync_from_external()
return { return {
"processed": True, "processed": True,
"step_id": data.get('stepId'), "step_id": data.get('stepId'),
@@ -1267,6 +1416,17 @@ class BioyondWorkstation(WorkstationBase):
# TODO: 根据实际业务需求处理通量完成逻辑 # TODO: 根据实际业务需求处理通量完成逻辑
# 发布任务状态事件
self._publish_task_status(
task_id=data.get('orderCode'),
task_code=data.get('orderCode'),
task_type="bioyond_sample",
status="running",
progress=0.7,
result={"sample_id": data.get('sampleId'), "status": status_desc}
)
return { return {
"processed": True, "processed": True,
"sample_id": data.get('sampleId'), "sample_id": data.get('sampleId'),
@@ -1306,6 +1466,32 @@ class BioyondWorkstation(WorkstationBase):
# TODO: 根据实际业务需求处理任务完成逻辑 # TODO: 根据实际业务需求处理任务完成逻辑
# 例如:更新物料库存、生成报表等 # 例如:更新物料库存、生成报表等
# 映射状态到事件状态
event_status = "completed"
if str(data.get('status')) in ["-11", "-12"]:
event_status = "error"
elif str(data.get('status')) == "30":
event_status = "completed"
else:
event_status = "running" # 其他状态视为运行中(或根据实际定义)
# 发布任务状态事件
self._publish_task_status(
task_id=data.get('orderCode'),
task_code=data.get('orderCode'),
task_type="bioyond_order",
status=event_status,
progress=1.0 if event_status in ["completed", "error"] else 0.9,
result={"order_name": data.get('orderName'), "status": status_desc, "materials_count": len(used_materials)}
)
# 更新物料信息
# 任务完成后,且状态为完成时,触发同步以更新最终物料状态
if event_status == "completed":
logger.info(f"[任务完成报送] 触发物料同步...")
self.resource_synchronizer.sync_from_external()
return { return {
"processed": True, "processed": True,
"order_code": data.get('orderCode'), "order_code": data.get('orderCode'),