Merge pull request #2 from Andy6M/feature/xprbalance-zhida

balance
This commit is contained in:
Xie Qiming
2025-08-08 22:55:16 +08:00
committed by GitHub
2 changed files with 29 additions and 125 deletions

View File

@@ -26,17 +26,7 @@ The Mettler Toledo XPR/XSR balance driver supports operations through ROS2 actio
- **输入 / Input**: 无参数 / No parameters
- **输出 / Output**: `{"return_info": str, "success": bool}` - 包含重量信息 / Contains weight information
### 4. 带去皮读取重量 / Read with Tare (`read_with_tare`)
- **功能 / Function**: 先去皮再读取重量 / Tare first then read weight
- **输入 / Input**: 无参数 / No parameters
- **输出 / Output**: `{"return_info": str, "success": bool}` - 包含去皮后的重量信息 / Contains weight information after taring
### 5. 断开连接 / Disconnect (`disconnect`)
- **功能 / Function**: 断开与天平的连接 / Disconnect from the balance
- **输入 / Input**: 无参数 / No parameters
- **输出 / Output**: `{"return_info": str, "success": bool}` - 断开连接结果 / Disconnection result
## 使用方法 / Usage Methods
@@ -66,29 +56,31 @@ ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/Sen
}"
```
或者使用别名 / Or use alias:
### 4. 推荐的去皮读取流程 / Recommended Tare and Read Workflow
**步骤1: 去皮操作 / Step 1: Tare Operation**
```bash
# 放置空容器后执行去皮 / Execute tare after placing empty container
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"get_weight\"}'
command: '{\"command\": \"tare\", \"params\": {\"immediate\": false}}'
}"
```
### 4. 带去皮读取重量 / Read with Tare
**步骤2: 读取净重 / Step 2: Read Net Weight**
```bash
# 添加物质后读取净重 / Read net weight after adding substance
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"read_with_tare\"}'
command: '{\"command\": \"read\"}'
}"
```
### 5. 断开连接 / Disconnect
**优势 / Advantages**:
- 可以在去皮和读取之间进行确认 / Can confirm between taring and reading
- 更好的错误处理和调试 / Better error handling and debugging
- 操作流程更加清晰 / Clearer operation workflow
```bash
ros2 action send_goal /devices/BALANCE_STATION/send_cmd unilabos_msgs/action/SendCmd "{
command: '{\"command\": \"disconnect\"}'
}"
```
## 命令格式说明 / Command Format Description
@@ -182,13 +174,8 @@ class BalanceController(Node):
"""读取重量 / Read weight"""
return self.send_command('read')
def read_with_tare(self):
"""带去皮读取重量 / Read weight with tare"""
return self.send_command('read_with_tare')
def disconnect_balance(self):
"""断开连接 / Disconnect"""
return self.send_command('disconnect')
# 使用示例 / Usage Example
def main():

View File

@@ -135,28 +135,10 @@ class MettlerToledoXPR(UniversalDriver):
def _render_wsdl(self) -> Path:
"""Render WSDL template with current connection parameters"""
if not self.wsdl_template.exists():
error_msg = (
f"WSDL file not found: {self.wsdl_template}\n\n"
"IMPORTANT: You need to obtain the official WSDL file from Mettler Toledo.\n"
"Please follow these steps:\n"
"1. Contact Mettler Toledo support to get the WSDL file\n"
"2. Place it in the driver directory as 'MT.Laboratory.Balance.XprXsr.V03.wsdl'\n"
"3. Ensure it contains Jinja2 template variables: {{host}}, {{port}}, {{api_path}}\n\n"
"For detailed instructions, see the README.md file in the driver directory."
)
raise FileNotFoundError(error_msg)
raise FileNotFoundError(f"WSDL template not found: {self.wsdl_template}")
try:
text = Template(self.wsdl_template.read_text(encoding="utf-8")).render(
host=self.ip, port=self.port, api_path=self.api_path)
except Exception as e:
error_msg = (
f"Failed to render WSDL template: {e}\n\n"
"This usually means the WSDL file doesn't contain the required template variables.\n"
"Please ensure your WSDL file contains: {{host}}, {{port}}, {{api_path}}\n"
"See README.md for detailed configuration instructions."
)
raise RuntimeError(error_msg) from e
text = Template(self.wsdl_template.read_text(encoding="utf-8")).render(
host=self.ip, port=self.port, api_path=self.api_path)
wsdl_path = self.wsdl_template.parent / f"rendered_{self.ip}_{self.port}.wsdl"
wsdl_path.write_text(text, encoding="utf-8")
@@ -419,52 +401,9 @@ class MettlerToledoXPR(UniversalDriver):
self._status = "Error"
return 0.0, ""
def read_with_tare(self, immediate_tare: bool = True) -> Tuple[float, str]:
"""Perform tare then read weight (standard read operation)
Args:
immediate_tare: Whether to use immediate tare
Returns:
Tuple[float, str]: Weight value and unit
"""
try:
# Try immediate tare first
if not self.tare(immediate_tare):
# If immediate tare fails and it's an LFT balance, try normal tare
if immediate_tare and "Tare immediate cannot be executed" in self._error_message:
self.logger.warning("LFT balance doesn't support immediate tare, using normal tare")
if not self.tare(False):
return 0.0, ""
else:
return 0.0, ""
# Small delay to ensure tare is complete
time.sleep(0.5)
# Get weight
return self.get_weight_with_unit()
except Exception as e:
self.logger.error(f"Read with tare failed: {e}")
self._error_message = str(e)
self._status = "Error"
return 0.0, ""
def disconnect(self):
"""Disconnect from the balance"""
try:
if self.session_svc and self.session_id:
self.session_svc.CloseSession(self.session_id)
self.logger.info("Session closed")
except Exception as e:
self.logger.warning(f"Error closing session: {e}")
finally:
self.session_id = None
self.session_svc = None
self.weighing_svc = None
self.client = None
self._status = "Disconnected"
def send_cmd(self, command: str) -> dict:
"""ROS2 SendCmd action handler
@@ -554,38 +493,12 @@ class MettlerToledoXPR(UniversalDriver):
self.return_info = result['return_info']
return result
elif cmd_name == 'read_with_tare':
try:
weight, unit = self.read_with_tare()
result = {
'success': True,
'return_info': f"Weight with tare: {weight} {unit}"
}
except Exception as e:
result = {
'success': False,
'return_info': f"Failed to read weight with tare: {str(e)}"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
elif cmd_name == 'disconnect':
self.disconnect()
result = {
'success': True,
'return_info': "Disconnect successful"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
self.return_info = result['return_info']
return result
else:
result = {
'success': False,
'return_info': f"Unknown command: {cmd_name}. Available commands: tare, zero, read, read_with_tare, disconnect"
'return_info': f"Unknown command: {cmd_name}. Available commands: tare, zero, read"
}
# Update instance attributes for ROS2 action system
self.success = result['success']
@@ -647,8 +560,12 @@ if __name__ == "__main__":
success = balance.zero(args.immediate)
print(f"Zero {'successful' if success else 'failed'}")
else: # read
weight, unit = balance.read_with_tare()
print(f"Weight: {weight} {unit}")
# Perform tare first, then read weight
if balance.tare(args.immediate):
weight, unit = balance.get_weight_with_unit()
print(f"Weight: {weight} {unit}")
else:
print("Tare operation failed, cannot read weight")
finally:
balance.disconnect()