feat(workstation): 添加温度/粘度报送处理功能

- 在反应站设备配置中添加温度/粘度相关状态类型
- 实现温度/粘度报送处理逻辑并添加ROS消息发布
- 扩展HTTP服务支持温度/粘度报送端点
- 添加HTTP请求日志记录功能
This commit is contained in:
ZiWei
2025-11-16 14:35:53 +08:00
parent 0136630700
commit db1b5a869f
4 changed files with 204 additions and 4 deletions

View File

@@ -2,7 +2,10 @@ import json
import time import time
import requests import requests
from typing import List, Dict, Any from typing import List, Dict, Any
from pathlib import Path
from datetime import datetime
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String
from unilabos.devices.workstation.bioyond_studio.config import ( from unilabos.devices.workstation.bioyond_studio.config import (
WORKFLOW_STEP_IDS, WORKFLOW_STEP_IDS,
WORKFLOW_TO_SECTION_MAP, WORKFLOW_TO_SECTION_MAP,
@@ -38,6 +41,17 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"BioyondReactionStation初始化完成 - workflow_mappings: {self.workflow_mappings}") print(f"BioyondReactionStation初始化完成 - workflow_mappings: {self.workflow_mappings}")
print(f"workflow_mappings长度: {len(self.workflow_mappings)}") print(f"workflow_mappings长度: {len(self.workflow_mappings)}")
self.in_temperature = 0.0
self.out_temperature = 0.0
self.pt100_temperature = 0.0
self.sensor_average_temperature = 0.0
self.target_temperature = 0.0
self.setting_temperature = 0.0
self.viscosity = 0.0
self.average_viscosity = 0.0
self.speed = 0.0
self.force = 0.0
# ==================== 工作流方法 ==================== # ==================== 工作流方法 ====================
def reactor_taken_out(self): def reactor_taken_out(self):
@@ -514,6 +528,94 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"[DEBUG] 返回结果: {result}") print(f"[DEBUG] 返回结果: {result}")
return result return result
def process_temperature_cutoff_report(self, report_request) -> Dict[str, Any]:
try:
data = report_request.data
def _f(v):
try:
return float(v)
except Exception:
return 0.0
self.target_temperature = _f(data.get("targetTemperature"))
self.setting_temperature = _f(data.get("settingTemperature"))
self.in_temperature = _f(data.get("inTemperature"))
self.out_temperature = _f(data.get("outTemperature"))
self.pt100_temperature = _f(data.get("pt100Temperature"))
self.sensor_average_temperature = _f(data.get("sensorAverageTemperature"))
self.speed = _f(data.get("speed"))
self.force = _f(data.get("force"))
self.viscosity = _f(data.get("viscosity"))
self.average_viscosity = _f(data.get("averageViscosity"))
try:
if hasattr(self, "_ros_node") and self._ros_node is not None:
props = [
"in_temperature","out_temperature","pt100_temperature","sensor_average_temperature",
"target_temperature","setting_temperature","viscosity","average_viscosity",
"speed","force"
]
for name in props:
pub = self._ros_node._property_publishers.get(name)
if pub:
pub.publish_property()
except Exception:
pass
event = {
"frameCode": data.get("frameCode"),
"generateTime": data.get("generateTime"),
"targetTemperature": data.get("targetTemperature"),
"settingTemperature": data.get("settingTemperature"),
"inTemperature": data.get("inTemperature"),
"outTemperature": data.get("outTemperature"),
"pt100Temperature": data.get("pt100Temperature"),
"sensorAverageTemperature": data.get("sensorAverageTemperature"),
"speed": data.get("speed"),
"force": data.get("force"),
"viscosity": data.get("viscosity"),
"averageViscosity": data.get("averageViscosity"),
"request_time": report_request.request_time,
"timestamp": datetime.now().isoformat(),
}
base_dir = Path(__file__).resolve().parents[3] / "unilabos_data"
base_dir.mkdir(parents=True, exist_ok=True)
out_file = base_dir / "temperature_cutoff_events.json"
try:
existing = json.loads(out_file.read_text(encoding="utf-8")) if out_file.exists() else []
if not isinstance(existing, list):
existing = []
except Exception:
existing = []
existing.append(event)
out_file.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
if hasattr(self, "_ros_node") and self._ros_node is not None:
ns = self._ros_node.namespace
topics = {
"targetTemperature": f"{ns}/metrics/temperature_cutoff/target_temperature",
"settingTemperature": f"{ns}/metrics/temperature_cutoff/setting_temperature",
"inTemperature": f"{ns}/metrics/temperature_cutoff/in_temperature",
"outTemperature": f"{ns}/metrics/temperature_cutoff/out_temperature",
"pt100Temperature": f"{ns}/metrics/temperature_cutoff/pt100_temperature",
"sensorAverageTemperature": f"{ns}/metrics/temperature_cutoff/sensor_average_temperature",
"speed": f"{ns}/metrics/temperature_cutoff/speed",
"force": f"{ns}/metrics/temperature_cutoff/force",
"viscosity": f"{ns}/metrics/temperature_cutoff/viscosity",
"averageViscosity": f"{ns}/metrics/temperature_cutoff/average_viscosity",
}
for k, t in topics.items():
v = data.get(k)
if v is not None:
pub = self._ros_node.create_publisher(Float64, t, 10)
pub.publish(convert_to_ros_msg(Float64, float(v)))
evt_pub = self._ros_node.create_publisher(String, f"{ns}/events/temperature_cutoff", 10)
evt_pub.publish(convert_to_ros_msg(String, json.dumps(event, ensure_ascii=False)))
return {"processed": True, "frame": data.get("frameCode")}
except Exception as e:
return {"processed": False, "error": str(e)}
def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, check_interval: int = 10) -> Dict[str, Any]: def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, check_interval: int = 10) -> Dict[str, Any]:
try: try:
timeout = int(timeout) if timeout else 7200 timeout = int(timeout) if timeout else 7200

View File

@@ -9,6 +9,7 @@ import traceback
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional, Union from typing import Dict, Any, List, Optional, Union
import json import json
from pathlib import Path
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
@@ -19,6 +20,7 @@ from unilabos.resources.graphio import resource_bioyond_to_plr, resource_plr_to_
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String
from pylabrobot.resources.resource import Resource as ResourcePLR from pylabrobot.resources.resource import Resource as ResourcePLR
from unilabos.devices.workstation.bioyond_studio.config import ( from unilabos.devices.workstation.bioyond_studio.config import (
@@ -676,13 +678,13 @@ class BioyondWorkstation(WorkstationBase):
self._synced_resources = [] self._synced_resources = []
def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot): def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot):
time.sleep(3) future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{
ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{
"plr_resources": resource, "plr_resources": resource,
"target_device_id": mount_device_id, "target_device_id": mount_device_id,
"target_resources": mount_resource, "target_resources": mount_resource,
"sites": sites, "sites": sites,
}) })
return future
def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None: def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None:
"""创建Bioyond通信模块""" """创建Bioyond通信模块"""
@@ -1327,6 +1329,7 @@ class BioyondWorkstation(WorkstationBase):
logger.error(f"处理物料变更报送失败: {e}") logger.error(f"处理物料变更报送失败: {e}")
return {"processed": False, "error": str(e)} return {"processed": False, "error": str(e)}
def handle_external_error(self, error_data: Dict[str, Any]) -> Dict[str, Any]: def handle_external_error(self, error_data: Dict[str, Any]) -> Dict[str, Any]:
"""处理错误处理报送 """处理错误处理报送

View File

@@ -22,6 +22,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse from urllib.parse import urlparse
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from datetime import datetime from datetime import datetime
from pathlib import Path
from unilabos.utils.log import logger from unilabos.utils.log import logger
@@ -76,6 +77,14 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}") logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}")
try:
payload_for_log = {"method": "POST", **request_data}
self._save_raw_request(endpoint, payload_for_log)
if hasattr(self.workstation, '_reports_received_count'):
self.workstation._reports_received_count += 1
except Exception:
pass
# 统一的报送端点路由基于LIMS协议规范 # 统一的报送端点路由基于LIMS协议规范
if endpoint == '/report/step_finish': if endpoint == '/report/step_finish':
response = self._handle_step_finish_report(request_data) response = self._handle_step_finish_report(request_data)
@@ -90,6 +99,8 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
response = self._handle_material_change_report(request_data) response = self._handle_material_change_report(request_data)
elif endpoint == '/report/error_handling': elif endpoint == '/report/error_handling':
response = self._handle_error_handling_report(request_data) response = self._handle_error_handling_report(request_data)
elif endpoint == '/report/temperature-cutoff':
response = self._handle_temperature_cutoff_report(request_data)
# 保留LIMS协议端点以兼容现有系统 # 保留LIMS协议端点以兼容现有系统
elif endpoint == '/LIMS/step_finish': elif endpoint == '/LIMS/step_finish':
response = self._handle_step_finish_report(request_data) response = self._handle_step_finish_report(request_data)
@@ -107,7 +118,8 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
"/report/order_finish", "/report/order_finish",
"/report/batch_update", "/report/batch_update",
"/report/material_change", "/report/material_change",
"/report/error_handling" "/report/error_handling",
"/report/temperature-cutoff"
]} ]}
) )
@@ -128,6 +140,11 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
parsed_path = urlparse(self.path) parsed_path = urlparse(self.path)
endpoint = parsed_path.path endpoint = parsed_path.path
try:
self._save_raw_request(endpoint, {"method": "GET"})
except Exception:
pass
if endpoint == '/status': if endpoint == '/status':
response = self._handle_status_check() response = self._handle_status_check()
elif endpoint == '/health': elif endpoint == '/health':
@@ -318,6 +335,50 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
message=f"任务完成报送处理失败: {str(e)}" message=f"任务完成报送处理失败: {str(e)}"
) )
def _handle_temperature_cutoff_report(self, request_data: Dict[str, Any]) -> HttpResponse:
try:
required_fields = ['token', 'request_time', 'data']
if missing := [f for f in required_fields if f not in request_data]:
return HttpResponse(success=False, message=f"缺少必要字段: {', '.join(missing)}")
data = request_data['data']
metrics = [
'frameCode',
'generateTime',
'targetTemperature',
'settingTemperature',
'inTemperature',
'outTemperature',
'pt100Temperature',
'sensorAverageTemperature',
'speed',
'force',
'viscosity',
'averageViscosity'
]
if miss := [f for f in metrics if f not in data]:
return HttpResponse(success=False, message=f"data字段缺少必要内容: {', '.join(miss)}")
report_request = WorkstationReportRequest(
token=request_data['token'],
request_time=request_data['request_time'],
data=data
)
result = {}
if hasattr(self.workstation, 'process_temperature_cutoff_report'):
result = self.workstation.process_temperature_cutoff_report(report_request)
return HttpResponse(
success=True,
message=f"温度/粘度报送已处理: 帧{data['frameCode']}",
acknowledgment_id=f"TEMP_CUTOFF_{int(time.time()*1000)}_{data['frameCode']}",
data=result
)
except Exception as e:
logger.error(f"处理温度/粘度报送失败: {e}\n{traceback.format_exc()}")
return HttpResponse(success=False, message=f"温度/粘度报送处理失败: {str(e)}")
def _handle_batch_update_report(self, request_data: Dict[str, Any]) -> HttpResponse: def _handle_batch_update_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理批量报送""" """处理批量报送"""
try: try:
@@ -510,6 +571,7 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
"POST /report/batch_update", "POST /report/batch_update",
"POST /report/material_change", "POST /report/material_change",
"POST /report/error_handling", "POST /report/error_handling",
"POST /report/temperature-cutoff",
"GET /status", "GET /status",
"GET /health" "GET /health"
] ]
@@ -547,6 +609,22 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler):
"""重写日志方法""" """重写日志方法"""
logger.debug(f"HTTP请求: {format % args}") logger.debug(f"HTTP请求: {format % args}")
def _save_raw_request(self, endpoint: str, request_data: Dict[str, Any]) -> None:
try:
base_dir = Path(__file__).resolve().parents[3] / "unilabos_data" / "http_reports"
base_dir.mkdir(parents=True, exist_ok=True)
log_path = getattr(self.workstation, "_http_log_path", None)
log_file = Path(log_path) if log_path else (base_dir / f"http_{int(time.time()*1000)}.log")
payload = {
"endpoint": endpoint,
"received_at": datetime.now().isoformat(),
"body": request_data
}
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
except Exception:
pass
class WorkstationHTTPService: class WorkstationHTTPService:
"""工作站HTTP服务""" """工作站HTTP服务"""
@@ -572,6 +650,10 @@ class WorkstationHTTPService:
# 创建HTTP服务器 # 创建HTTP服务器
self.server = HTTPServer((self.host, self.port), handler_factory) self.server = HTTPServer((self.host, self.port), handler_factory)
base_dir = Path(__file__).resolve().parents[3] / "unilabos_data" / "http_reports"
base_dir.mkdir(parents=True, exist_ok=True)
session_log = base_dir / f"http_{int(time.time()*1000)}.log"
setattr(self.workstation, "_http_log_path", str(session_log))
# 安全地获取 device_id 用于线程命名 # 安全地获取 device_id 用于线程命名
device_id = "unknown" device_id = "unknown"
@@ -599,6 +681,7 @@ class WorkstationHTTPService:
logger.info("扩展报送端点:") logger.info("扩展报送端点:")
logger.info(" - POST /report/material_change # 物料变更报送") logger.info(" - POST /report/material_change # 物料变更报送")
logger.info(" - POST /report/error_handling # 错误处理报送") logger.info(" - POST /report/error_handling # 错误处理报送")
logger.info(" - POST /report/temperature-cutoff # 温度/粘度报送")
logger.info("兼容端点:") logger.info("兼容端点:")
logger.info(" - POST /LIMS/step_finish # 兼容LIMS步骤完成") logger.info(" - POST /LIMS/step_finish # 兼容LIMS步骤完成")
logger.info(" - POST /LIMS/preintake_finish # 兼容LIMS通量完成") logger.info(" - POST /LIMS/preintake_finish # 兼容LIMS通量完成")
@@ -700,6 +783,9 @@ if __name__ == "__main__":
def handle_external_error(self, error_data): def handle_external_error(self, error_data):
return {"handled": True} return {"handled": True}
def process_temperature_cutoff_report(self, report_request):
return {"processed": True, "metrics": report_request.data}
workstation = BioyondWorkstation() workstation = BioyondWorkstation()
http_service = WorkstationHTTPService(workstation) http_service = WorkstationHTTPService(workstation)
@@ -723,4 +809,3 @@ if __name__ == "__main__":
except Exception as e: except Exception as e:
print(f"服务器运行错误: {e}") print(f"服务器运行错误: {e}")
http_service.stop() http_service.stop()

View File

@@ -525,7 +525,17 @@ reaction_station.bioyond:
protocol_type: [] protocol_type: []
status_types: status_types:
all_workflows: dict all_workflows: dict
average_viscosity: float
bioyond_status: dict bioyond_status: dict
force: float
in_temperature: float
out_temperature: float
pt100_temperature: float
sensor_average_temperature: float
setting_temperature: float
speed: float
target_temperature: float
viscosity: float
workstation_status: dict workstation_status: dict
type: python type: python
config_info: [] config_info: []