Feature/xprbalance-zhida (#80)

* feat(devices): add Zhida GC/MS pretreatment automation workstation

* feat(devices): add mettler_toledo xpr balance

* balance
This commit is contained in:
Xie Qiming
2025-09-19 11:43:25 +08:00
committed by GitHub
parent 41eaa88c6f
commit ace98a4472
20 changed files with 2451 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
WSDL Template for Mettler Toledo XPR/XSR Balance
IMPORTANT: This is a template file. You need to obtain the actual WSDL file
from Mettler Toledo for your specific balance model.
To use this driver:
1. Contact Mettler Toledo support to obtain the official WSDL file
2. Replace this template with the actual WSDL file
3. Rename it to: MT.Laboratory.Balance.XprXsr.V03.wsdl
The WSDL file contains proprietary information and cannot be distributed
with this open-source project.
-->
<wsdl:definitions xmlns:wsx="http://schemas.xmlsoap.org/ws/2004/09/mex"
xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
xmlns:wsa10="http://www.w3.org/2005/08/addressing"
xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"
xmlns:wsap="http://schemas.xmlsoap.org/ws/2004/08/addressing/policy"
xmlns:msc="http://schemas.microsoft.com/ws/2005/12/wsdl/contract"
xmlns:soap12="http://schemas.xmlsoap.org/wsdl/soap12/"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing"
xmlns:wsam="http://www.w3.org/2007/05/addressing/metadata"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://MT/Laboratory/Balance/XprXsr/V03"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:wsaw="http://www.w3.org/2006/05/addressing/wsdl"
xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
targetNamespace="http://MT/Laboratory/Balance/XprXsr/V03"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">
<!--
PLACEHOLDER CONTENT
This template contains only the basic structure.
The actual WSDL file should contain:
- Service definitions
- Port types
- Message definitions
- Binding information
- Endpoint addresses with template variables: {{host}}, {{port}}, {{api_path}}
-->
<wsdl:types>
<!-- Schema definitions will be here in the actual WSDL -->
</wsdl:types>
<!-- Service definitions will be here in the actual WSDL -->
</wsdl:definitions>

View File

@@ -0,0 +1,255 @@
# 梅特勒天平 ROS2 使用指南 / Mettler Toledo Balance ROS2 User Guide
## 概述 / Overview
梅特勒托利多XPR/XSR天平驱动支持通过ROS2动作进行操作包括去皮、清零、读取重量等功能。
The Mettler Toledo XPR/XSR balance driver supports operations through ROS2 actions, including tare, zero, weight reading, and other functions.
## 主要功能 / Main Features
### 1. 去皮操作 / Tare Operation (`tare`)
- **功能 / Function**: 执行天平去皮操作 / Perform balance tare operation
- **输入 / Input**: `{"immediate": bool}` - 是否立即去皮 / Whether to tare immediately
- **输出 / Output**: `{"return_info": str, "success": bool}`
### 2. 清零操作 / Zero Operation (`zero`)
- **功能 / Function**: 执行天平清零操作 / Perform balance zero operation
- **输入 / Input**: `{"immediate": bool}` - 是否立即清零 / Whether to zero immediately
- **输出 / Output**: `{"return_info": str, "success": bool}`
### 3. 读取重量 / Read Weight (`read` / `get_weight`)
- **功能 / Function**: 读取当前天平重量 / Read current balance weight
- **输入 / Input**: 无参数 / No parameters
- **输出 / Output**: `{"return_info": str, "success": bool}` - 包含重量信息 / Contains weight information
## 使用方法 / Usage Methods
### ROS2命令行使用 / ROS2 Command Line Usage
### 1. 去皮操作 / Tare Operation
```bash
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"tare\", \"params\": {\"immediate\": false}}'
}"
```
### 2. 清零操作 / Zero Operation
```bash
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"zero\", \"params\": {\"immediate\": false}}'
}"
```
### 3. 读取重量 / Read Weight
```bash
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"read\"}'
}"
```
### 4. 推荐的去皮读取流程 / Recommended Tare and Read Workflow
**步骤1: 去皮操作 / Step 1: Tare Operation**
```bash
# 放置空容器后执行去皮 / Execute tare after placing empty container
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"tare\", \"params\": {\"immediate\": false}}'
}"
```
**步骤2: 读取净重 / Step 2: Read Net Weight**
```bash
# 添加物质后读取净重 / Read net weight after adding substance
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"read\"}'
}"
```
**优势 / Advantages**:
- 可以在去皮和读取之间进行确认 / Can confirm between taring and reading
- 更好的错误处理和调试 / Better error handling and debugging
- 操作流程更加清晰 / Clearer operation workflow
## 命令格式说明 / Command Format Description
所有命令都使用JSON格式包含以下字段 / All commands use JSON format with the following fields
```json
{
"command": "命令名称 / Command name",
"params": {
"参数名 / Parameter name": "参数值 / Parameter value"
}
}
```
**注意事项 / Notes**
1. JSON字符串需要正确转义引号 / JSON strings need proper quote escaping
2. 布尔值使用小写true/false/ Boolean values use lowercase (true/false)
3. 如果命令不需要参数,可以省略`params`字段 / If command doesn't need parameters, `params` field can be omitted
## 返回结果 / Return Results
所有命令都会返回包含以下字段的结果 / All commands return results with the following fields
- `success`: 布尔值,表示操作是否成功 / Boolean value indicating operation success
- `return_info`: 字符串,包含操作结果的详细信息 / String containing detailed operation result information
## 成功执行示例 / Successful Execution Example
以下是一个成功执行读取重量命令的示例 / Here is an example of successfully executing a weight reading command
```bash
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"read\"}'
}"
```
**成功返回结果 / Successful Return Result**
```
Waiting for an action server to become available...
Sending goal:
command: '{"command": "read"}'
Goal accepted :)
Result:
success: True
return_info: Weight: 0.24866 Milligram
Goal finished with status: SUCCEEDED
```
### Python代码使用 / Python Code Usage
```python
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from unilabos_msgs.action import SendCmd
import json
class BalanceController(Node):
"""梅特勒天平控制器 / Mettler Balance Controller"""
def __init__(self):
super().__init__('balance_controller')
self._action_client = ActionClient(self, SendCmd, '/devices/BALANCE_STATION/send_cmd')
def send_command(self, command, params=None):
"""发送命令到天平 / Send command to balance"""
goal_msg = SendCmd.Goal()
cmd_data = {'command': command}
if params:
cmd_data['params'] = params
goal_msg.command = json.dumps(cmd_data)
self._action_client.wait_for_server()
future = self._action_client.send_goal_async(goal_msg)
return future
def tare_balance(self, immediate=False):
"""去皮操作 / Tare operation"""
return self.send_command('tare', {'immediate': immediate})
def zero_balance(self, immediate=False):
"""清零操作 / Zero operation"""
return self.send_command('zero', {'immediate': immediate})
def read_weight(self):
"""读取重量 / Read weight"""
return self.send_command('read')
# 使用示例 / Usage Example
def main():
rclpy.init()
controller = BalanceController()
# 去皮操作 / Tare operation
future = controller.tare_balance(immediate=False)
rclpy.spin_until_future_complete(controller, future)
result = future.result().result
print(f"去皮结果 / Tare result: {result.success}, 信息 / Info: {result.return_info}")
# 读取重量 / Read weight
future = controller.read_weight()
rclpy.spin_until_future_complete(controller, future)
result = future.result().result
print(f"读取结果 / Read result: {result.success}, 信息 / Info: {result.return_info}")
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
## 使用注意事项 / Usage Notes
1. **设备连接 / Device Connection**: 确保梅特勒天平设备已连接并可访问 / Ensure Mettler balance device is connected and accessible
2. **命令格式 / Command Format**: JSON字符串需要正确转义引号 / JSON strings need proper quote escaping
3. **参数类型 / Parameter Types**: 布尔值使用小写true/false/ Boolean values use lowercase (true/false)
4. **权限 / Permissions**: 确保有操作天平的权限 / Ensure you have permission to operate the balance
## 故障排除 / Troubleshooting
### 常见问题 / Common Issues
1. **JSON格式错误 / JSON Format Error**: 确保JSON字符串格式正确且引号已转义 / Ensure JSON string format is correct and quotes are escaped
2. **未知命令名称 / Unknown Command Name**: 检查命令名称是否正确 / Check if command name is correct
3. **设备连接失败 / Device Connection Failed**: 检查网络连接和设备状态 / Check network connection and device status
4. **操作超时 / Operation Timeout**: 检查设备是否响应正常 / Check if device is responding normally
### 错误处理 / Error Handling
如果命令执行失败,返回结果中的`success`字段将为`false``return_info`字段将包含错误信息。
If command execution fails, the `success` field in the return result will be `false`, and the `return_info` field will contain error information.
### 调试技巧 / Debugging Tips
1. 检查设备节点是否正在运行 / Check if device node is running
```bash
ros2 node list | grep BALANCE
```
2. 查看可用的action / View available actions
```bash
ros2 action list | grep BALANCE
```
3. 检查action接口 / Check action interface
```bash
ros2 action info /devices/BALANCE_STATION/send_cmd
```
4. 查看节点日志 / View node logs
```bash
ros2 topic echo /rosout
```
## 总结 / Summary
梅特勒托利多天平设备现在支持 / Mettler Toledo balance device now supports:
1. 通过ROS2 SendCmd动作进行统一操作 / Unified operations through ROS2 SendCmd actions
2. 完整的天平功能支持(去皮、清零、读重等)/ Complete balance function support (tare, zero, weight reading, etc.)
3. 完善的错误处理和日志记录 / Comprehensive error handling and logging
4. 简化的操作流程和调试方法 / Simplified operation workflow and debugging methods

View File

@@ -0,0 +1,123 @@
# Mettler Toledo XPR/XSR Balance Driver
## 概述
本驱动程序为梅特勒托利多XPR/XSR系列天平提供标准接口支持去皮、清零和重量读取等操作。
## ⚠️ 重要说明 - WSDL文件配置
### 问题说明
本驱动程序需要使用梅特勒托利多官方提供的WSDL文件来与天平通信。由于该WSDL文件包含专有信息不能随开源项目一起分发。
### 配置步骤
1. **获取WSDL文件**
- 联系梅特勒托利多技术支持
- 或从您的天平设备Web界面下载
- 或从梅特勒托利多官方SDK获取
2. **安装WSDL文件**
```bash
# 将获取的WSDL文件复制到驱动目录
cp /path/to/your/MT.Laboratory.Balance.XprXsr.V03.wsdl \
unilabos/devices/balance/mettler_toledo_xpr/
```
3. **验证安装**
- 确保文件名为:`MT.Laboratory.Balance.XprXsr.V03.wsdl`
- 确保文件包含Jinja2模板变量`{{host}}`、`{{port}}`、`{{api_path}}`
### WSDL文件要求
- 文件必须是有效的WSDL格式
- 必须包含SessionService和WeighingService的定义
- 端点地址应使用模板变量以支持动态IP配置
```xml
<soap:address location="http://{{host}}:{{port}}/{{api_path}}/SessionService" />
<soap:address location="http://{{host}}:{{port}}/{{api_path}}/WeighingService" />
```
### 文件结构
```
mettler_toledo_xpr/
├── MT.Laboratory.Balance.XprXsr.V03.wsdl # 实际WSDL文件用户提供
├── MT.Laboratory.Balance.XprXsr.V03.wsdl.template # 模板文件(仅供参考)
├── mettler_toledo_xpr.py # 驱动程序
├── balance.yaml # 设备配置
├── SendCmd_Usage_Guide.md # 使用指南
└── README.md # 本文件
```
## 使用方法
### 基本配置
```python
from unilabos.devices.balance.mettler_toledo_xpr import MettlerToledoXPR
# 创建天平实例
balance = MettlerToledoXPR(
ip="192.168.1.10", # 天平IP地址
port=81, # 天平端口
password="123456", # 天平密码
timeout=10 # 连接超时时间
)
# 执行操作
balance.tare() # 去皮
balance.zero() # 清零
weight = balance.get_weight() # 读取重量
```
### ROS2 SendCmd Action
详细的ROS2使用方法请参考 [SendCmd_Usage_Guide.md](SendCmd_Usage_Guide.md)
## 故障排除
### 常见错误
1. **FileNotFoundError: WSDL template not found**
- 确保WSDL文件已正确放置在驱动目录中
- 检查文件名是否正确
2. **连接失败**
- 检查天平IP地址和端口配置
- 确保天平Web服务已启用
- 验证网络连接
3. **认证失败**
- 检查天平密码是否正确
- 确保天平允许Web服务访问
### 调试模式
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# 创建天平实例,将显示详细日志
balance = MettlerToledoXPR(ip="192.168.1.10")
```
## 支持的操作
- **去皮 (Tare)**: 将当前重量设为零点
- **清零 (Zero)**: 重新校准零点
- **读取重量 (Get Weight)**: 获取当前重量值
- **带去皮读取**: 先去皮再读取重量
- **连接管理**: 自动连接和断开
## 技术支持
如果您在配置WSDL文件时遇到问题
1. 查看梅特勒托利多官方文档
2. 联系梅特勒托利多技术支持
3. 在项目GitHub页面提交Issue
## 许可证
本驱动程序遵循项目主许可证。WSDL文件的使用需遵循梅特勒托利多的许可条款。

View File

@@ -0,0 +1,5 @@
# Mettler Toledo XPR Balance Driver Module
from .mettler_toledo_xpr import MettlerToledoXPR
__all__ = ['MettlerToledoXPR']

View File

@@ -0,0 +1,256 @@
balance.mettler_toledo_xpr:
category:
- balance
class:
action_value_mappings:
disconnect:
feedback: {}
goal: {}
goal_default: {}
handles: []
result:
success: success
schema:
description: Disconnect from balance
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result:
properties:
success:
description: Whether disconnect was successful
type: boolean
required:
- success
type: object
required:
- goal
type: object
type: UniLabJsonCommand
get_weight:
feedback: {}
goal: {}
goal_default: {}
handles: []
result:
unit: unit
weight: weight
schema:
description: Get current weight reading
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result:
properties:
unit:
description: Weight unit (e.g., g, kg)
type: string
weight:
description: Weight value
type: number
required:
- weight
- unit
type: object
required:
- goal
type: object
type: UniLabJsonCommand
read_with_tare:
feedback: {}
goal:
immediate_tare: immediate_tare
goal_default:
immediate_tare: true
handles: []
result:
unit: unit
weight: weight
schema:
description: Perform tare then read weight (standard read operation)
properties:
feedback: {}
goal:
properties:
immediate_tare:
default: true
description: Whether to use immediate tare
type: boolean
required: []
type: object
result:
properties:
unit:
description: Weight unit (e.g., g, kg)
type: string
weight:
description: Weight value after tare
type: number
required:
- weight
- unit
type: object
required:
- goal
type: object
type: UniLabJsonCommand
send_cmd:
feedback: {}
goal:
command: command
goal_default:
command: ''
handles: []
result:
return_info: return_info
success: success
schema:
description: ''
properties:
feedback:
properties:
status:
type: string
required:
- status
title: SendCmd_Feedback
type: object
goal:
properties:
command:
type: string
required:
- command
title: SendCmd_Goal
type: object
result:
properties:
return_info:
type: string
success:
type: boolean
required:
- return_info
- success
title: SendCmd_Result
type: object
required:
- goal
title: SendCmd
type: object
type: SendCmd
tare:
feedback: {}
goal:
immediate: immediate
goal_default:
immediate: false
handles: []
result:
success: success
schema:
description: Tare operation for balance
properties:
feedback: {}
goal:
properties:
immediate:
default: false
description: Whether to perform immediate tare
type: boolean
required: []
type: object
result:
properties:
success:
description: Whether tare operation was successful
type: boolean
required:
- success
type: object
required:
- goal
type: object
type: UniLabJsonCommand
zero:
feedback: {}
goal:
immediate: immediate
goal_default:
immediate: false
handles: []
result:
success: success
schema:
description: Zero operation for balance
properties:
feedback: {}
goal:
properties:
immediate:
default: false
description: Whether to perform immediate zero
type: boolean
required: []
type: object
result:
properties:
success:
description: Whether zero operation was successful
type: boolean
required:
- success
type: object
required:
- goal
type: object
type: UniLabJsonCommand
module: unilabos.devices.balance.mettler_toledo_xpr.mettler_toledo_xpr:MettlerToledoXPR
status_types:
error_message: str
is_stable: bool
status: str
unit: str
weight: float
type: python
config_info: []
description: Mettler Toledo XPR/XSR Balance Driver
handles: []
icon: ''
init_param_schema:
description: MettlerToledoXPR __init__ parameters
properties:
feedback: {}
goal:
description: Initialization parameters for Mettler Toledo XPR balance
properties:
ip:
default: 192.168.1.10
description: Balance IP address
type: string
password:
default: '123456'
description: Balance password
type: string
port:
default: 81
description: Balance port number
type: integer
timeout:
default: 10
description: Connection timeout in seconds
type: integer
required: []
type: object
result: {}
required:
- goal
title: __init__ command parameters
type: object
version: 1.0.0

View File

@@ -0,0 +1,25 @@
{
"nodes": [
{
"id": "BALANCE_STATION",
"name": "METTLER_TOLEDO_XPR",
"parent": null,
"type": "device",
"class": "balance.mettler_toledo_xpr",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"ip": "192.168.1.10",
"port": 81,
"password": "123456",
"timeout": 10
},
"data": {},
"children": []
}
],
"links": []
}

View File

@@ -0,0 +1,571 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Mettler Toledo XPR/XSR Balance Driver for Uni-Lab OS
This driver provides standard interface for Mettler Toledo XPR/XSR balance operations
including tare, zero, and weight reading functions.
"""
import enum
import base64
import hashlib
import logging
import time
from pathlib import Path
from decimal import Decimal
from typing import Tuple, Optional
from jinja2 import Template
from requests import Session
from zeep import Client
from zeep.transports import Transport
import pprp
# Import UniversalDriver - handle import error gracefully
try:
from unilabos.device_comms.universal_driver import UniversalDriver
except ImportError:
# Fallback for standalone testing
class UniversalDriver:
"""Fallback UniversalDriver for standalone testing"""
def __init__(self):
self.success = False
class Outcome(enum.Enum):
"""Balance operation outcome enumeration"""
SUCCESS = "Success"
ERROR = "Error"
class MettlerToledoXPR(UniversalDriver):
"""Mettler Toledo XPR/XSR Balance Driver
Provides standard interface for balance operations including:
- Tare (去皮)
- Zero (清零)
- Weight reading (读数)
"""
def __init__(self, ip: str = "192.168.1.10", port: int = 81,
password: str = "123456", timeout: int = 10):
"""Initialize the balance driver
Args:
ip: Balance IP address
port: Balance port number
password: Balance password
timeout: Connection timeout in seconds
"""
super().__init__()
self.ip = ip
self.port = port
self.password = password
self.timeout = timeout
self.api_path = "MT/Laboratory/Balance/XprXsr/V03"
# Status properties
self._status = "Disconnected"
self._last_weight = 0.0
self._last_unit = "g"
self._is_stable = False
self._error_message = ""
# ROS2 action result properties
self.success = False
self.return_info = ""
# Service objects
self.client = None
self.session_svc = None
self.weighing_svc = None
self.session_id = None
# WSDL template path
self.wsdl_template = Path(__file__).parent / "MT.Laboratory.Balance.XprXsr.V03.wsdl"
# Bindings
self.bindings = {
"session": "{http://MT/Laboratory/Balance/XprXsr/V03}BasicHttpBinding_ISessionService",
"weigh": "{http://MT/Laboratory/Balance/XprXsr/V03}BasicHttpBinding_IWeighingService",
}
# Setup logging
self.logger = logging.getLogger(f"MettlerToledoXPR-{ip}")
# Initialize connection
self._connect()
@property
def status(self) -> str:
"""Current device status"""
return self._status
@property
def weight(self) -> float:
"""Last measured weight value"""
return self._last_weight
@property
def unit(self) -> str:
"""Weight unit (e.g., 'g', 'kg')"""
return self._last_unit
@property
def is_stable(self) -> bool:
"""Whether the weight reading is stable"""
return self._is_stable
@property
def error_message(self) -> str:
"""Last error message"""
return self._error_message
def _decrypt_session_id(self, pw: str, enc_sid: str, salt: str) -> str:
"""Decrypt session ID using password and salt"""
key = hashlib.pbkdf2_hmac("sha1", pw.encode(),
base64.b64decode(salt), 1000, dklen=32)
plain = pprp.decrypt_sink(
pprp.rijndael_decrypt_gen(
key, pprp.data_source_gen(base64.b64decode(enc_sid))))
return plain.decode()
def _render_wsdl(self) -> Path:
"""Render WSDL template with current connection parameters"""
if not self.wsdl_template.exists():
raise FileNotFoundError(f"WSDL template not found: {self.wsdl_template}")
text = Template(self.wsdl_template.read_text(encoding="utf-8")).render(
host=self.ip, port=self.port, api_path=self.api_path)
wsdl_path = self.wsdl_template.parent / f"rendered_{self.ip}_{self.port}.wsdl"
wsdl_path.write_text(text, encoding="utf-8")
return wsdl_path
def _connect(self):
"""Establish connection to the balance"""
try:
self._status = "Connecting"
# Render WSDL
wsdl_path = self._render_wsdl()
self.logger.info(f"WSDL rendered to {wsdl_path}")
# Create SOAP client
transport = Transport(session=Session(), timeout=self.timeout)
self.client = Client(wsdl=str(wsdl_path), transport=transport)
# Create service proxies
base_url = f"http://{self.ip}:{self.port}/{self.api_path}"
self.session_svc = self.client.create_service(
self.bindings["session"], f"{base_url}/SessionService")
self.weighing_svc = self.client.create_service(
self.bindings["weigh"], f"{base_url}/WeighingService")
self.logger.info("Zeep service proxies created")
# Open session
self.logger.info("Opening session...")
reply = self.session_svc.OpenSession()
if reply.Outcome != Outcome.SUCCESS.value:
raise RuntimeError(f"OpenSession failed: {getattr(reply, 'ErrorMessage', '')}")
self.session_id = self._decrypt_session_id(
self.password, reply.SessionId, reply.Salt)
self.logger.info(f"Session established successfully, SessionId={self.session_id}")
self._status = "Connected"
self._error_message = ""
except Exception as e:
self._status = "Error"
self._error_message = str(e)
self.logger.error(f"Connection failed: {e}")
raise
def _ensure_connected(self):
"""Ensure the device is connected"""
if self._status != "Connected" or self.session_id is None:
self._connect()
def tare(self, immediate: bool = False) -> bool:
"""Perform tare operation (去皮)
Args:
immediate: Whether to perform immediate tare
Returns:
bool: True if successful, False otherwise
"""
try:
self._ensure_connected()
self._status = "Taring"
self.logger.info(f"Performing tare (immediate={immediate})...")
reply = self.weighing_svc.Tare(self.session_id, immediate)
if reply.Outcome != Outcome.SUCCESS.value:
error_msg = getattr(reply, 'ErrorMessage', 'Unknown error')
self.logger.error(f"Tare failed: {error_msg}")
self._error_message = f"Tare failed: {error_msg}"
self._status = "Error"
return False
self.logger.info("Tare completed successfully")
self._status = "Connected"
self._error_message = ""
return True
except Exception as e:
self.logger.error(f"Tare operation failed: {e}")
self._error_message = str(e)
self._status = "Error"
return False
def zero(self, immediate: bool = False) -> bool:
"""Perform zero operation (清零)
Args:
immediate: Whether to perform immediate zero
Returns:
bool: True if successful, False otherwise
"""
try:
self._ensure_connected()
self._status = "Zeroing"
self.logger.info(f"Performing zero (immediate={immediate})...")
reply = self.weighing_svc.Zero(self.session_id, immediate)
if reply.Outcome != Outcome.SUCCESS.value:
error_msg = getattr(reply, 'ErrorMessage', 'Unknown error')
self.logger.error(f"Zero failed: {error_msg}")
self._error_message = f"Zero failed: {error_msg}"
self._status = "Error"
return False
self.logger.info("Zero completed successfully")
self._status = "Connected"
self._error_message = ""
return True
except Exception as e:
self.logger.error(f"Zero operation failed: {e}")
self._error_message = str(e)
self._status = "Error"
return False
def get_weight(self) -> float:
"""Get current weight reading (读数)
Returns:
float: Weight value
"""
try:
self._ensure_connected()
self._status = "Reading"
self.logger.info("Getting weight...")
reply = self.weighing_svc.GetWeight(self.session_id)
if reply.Outcome != Outcome.SUCCESS.value:
error_msg = getattr(reply, 'ErrorMessage', 'Unknown error')
self.logger.error(f"GetWeight failed: {error_msg}")
self._error_message = f"GetWeight failed: {error_msg}"
self._status = "Error"
return 0.0
# Handle different response structures
if hasattr(reply, 'WeightSample'):
# Handle WeightSample structure (most common for XPR)
weight_sample = reply.WeightSample
if hasattr(weight_sample, 'NetWeight'):
weight_val = float(Decimal(weight_sample.NetWeight.Value))
weight_unit = weight_sample.NetWeight.Unit
elif hasattr(weight_sample, 'GrossWeight'):
weight_val = float(Decimal(weight_sample.GrossWeight.Value))
weight_unit = weight_sample.GrossWeight.Unit
else:
weight_val = 0.0
weight_unit = 'g'
is_stable = getattr(weight_sample, 'Stable', True)
elif hasattr(reply, 'Weight'):
weight_val = float(Decimal(reply.Weight.Value))
weight_unit = reply.Weight.Unit
is_stable = getattr(reply.Weight, 'IsStable', True)
elif hasattr(reply, 'Value'):
weight_val = float(Decimal(reply.Value))
weight_unit = getattr(reply, 'Unit', 'g')
is_stable = getattr(reply, 'IsStable', True)
else:
# Try to extract from reply attributes
weight_val = float(Decimal(getattr(reply, 'WeightValue', getattr(reply, 'Value', '0'))))
weight_unit = getattr(reply, 'WeightUnit', getattr(reply, 'Unit', 'g'))
is_stable = getattr(reply, 'IsStable', True)
# Convert to grams for consistent output (ROS2 requirement)
if weight_unit.lower() in ['milligram', 'mg']:
weight_val_grams = weight_val / 1000.0
elif weight_unit.lower() in ['kilogram', 'kg']:
weight_val_grams = weight_val * 1000.0
elif weight_unit.lower() in ['gram', 'g']:
weight_val_grams = weight_val
else:
# Default to assuming grams if unit is unknown
weight_val_grams = weight_val
self.logger.warning(f"Unknown weight unit: {weight_unit}, assuming grams")
# Update internal state (keep original values for reference)
self._last_weight = weight_val
self._last_unit = weight_unit
self._is_stable = is_stable
self.logger.info(f"Weight: {weight_val_grams} g (original: {weight_val} {weight_unit})")
self._status = "Connected"
self._error_message = ""
return weight_val_grams
except Exception as e:
self.logger.error(f"Get weight failed: {e}")
self._error_message = str(e)
self._status = "Error"
return 0.0
def get_weight_with_unit(self) -> Tuple[float, str]:
"""Get current weight reading with unit (读数含单位)
Returns:
Tuple[float, str]: Weight value and unit
"""
try:
self._ensure_connected()
self._status = "Reading"
self.logger.info("Getting weight with unit...")
reply = self.weighing_svc.GetWeight(self.session_id)
if reply.Outcome != Outcome.SUCCESS.value:
error_msg = getattr(reply, 'ErrorMessage', 'Unknown error')
self.logger.error(f"GetWeight failed: {error_msg}")
self._error_message = f"GetWeight failed: {error_msg}"
self._status = "Error"
return 0.0, ""
# Handle different response structures
if hasattr(reply, 'WeightSample'):
# Handle WeightSample structure (most common for XPR)
weight_sample = reply.WeightSample
if hasattr(weight_sample, 'NetWeight'):
weight_val = float(Decimal(weight_sample.NetWeight.Value))
weight_unit = weight_sample.NetWeight.Unit
elif hasattr(weight_sample, 'GrossWeight'):
weight_val = float(Decimal(weight_sample.GrossWeight.Value))
weight_unit = weight_sample.GrossWeight.Unit
else:
weight_val = 0.0
weight_unit = 'g'
is_stable = getattr(weight_sample, 'Stable', True)
elif hasattr(reply, 'Weight'):
weight_val = float(Decimal(reply.Weight.Value))
weight_unit = reply.Weight.Unit
is_stable = getattr(reply.Weight, 'IsStable', True)
elif hasattr(reply, 'Value'):
weight_val = float(Decimal(reply.Value))
weight_unit = getattr(reply, 'Unit', 'g')
is_stable = getattr(reply, 'IsStable', True)
else:
# Try to extract from reply attributes
weight_val = float(Decimal(getattr(reply, 'WeightValue', getattr(reply, 'Value', '0'))))
weight_unit = getattr(reply, 'WeightUnit', getattr(reply, 'Unit', 'g'))
is_stable = getattr(reply, 'IsStable', True)
# Update internal state
self._last_weight = weight_val
self._last_unit = weight_unit
self._is_stable = is_stable
self.logger.info(f"Weight: {weight_val} {weight_unit}")
self._status = "Connected"
self._error_message = ""
return weight_val, weight_unit
except Exception as e:
self.logger.error(f"Get weight with unit failed: {e}")
self._error_message = str(e)
self._status = "Error"
return 0.0, ""
def send_cmd(self, command: str) -> dict:
"""ROS2 SendCmd action handler
Args:
command: JSON string containing command and parameters
Returns:
dict: Result containing success status and return_info
"""
return self.execute_command_from_outer(command)
def execute_command_from_outer(self, command: str) -> dict:
"""Execute command from ROS2 SendCmd action
Args:
command: JSON string containing command and parameters
Returns:
dict: Result containing success status and return_info
"""
try:
import json
# Parse JSON command
cmd_data = json.loads(command.replace("'", '"').replace("False", "false").replace("True", "true"))
# Extract command name and parameters
cmd_name = cmd_data.get('command', '')
params = cmd_data.get('params', {})
self.logger.info(f"Executing command: {cmd_name} with params: {params}")
# Execute different commands
if cmd_name == 'tare':
immediate = params.get('immediate', False)
success = self.tare(immediate)
result = {
'success': success,
'return_info': f"Tare operation {'successful' if success else 'failed'}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
elif cmd_name == 'zero':
immediate = params.get('immediate', False)
success = self.zero(immediate)
result = {
'success': success,
'return_info': f"Zero operation {'successful' if success else 'failed'}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
elif cmd_name == 'read' or cmd_name == 'get_weight':
try:
self.logger.info(f"Executing {cmd_name} command via ROS2...")
self.logger.info(f"Current status: {self._status}")
# Use get_weight to get weight value (returns float in grams)
weight_grams = self.get_weight()
self.logger.info(f"get_weight() returned: {weight_grams} g")
# Get the original weight and unit for display
original_weight = getattr(self, '_last_weight', weight_grams)
original_unit = getattr(self, '_last_unit', 'g')
self.logger.info(f"Original reading: {original_weight} {original_unit}")
result = {
'success': True,
'return_info': f"Weight: {original_weight} {original_unit}"
}
except Exception as e:
self.logger.error(f"Exception in {cmd_name}: {str(e)}")
self.logger.error(f"Exception type: {type(e).__name__}")
import traceback
self.logger.error(f"Traceback: {traceback.format_exc()}")
result = {
'success': False,
'return_info': f"Failed to read weight: {str(e)}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
else:
result = {
'success': False,
'return_info': f"Unknown command: {cmd_name}. Available commands: tare, zero, read"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
except json.JSONDecodeError as e:
self.logger.error(f"JSON parsing failed: {e}")
result = {
'success': False,
'return_info': f"JSON parsing failed: {str(e)}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
except Exception as e:
self.logger.error(f"Command execution failed: {e}")
result = {
'success': False,
'return_info': f"Command execution failed: {str(e)}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
def __del__(self):
"""Cleanup when object is destroyed"""
self.disconnect()
if __name__ == "__main__":
# Test the driver
import argparse
parser = argparse.ArgumentParser(description="Mettler Toledo XPR Balance Driver Test")
parser.add_argument("--ip", default="192.168.1.10", help="Balance IP address")
parser.add_argument("--port", type=int, default=81, help="Balance port")
parser.add_argument("--password", default="123456", help="Balance password")
parser.add_argument("action", choices=["tare", "zero", "read"],
nargs="?", default="read", help="Action to perform")
parser.add_argument("--immediate", action="store_true", help="Use immediate mode")
args = parser.parse_args()
# Setup logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s")
# Create driver instance
balance = MettlerToledoXPR(ip=args.ip, port=args.port, password=args.password)
try:
if args.action == "tare":
success = balance.tare(args.immediate)
print(f"Tare {'successful' if success else 'failed'}")
elif args.action == "zero":
success = balance.zero(args.immediate)
print(f"Zero {'successful' if success else 'failed'}")
else: # read
# Perform tare first, then read weight
if balance.tare(args.immediate):
weight, unit = balance.get_weight_with_unit()
print(f"Weight: {weight} {unit}")
else:
print("Tare operation failed, cannot read weight")
finally:
balance.disconnect()