From db1b5a869f672c6ca6f3f57a0e7a13025bf5b134 Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Sun, 16 Nov 2025 14:35:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(workstation):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=B8=A9=E5=BA=A6/=E7=B2=98=E5=BA=A6=E6=8A=A5=E9=80=81?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在反应站设备配置中添加温度/粘度相关状态类型 - 实现温度/粘度报送处理逻辑并添加ROS消息发布 - 扩展HTTP服务支持温度/粘度报送端点 - 添加HTTP请求日志记录功能 --- .../bioyond_studio/reaction_station.py | 102 ++++++++++++++++++ .../workstation/bioyond_studio/station.py | 7 +- .../workstation/workstation_http_service.py | 89 ++++++++++++++- .../devices/reaction_station_bioyond.yaml | 10 ++ 4 files changed, 204 insertions(+), 4 deletions(-) diff --git a/unilabos/devices/workstation/bioyond_studio/reaction_station.py b/unilabos/devices/workstation/bioyond_studio/reaction_station.py index 10d0c089..21cd5abc 100644 --- a/unilabos/devices/workstation/bioyond_studio/reaction_station.py +++ b/unilabos/devices/workstation/bioyond_studio/reaction_station.py @@ -2,7 +2,10 @@ import json import time import requests from typing import List, Dict, Any +from pathlib import Path +from datetime import datetime from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation +from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String from unilabos.devices.workstation.bioyond_studio.config import ( WORKFLOW_STEP_IDS, WORKFLOW_TO_SECTION_MAP, @@ -38,6 +41,17 @@ class BioyondReactionStation(BioyondWorkstation): print(f"BioyondReactionStation初始化完成 - workflow_mappings: {self.workflow_mappings}") print(f"workflow_mappings长度: {len(self.workflow_mappings)}") + self.in_temperature = 0.0 + self.out_temperature = 0.0 + self.pt100_temperature = 0.0 + self.sensor_average_temperature = 0.0 + self.target_temperature = 0.0 + self.setting_temperature = 0.0 + self.viscosity = 0.0 + self.average_viscosity = 0.0 + self.speed = 0.0 + self.force = 0.0 + # ==================== 工作流方法 ==================== def reactor_taken_out(self): @@ -514,6 +528,94 @@ class BioyondReactionStation(BioyondWorkstation): print(f"[DEBUG] 返回结果: {result}") return result + def process_temperature_cutoff_report(self, report_request) -> Dict[str, Any]: + try: + data = report_request.data + def _f(v): + try: + return float(v) + except Exception: + return 0.0 + self.target_temperature = _f(data.get("targetTemperature")) + self.setting_temperature = _f(data.get("settingTemperature")) + self.in_temperature = _f(data.get("inTemperature")) + self.out_temperature = _f(data.get("outTemperature")) + self.pt100_temperature = _f(data.get("pt100Temperature")) + self.sensor_average_temperature = _f(data.get("sensorAverageTemperature")) + self.speed = _f(data.get("speed")) + self.force = _f(data.get("force")) + self.viscosity = _f(data.get("viscosity")) + self.average_viscosity = _f(data.get("averageViscosity")) + + try: + if hasattr(self, "_ros_node") and self._ros_node is not None: + props = [ + "in_temperature","out_temperature","pt100_temperature","sensor_average_temperature", + "target_temperature","setting_temperature","viscosity","average_viscosity", + "speed","force" + ] + for name in props: + pub = self._ros_node._property_publishers.get(name) + if pub: + pub.publish_property() + except Exception: + pass + event = { + "frameCode": data.get("frameCode"), + "generateTime": data.get("generateTime"), + "targetTemperature": data.get("targetTemperature"), + "settingTemperature": data.get("settingTemperature"), + "inTemperature": data.get("inTemperature"), + "outTemperature": data.get("outTemperature"), + "pt100Temperature": data.get("pt100Temperature"), + "sensorAverageTemperature": data.get("sensorAverageTemperature"), + "speed": data.get("speed"), + "force": data.get("force"), + "viscosity": data.get("viscosity"), + "averageViscosity": data.get("averageViscosity"), + "request_time": report_request.request_time, + "timestamp": datetime.now().isoformat(), + } + + base_dir = Path(__file__).resolve().parents[3] / "unilabos_data" + base_dir.mkdir(parents=True, exist_ok=True) + out_file = base_dir / "temperature_cutoff_events.json" + try: + existing = json.loads(out_file.read_text(encoding="utf-8")) if out_file.exists() else [] + if not isinstance(existing, list): + existing = [] + except Exception: + existing = [] + existing.append(event) + out_file.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8") + + if hasattr(self, "_ros_node") and self._ros_node is not None: + ns = self._ros_node.namespace + topics = { + "targetTemperature": f"{ns}/metrics/temperature_cutoff/target_temperature", + "settingTemperature": f"{ns}/metrics/temperature_cutoff/setting_temperature", + "inTemperature": f"{ns}/metrics/temperature_cutoff/in_temperature", + "outTemperature": f"{ns}/metrics/temperature_cutoff/out_temperature", + "pt100Temperature": f"{ns}/metrics/temperature_cutoff/pt100_temperature", + "sensorAverageTemperature": f"{ns}/metrics/temperature_cutoff/sensor_average_temperature", + "speed": f"{ns}/metrics/temperature_cutoff/speed", + "force": f"{ns}/metrics/temperature_cutoff/force", + "viscosity": f"{ns}/metrics/temperature_cutoff/viscosity", + "averageViscosity": f"{ns}/metrics/temperature_cutoff/average_viscosity", + } + for k, t in topics.items(): + v = data.get(k) + if v is not None: + pub = self._ros_node.create_publisher(Float64, t, 10) + pub.publish(convert_to_ros_msg(Float64, float(v))) + + evt_pub = self._ros_node.create_publisher(String, f"{ns}/events/temperature_cutoff", 10) + evt_pub.publish(convert_to_ros_msg(String, json.dumps(event, ensure_ascii=False))) + + return {"processed": True, "frame": data.get("frameCode")} + except Exception as e: + return {"processed": False, "error": str(e)} + def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, check_interval: int = 10) -> Dict[str, Any]: try: timeout = int(timeout) if timeout else 7200 diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index cb080f02..e349b083 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -9,6 +9,7 @@ import traceback from datetime import datetime from typing import Dict, Any, List, Optional, Union import json +from pathlib import Path from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC @@ -19,6 +20,7 @@ from unilabos.resources.graphio import resource_bioyond_to_plr, resource_plr_to_ from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode +from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String from pylabrobot.resources.resource import Resource as ResourcePLR from unilabos.devices.workstation.bioyond_studio.config import ( @@ -676,13 +678,13 @@ class BioyondWorkstation(WorkstationBase): self._synced_resources = [] def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot): - time.sleep(3) - ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{ + future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{ "plr_resources": resource, "target_device_id": mount_device_id, "target_resources": mount_resource, "sites": sites, }) + return future def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None: """创建Bioyond通信模块""" @@ -1327,6 +1329,7 @@ class BioyondWorkstation(WorkstationBase): logger.error(f"处理物料变更报送失败: {e}") return {"processed": False, "error": str(e)} + def handle_external_error(self, error_data: Dict[str, Any]) -> Dict[str, Any]: """处理错误处理报送 diff --git a/unilabos/devices/workstation/workstation_http_service.py b/unilabos/devices/workstation/workstation_http_service.py index 87f5332b..11d87693 100644 --- a/unilabos/devices/workstation/workstation_http_service.py +++ b/unilabos/devices/workstation/workstation_http_service.py @@ -22,6 +22,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse from dataclasses import dataclass, asdict from datetime import datetime +from pathlib import Path from unilabos.utils.log import logger @@ -76,6 +77,14 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}") + try: + payload_for_log = {"method": "POST", **request_data} + self._save_raw_request(endpoint, payload_for_log) + if hasattr(self.workstation, '_reports_received_count'): + self.workstation._reports_received_count += 1 + except Exception: + pass + # 统一的报送端点路由(基于LIMS协议规范) if endpoint == '/report/step_finish': response = self._handle_step_finish_report(request_data) @@ -90,6 +99,8 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): response = self._handle_material_change_report(request_data) elif endpoint == '/report/error_handling': response = self._handle_error_handling_report(request_data) + elif endpoint == '/report/temperature-cutoff': + response = self._handle_temperature_cutoff_report(request_data) # 保留LIMS协议端点以兼容现有系统 elif endpoint == '/LIMS/step_finish': response = self._handle_step_finish_report(request_data) @@ -107,7 +118,8 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): "/report/order_finish", "/report/batch_update", "/report/material_change", - "/report/error_handling" + "/report/error_handling", + "/report/temperature-cutoff" ]} ) @@ -128,6 +140,11 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): parsed_path = urlparse(self.path) endpoint = parsed_path.path + try: + self._save_raw_request(endpoint, {"method": "GET"}) + except Exception: + pass + if endpoint == '/status': response = self._handle_status_check() elif endpoint == '/health': @@ -318,6 +335,50 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): message=f"任务完成报送处理失败: {str(e)}" ) + def _handle_temperature_cutoff_report(self, request_data: Dict[str, Any]) -> HttpResponse: + try: + required_fields = ['token', 'request_time', 'data'] + if missing := [f for f in required_fields if f not in request_data]: + return HttpResponse(success=False, message=f"缺少必要字段: {', '.join(missing)}") + + data = request_data['data'] + metrics = [ + 'frameCode', + 'generateTime', + 'targetTemperature', + 'settingTemperature', + 'inTemperature', + 'outTemperature', + 'pt100Temperature', + 'sensorAverageTemperature', + 'speed', + 'force', + 'viscosity', + 'averageViscosity' + ] + if miss := [f for f in metrics if f not in data]: + return HttpResponse(success=False, message=f"data字段缺少必要内容: {', '.join(miss)}") + + report_request = WorkstationReportRequest( + token=request_data['token'], + request_time=request_data['request_time'], + data=data + ) + + result = {} + if hasattr(self.workstation, 'process_temperature_cutoff_report'): + result = self.workstation.process_temperature_cutoff_report(report_request) + + return HttpResponse( + success=True, + message=f"温度/粘度报送已处理: 帧{data['frameCode']}", + acknowledgment_id=f"TEMP_CUTOFF_{int(time.time()*1000)}_{data['frameCode']}", + data=result + ) + except Exception as e: + logger.error(f"处理温度/粘度报送失败: {e}\n{traceback.format_exc()}") + return HttpResponse(success=False, message=f"温度/粘度报送处理失败: {str(e)}") + def _handle_batch_update_report(self, request_data: Dict[str, Any]) -> HttpResponse: """处理批量报送""" try: @@ -510,6 +571,7 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): "POST /report/batch_update", "POST /report/material_change", "POST /report/error_handling", + "POST /report/temperature-cutoff", "GET /status", "GET /health" ] @@ -547,6 +609,22 @@ class WorkstationHTTPHandler(BaseHTTPRequestHandler): """重写日志方法""" logger.debug(f"HTTP请求: {format % args}") + def _save_raw_request(self, endpoint: str, request_data: Dict[str, Any]) -> None: + try: + base_dir = Path(__file__).resolve().parents[3] / "unilabos_data" / "http_reports" + base_dir.mkdir(parents=True, exist_ok=True) + log_path = getattr(self.workstation, "_http_log_path", None) + log_file = Path(log_path) if log_path else (base_dir / f"http_{int(time.time()*1000)}.log") + payload = { + "endpoint": endpoint, + "received_at": datetime.now().isoformat(), + "body": request_data + } + with open(log_file, "a", encoding="utf-8") as f: + f.write(json.dumps(payload, ensure_ascii=False) + "\n") + except Exception: + pass + class WorkstationHTTPService: """工作站HTTP服务""" @@ -572,6 +650,10 @@ class WorkstationHTTPService: # 创建HTTP服务器 self.server = HTTPServer((self.host, self.port), handler_factory) + base_dir = Path(__file__).resolve().parents[3] / "unilabos_data" / "http_reports" + base_dir.mkdir(parents=True, exist_ok=True) + session_log = base_dir / f"http_{int(time.time()*1000)}.log" + setattr(self.workstation, "_http_log_path", str(session_log)) # 安全地获取 device_id 用于线程命名 device_id = "unknown" @@ -599,6 +681,7 @@ class WorkstationHTTPService: logger.info("扩展报送端点:") logger.info(" - POST /report/material_change # 物料变更报送") logger.info(" - POST /report/error_handling # 错误处理报送") + logger.info(" - POST /report/temperature-cutoff # 温度/粘度报送") logger.info("兼容端点:") logger.info(" - POST /LIMS/step_finish # 兼容LIMS步骤完成") logger.info(" - POST /LIMS/preintake_finish # 兼容LIMS通量完成") @@ -700,6 +783,9 @@ if __name__ == "__main__": def handle_external_error(self, error_data): return {"handled": True} + def process_temperature_cutoff_report(self, report_request): + return {"processed": True, "metrics": report_request.data} + workstation = BioyondWorkstation() http_service = WorkstationHTTPService(workstation) @@ -723,4 +809,3 @@ if __name__ == "__main__": except Exception as e: print(f"服务器运行错误: {e}") http_service.stop() - diff --git a/unilabos/registry/devices/reaction_station_bioyond.yaml b/unilabos/registry/devices/reaction_station_bioyond.yaml index 32627319..efe7e121 100644 --- a/unilabos/registry/devices/reaction_station_bioyond.yaml +++ b/unilabos/registry/devices/reaction_station_bioyond.yaml @@ -525,7 +525,17 @@ reaction_station.bioyond: protocol_type: [] status_types: all_workflows: dict + average_viscosity: float bioyond_status: dict + force: float + in_temperature: float + out_temperature: float + pt100_temperature: float + sensor_average_temperature: float + setting_temperature: float + speed: float + target_temperature: float + viscosity: float workstation_status: dict type: python config_info: []