7 Commits

17 changed files with 120 additions and 2543 deletions

View File

@@ -90,48 +90,9 @@
}
},
"data": {}
},
{
"id": "NewareTester",
"name": "新威电池测试系统",
"parent": null,
"children": [],
"type": "device",
"class": "neware_battery_test_system",
"config": {
"ip": "127.0.0.1",
"port": 502,
"machine_id": 1,
"devtype": "27",
"timeout": 20,
"size_x": 500.0,
"size_y": 500.0,
"size_z": 2000.0
},
"position": {
"size": {
"height": 1600,
"width": 1200,
"depth": 800
},
"position": {
"x": 1500,
"y": 0,
"z": 0
}
},
"data": {
"功能说明": "新威电池测试系统提供720通道监控和CSV批量提交功能",
"监控功能": "支持720个通道的实时状态监控、2盘电池物料管理、状态导出等",
"提交功能": "通过submit_from_csv action从CSV文件批量提交测试任务"
}
}
],
"links": []
}

View File

@@ -0,0 +1,29 @@
{
"nodes": [
{
"id": "NEWARE_BATTERY_TEST_SYSTEM",
"name": "Neware Battery Test System",
"parent": null,
"type": "device",
"class": "neware_battery_test_system",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"ip": "127.0.0.1",
"port": 502,
"machine_id": 1,
"devtype": "27",
"timeout": 20,
"size_x": 500.0,
"size_y": 500.0,
"size_z": 2000.0
},
"data": {},
"children": []
}
],
"links": []
}

View File

@@ -13,8 +13,6 @@
- 状态类型: working/stop/finish/protect/pause/false/unknown
"""
import os
import sys
import socket
import xml.etree.ElementTree as ET
import json
@@ -23,6 +21,7 @@ from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
from pylabrobot.resources import ResourceHolder, Coordinate, create_ordered_items_2d, Deck, Plate
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
@@ -57,6 +56,13 @@ class BatteryTestPositionState(TypedDict):
status: str # 通道状态
color: str # 状态对应颜色
# 额外的inquire协议字段
relativetime: float # 相对时间 (s)
open_or_close: int # 0=关闭, 1=打开
step_type: str # 步骤类型
cycle_id: int # 循环ID
step_id: int # 步骤ID
log_code: str # 日志代码
class BatteryTestPosition(ResourceHolder):
@@ -136,9 +142,9 @@ class NewareBatteryTestSystem:
devtype: str = None,
timeout: int = None,
size_x: float = 50,
size_y: float = 50,
size_z: float = 20,
size_x: float = 500.0,
size_y: float = 500.0,
size_z: float = 2000.0,
):
"""
初始化新威电池测试系统
@@ -156,12 +162,6 @@ class NewareBatteryTestSystem:
self.machine_id = machine_id
self.devtype = devtype or self.DEVTYPE
self.timeout = timeout or self.TIMEOUT
# 存储设备物理尺寸
self.size_x = size_x
self.size_y = size_y
self.size_z = size_z
self._last_status_update = None
self._cached_status = {}
self._ros_node: Optional[ROS2WorkstationNode] = None # ROS节点引用由框架设置
@@ -192,9 +192,8 @@ class NewareBatteryTestSystem:
def _setup_material_management(self):
"""设置物料管理系统"""
# 第1盘5行8列网格 (A1-E8) - 5行对应subdevid 1-58列对应chlid 1-8
# 先给物料设置一个最大的Deck,并设置其在空间中的位置
deck_main = Deck("ADeckName", 2000, 1800, 100, origin=Coordinate(2000,2000,0))
# 先给物料设置一个最大的Deck
deck_main = Deck("ADeckName", 200, 200, 200)
plate1_resources: Dict[str, BatteryTestPosition] = create_ordered_items_2d(
BatteryTestPosition,
@@ -203,8 +202,8 @@ class NewareBatteryTestSystem:
dx=10,
dy=10,
dz=0,
item_dx=65,
item_dy=65
item_dx=45,
item_dy=45
)
plate1 = Plate("P1", 400, 300, 50, ordered_items=plate1_resources)
deck_main.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
@@ -233,15 +232,11 @@ class NewareBatteryTestSystem:
num_items_y=5, # 5行对应subdevid 6-10即A-E
dx=10,
dy=10,
dz=0,
dz=100, # Z轴偏移100mm
item_dx=65,
item_dy=65
)
plate2 = Plate("P2", 400, 300, 50, ordered_items=plate2_resources)
deck_main.assign_child_resource(plate2, location=Coordinate(0, 350, 0))
# 为第2盘资源添加P2_前缀
self.station_resources_plate2 = {}
for name, resource in plate2_resources.items():
@@ -311,132 +306,55 @@ class NewareBatteryTestSystem:
def _update_plate_resources(self, subunits: Dict):
"""更新两盘电池资源的状态"""
# 第1盘subdevid 1-5 映射到 8列5行网格 (列0-7, 行0-4)
# 第1盘subdevid 1-5 映射到 P1_A1-P1_E8 (5行8列)
for subdev_id in range(1, 6): # subdevid 1-5
status_row = subunits.get(subdev_id, {})
for chl_id in range(1, 9): # chlid 1-8
try:
# 根据用户描述:第一个是(0,0),最后一个是(7,4)
# 说明是8列5行列从0开始行从0开始
col_idx = (chl_id - 1) # 0-7 (chlid 1-8 -> 列0-7)
row_idx = (subdev_id - 1) # 0-4 (subdevid 1-5 -> 行0-4)
# 尝试多种可能的资源命名格式
possible_names = [
f"P1_batterytestposition_{col_idx}_{row_idx}", # 用户提到的格式
f"P1_{self.LETTERS[row_idx]}{col_idx + 1}", # 原有的A1-E8格式
f"P1_{self.LETTERS[row_idx].lower()}{col_idx + 1}", # 小写字母格式
]
r = None
resource_name = None
for name in possible_names:
if name in self.station_resources:
r = self.station_resources[name]
resource_name = name
break
# 计算在5×8网格中的位置
row_idx = (subdev_id - 1) # 0-4 (对应A-E)
col_idx = (chl_id - 1) # 0-7 (对应1-8)
resource_name = f"P1_{self.LETTERS[row_idx]}{col_idx + 1}"
r = self.station_resources.get(resource_name)
if r:
status_channel = status_row.get(chl_id, {})
metrics = status_channel.get("metrics", {})
# 构建BatteryTestPosition状态数据移除capacity和energy
channel_state = {
# 基本测量数据
"voltage": metrics.get("voltage_V", 0.0),
"current": metrics.get("current_A", 0.0),
"time": metrics.get("totaltime_s", 0.0),
# 状态信息
"status": status_channel.get("state", "unknown"),
"color": status_channel.get("color", self.STATUS_COLOR["unknown"]),
# 通道名称标识
"Channel_Name": f"{self.machine_id}-{subdev_id}-{chl_id}",
"voltage": status_channel.get("voltage_V", 0.0),
"current": status_channel.get("current_A", 0.0),
"time": status_channel.get("totaltime_s", 0.0),
}
r.load_state(channel_state)
# 调试信息
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(
f"更新P1资源状态: {resource_name} <- subdev{subdev_id}/chl{chl_id} "
f"状态:{channel_state['status']}"
)
else:
# 如果找不到资源,记录调试信息
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(
f"P1未找到资源: subdev{subdev_id}/chl{chl_id} -> 尝试的名称: {possible_names}"
)
except (KeyError, IndexError) as e:
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(f"P1映射错误: subdev{subdev_id}/chl{chl_id} - {e}")
except (KeyError, IndexError):
continue
# 第2盘subdevid 6-10 映射到 8列5行网格 (列0-7, 行0-4)
# 第2盘subdevid 6-10 映射到 P2_A1-P2_E8 (5行8列)
for subdev_id in range(6, 11): # subdevid 6-10
status_row = subunits.get(subdev_id, {})
for chl_id in range(1, 9): # chlid 1-8
try:
col_idx = (chl_id - 1) # 0-7 (chlid 1-8 -> 列0-7)
row_idx = (subdev_id - 6) # 0-4 (subdevid 6-10 -> 行0-4)
# 尝试多种可能的资源命名格式
possible_names = [
f"P2_batterytestposition_{col_idx}_{row_idx}", # 用户提到的格式
f"P2_{self.LETTERS[row_idx]}{col_idx + 1}", # 原有的A1-E8格式
f"P2_{self.LETTERS[row_idx].lower()}{col_idx + 1}", # 小写字母格式
]
r = None
resource_name = None
for name in possible_names:
if name in self.station_resources:
r = self.station_resources[name]
resource_name = name
break
# 计算在5×8网格中的位置
row_idx = (subdev_id - 6) # 0-4 (subdevid 6->0, 7->1, ..., 10->4) (对应A-E)
col_idx = (chl_id - 1) # 0-7 (对应1-8)
resource_name = f"P2_{self.LETTERS[row_idx]}{col_idx + 1}"
r = self.station_resources.get(resource_name)
if r:
status_channel = status_row.get(chl_id, {})
metrics = status_channel.get("metrics", {})
# 构建BatteryTestPosition状态数据移除capacity和energy
channel_state = {
# 基本测量数据
"voltage": metrics.get("voltage_V", 0.0),
"current": metrics.get("current_A", 0.0),
"time": metrics.get("totaltime_s", 0.0),
# 状态信息
"status": status_channel.get("state", "unknown"),
"color": status_channel.get("color", self.STATUS_COLOR["unknown"]),
# 通道名称标识
"Channel_Name": f"{self.machine_id}-{subdev_id}-{chl_id}",
"voltage": status_channel.get("voltage_V", 0.0),
"current": status_channel.get("current_A", 0.0),
"time": status_channel.get("totaltime_s", 0.0),
}
r.load_state(channel_state)
# 调试信息
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(
f"更新P2资源状态: {resource_name} <- subdev{subdev_id}/chl{chl_id} "
f"状态:{channel_state['status']}"
)
else:
# 如果找不到资源,记录调试信息
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(
f"P2未找到资源: subdev{subdev_id}/chl{chl_id} -> 尝试的名称: {possible_names}"
)
except (KeyError, IndexError) as e:
if self._ros_node and hasattr(self._ros_node, 'lab_logger'):
self._ros_node.lab_logger().debug(f"P2映射错误: subdev{subdev_id}/chl{chl_id} - {e}")
except (KeyError, IndexError):
continue
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
"resources": list(self.station_resources.values())
})
@property
def connection_info(self) -> Dict[str, str]:
@@ -572,45 +490,6 @@ class NewareBatteryTestSystem:
def debug_resource_names(self) -> dict:
"""
调试方法显示所有资源的实际名称ROS2动作
Returns:
dict: ROS2动作结果格式包含所有资源名称信息
"""
try:
debug_info = {
"total_resources": len(self.station_resources),
"plate1_resources": len(self.station_resources_plate1),
"plate2_resources": len(self.station_resources_plate2),
"plate1_names": list(self.station_resources_plate1.keys())[:10], # 显示前10个
"plate2_names": list(self.station_resources_plate2.keys())[:10], # 显示前10个
"all_resource_names": list(self.station_resources.keys())[:20], # 显示前20个
}
# 检查是否有用户提到的命名格式
batterytestposition_names = [name for name in self.station_resources.keys()
if "batterytestposition" in name]
debug_info["batterytestposition_names"] = batterytestposition_names[:10]
success_msg = f"资源调试信息获取成功,共{debug_info['total_resources']}个资源"
if self._ros_node:
self._ros_node.lab_logger().info(success_msg)
self._ros_node.lab_logger().info(f"调试信息: {debug_info}")
return {
"return_info": success_msg,
"success": True,
"debug_data": debug_info
}
except Exception as e:
error_msg = f"获取资源调试信息失败: {str(e)}"
if self._ros_node:
self._ros_node.lab_logger().error(error_msg)
return {"return_info": error_msg, "success": False}
# ========================
# 辅助方法
# ========================
@@ -659,228 +538,6 @@ class NewareBatteryTestSystem:
except Exception as e:
print(f" 获取状态失败: {e}")
# ========================
# CSV批量提交功能新增
# ========================
def _ensure_local_import_path(self):
"""确保本地模块导入路径"""
base_dir = os.path.dirname(__file__)
if base_dir not in sys.path:
sys.path.insert(0, base_dir)
def _canon(self, bs: str) -> str:
"""规范化电池体系名称"""
return str(bs).strip().replace('-', '_').upper()
def _compute_values(self, row):
"""
计算活性物质质量和容量
Args:
row: DataFrame行数据
Returns:
tuple: (活性物质质量mg, 容量mAh)
"""
pw = float(row['Pole_Weight'])
cm = float(row['集流体质量'])
am = row['活性物质含量']
if isinstance(am, str) and am.endswith('%'):
amv = float(am.rstrip('%')) / 100.0
else:
amv = float(am)
act_mass = (pw - cm) * amv
sc = float(row['克容量mah/g'])
cap = act_mass * sc / 1000.0
return round(act_mass, 2), round(cap, 3)
def _get_xml_builder(self, gen_mod, key: str):
"""
获取对应电池体系的XML生成函数
Args:
gen_mod: generate_xml_content模块
key: 电池体系标识
Returns:
callable: XML生成函数
"""
fmap = {
'LB6': gen_mod.xml_LB6,
'GR_LI': gen_mod.xml_Gr_Li,
'LFP_LI': gen_mod.xml_LFP_Li,
'LFP_GR': gen_mod.xml_LFP_Gr,
'811_LI_002': gen_mod.xml_811_Li_002,
'811_LI_005': gen_mod.xml_811_Li_005,
'SIGR_LI_STEP': gen_mod.xml_SiGr_Li_Step,
'SIGR_LI': gen_mod.xml_SiGr_Li_Step,
'811_SIGR': gen_mod.xml_811_SiGr,
}
if key not in fmap:
raise ValueError(f"未定义电池体系映射: {key}")
return fmap[key]
def _save_xml(self, xml: str, path: str):
"""
保存XML文件
Args:
xml: XML内容
path: 文件路径
"""
with open(path, 'w', encoding='utf-8') as f:
f.write(xml)
def submit_from_csv(self, csv_path: str, output_dir: str = ".") -> dict:
"""
从CSV文件批量提交Neware测试任务设备动作
Args:
csv_path (str): 输入CSV文件路径
output_dir (str): 输出目录用于存储XML文件和备份默认当前目录
Returns:
dict: 执行结果 {"return_info": str, "success": bool, "submitted_count": int}
"""
try:
# 确保可以导入本地模块
self._ensure_local_import_path()
import pandas as pd
import generate_xml_content as gen_mod
from neware_driver import start_test
if self._ros_node:
self._ros_node.lab_logger().info(f"开始从CSV文件提交任务: {csv_path}")
# 读取CSV文件
if not os.path.exists(csv_path):
error_msg = f"CSV文件不存在: {csv_path}"
if self._ros_node:
self._ros_node.lab_logger().error(error_msg)
return {"return_info": error_msg, "success": False, "submitted_count": 0}
df = pd.read_csv(csv_path, encoding='gbk')
# 验证必需列
required = [
'Battery_Code', 'Pole_Weight', '集流体质量', '活性物质含量',
'克容量mah/g', '电池体系', '设备号', '排号', '通道号'
]
missing = [c for c in required if c not in df.columns]
if missing:
error_msg = f"CSV缺少必需列: {missing}"
if self._ros_node:
self._ros_node.lab_logger().error(error_msg)
return {"return_info": error_msg, "success": False, "submitted_count": 0}
# 创建输出目录
xml_dir = os.path.join(output_dir, 'xml_dir')
backup_dir = os.path.join(output_dir, 'backup_dir')
os.makedirs(xml_dir, exist_ok=True)
os.makedirs(backup_dir, exist_ok=True)
if self._ros_node:
self._ros_node.lab_logger().info(
f"输出目录: XML={xml_dir}, 备份={backup_dir}"
)
# 逐行处理CSV数据
submitted_count = 0
results = []
for idx, row in df.iterrows():
try:
coin_id = str(row['Battery_Code'])
# 计算活性物质质量和容量
act_mass, cap_mAh = self._compute_values(row)
if cap_mAh < 0:
error_msg = (
f"容量为负数: Battery_Code={coin_id}, "
f"活性物质质量mg={act_mass}, 容量mah={cap_mAh}"
)
if self._ros_node:
self._ros_node.lab_logger().warning(error_msg)
results.append(f"{idx+1} 失败: {error_msg}")
continue
# 获取电池体系对应的XML生成函数
key = self._canon(row['电池体系'])
builder = self._get_xml_builder(gen_mod, key)
# 生成XML内容
xml_content = builder(act_mass, cap_mAh)
# 获取设备信息
devid = int(row['设备号'])
subdevid = int(row['排号'])
chlid = int(row['通道号'])
# 保存XML文件
recipe_path = os.path.join(
xml_dir,
f"{coin_id}_{devid}_{subdevid}_{chlid}.xml"
)
self._save_xml(xml_content, recipe_path)
# 提交测试任务
resp = start_test(
ip=self.ip,
port=self.port,
devid=devid,
subdevid=subdevid,
chlid=chlid,
CoinID=coin_id,
recipe_path=recipe_path,
backup_dir=backup_dir
)
submitted_count += 1
results.append(f"{idx+1} {coin_id}: {resp}")
if self._ros_node:
self._ros_node.lab_logger().info(
f"已提交 {coin_id} (设备{devid}-{subdevid}-{chlid}): {resp}"
)
except Exception as e:
error_msg = f"{idx+1} 处理失败: {str(e)}"
results.append(error_msg)
if self._ros_node:
self._ros_node.lab_logger().error(error_msg)
# 汇总结果
success_msg = (
f"批量提交完成: 成功{submitted_count}个,共{len(df)}行。"
f"\n详细结果:\n" + "\n".join(results)
)
if self._ros_node:
self._ros_node.lab_logger().info(
f"批量提交完成: 成功{submitted_count}/{len(df)}"
)
return {
"return_info": success_msg,
"success": True,
"submitted_count": submitted_count,
"total_count": len(df),
"results": results
}
except Exception as e:
error_msg = f"批量提交失败: {str(e)}"
if self._ros_node:
self._ros_node.lab_logger().error(error_msg)
return {
"return_info": error_msg,
"success": False,
"submitted_count": 0
}
def get_device_summary(self) -> dict:
"""
获取设备级别的摘要统计设备动作

View File

@@ -1,8 +0,0 @@
from .neware_battery_test_system import NewareBatteryTestSystem
from .neware_driver import build_start_command, start_test
__all__ = [
"NewareBatteryTestSystem",
"build_start_command",
"start_test",
]

View File

@@ -1,3 +0,0 @@
Timestamp,Battery_Count,Assembly_Time,Open_Circuit_Voltage,Pole_Weight,Assembly_Pressure,Battery_Code,Electrolyte_Code,<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>,<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʺ<EFBFBD><EFBFBD><EFBFBD>,<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>mah/g,<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϵ,<EFBFBD><EFBFBD><EFBFBD>,<EFBFBD>ź<EFBFBD>,ͨ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
2025/10/29 17:32,7,5,0.11299999803304672,18.049999237060547,3593,Li000595,Si-Gr001,9.2,0.954,469,SiGr_Li,1,1,2
2025/10/30 17:49,2,5,0,13.109999895095825,4094,YS101224,NoRead88,5.2,0.92,190,SiGr_Li,2,1,1
1 Timestamp Battery_Count Assembly_Time Open_Circuit_Voltage Pole_Weight Assembly_Pressure Battery_Code Electrolyte_Code 集流体质量 活性物质含量 克容量mah/g 电池体系 设备号 排号 通道号
2 2025/10/29 17:32 7 5 0.11299999803304672 18.049999237060547 3593 Li000595 Si-Gr001 9.2 0.954 469 SiGr_Li 1 1 2
3 2025/10/30 17:49 2 5 0 13.109999895095825 4094 YS101224 NoRead88 5.2 0.92 190 SiGr_Li 2 1 1

View File

@@ -1,33 +0,0 @@
{
"nodes": [
{
"id": "NEWARE_BATTERY_TEST_SYSTEM",
"name": "Neware Battery Test System",
"parent": null,
"type": "device",
"class": "neware_battery_test_system",
"position": {
"x": 620.0,
"y": 200.0,
"z": 0
},
"config": {
"ip": "127.0.0.1",
"port": 502,
"machine_id": 1,
"devtype": "27",
"timeout": 20,
"size_x": 500.0,
"size_y": 500.0,
"size_z": 2000.0
},
"data": {
"功能说明": "新威电池测试系统提供720通道监控和CSV批量提交功能",
"监控功能": "支持720个通道的实时状态监控、2盘电池物料管理、状态导出等",
"提交功能": "通过submit_from_csv action从CSV文件批量提交测试任务。CSV必须包含: Battery_Code, Pole_Weight, 集流体质量, 活性物质含量, 克容量mah/g, 电池体系, 设备号, 排号, 通道号"
},
"children": []
}
],
"links": []
}

View File

@@ -1,49 +0,0 @@
import socket
END_MARKS = [b"\r\n#\r\n", b"</bts>"] # 读到任一标志即可判定完整响应
def build_start_command(devid, subdevid, chlid, CoinID,
ip_in_xml="127.0.0.1",
devtype:int=27,
recipe_path:str=f"D:\\HHM_test\\A001.xml",
backup_dir:str=f"D:\\HHM_test\\backup") -> str:
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<bts version="1.0">',
' <cmd>start</cmd>',
' <list count="1">',
f' <start ip="{ip_in_xml}" devtype="{devtype}" devid="{devid}" subdevid="{subdevid}" chlid="{chlid}" barcode="{CoinID}">{recipe_path}</start>',
f' <backup backupdir="{backup_dir}" remotedir="" filenametype="1" customfilename="" createdirbydate="0" filetype="0" backupontime="1" backupontimeinterval="1" backupfree="0" />',
' </list>',
'</bts>',
]
# TCP 模式:请求必须以 #\r\n 结束(协议要求)
return "\r\n".join(lines) + "\r\n#\r\n"
def recv_until_marks(sock: socket.socket, timeout=60):
sock.settimeout(timeout) # 上限给足,协议允许到 30s:contentReference[oaicite:2]{index=2}
buf = bytearray()
while True:
chunk = sock.recv(8192)
if not chunk:
break
buf += chunk
# 读到结束标志就停,避免等对端断开
for m in END_MARKS:
if m in buf:
return bytes(buf)
# 保险:读到完整 XML 结束标签也停
if b"</bts>" in buf:
return bytes(buf)
return bytes(buf)
def start_test(ip="127.0.0.1", port=502, devid=3, subdevid=2, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup"):
xml_cmd = build_start_command(devid=devid, subdevid=subdevid, chlid=chlid, CoinID=CoinID, recipe_path=recipe_path, backup_dir=backup_dir)
#print(xml_cmd)
with socket.create_connection((ip, port), timeout=60) as s:
s.sendall(xml_cmd.encode("utf-8"))
data = recv_until_marks(s, timeout=60)
return data.decode("utf-8", errors="replace")
if __name__ == "__main__":
resp = start_test(ip="127.0.0.1", port=502, devid=4, subdevid=10, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup")
print(resp)

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from cgi import print_arguments
from doctest import debug
from typing import Dict, Any, List, Optional, Tuple
from typing import Dict, Any, List, Optional
import requests
from pylabrobot.resources.resource import Resource as ResourcePLR
from pathlib import Path
@@ -19,22 +19,8 @@ from unilabos.devices.workstation.bioyond_studio.config import (
)
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
from unilabos.resources.bioyond.decks import BIOYOND_YB_Deck
from unilabos.resources.graphio import resource_bioyond_to_plr
from unilabos.utils.log import logger
from unilabos.registry.registry import lab_registry
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
class device(BIOYOND_YB_Deck):
@classmethod
def deserialize(cls, data, allow_marshal=False): # type: ignore[override]
patched = dict(data)
if patched.get("type") == "device":
patched["type"] = "Deck"
if patched.get("category") == "device":
patched["category"] = "deck"
return super().deserialize(patched, allow_marshal=allow_marshal)
def _iso_local_now_ms() -> str:
# 文档要求:到毫秒 + Z例如 2025-08-15T05:43:22.814Z
@@ -54,14 +40,12 @@ class BioyondCellWorkstation(BioyondWorkstation):
def __init__(self, config: dict = None, deck=None, protocol_type=None, **kwargs):
# 使用统一配置,支持自定义覆盖, 从 config.py 加载完整配置
self.bioyond_config = {
self.bioyond_config ={
**API_CONFIG,
"material_type_mappings": MATERIAL_TYPE_MAPPINGS,
"warehouse_mapping": WAREHOUSE_MAPPING,
"debug_mode": False,
}
if config:
self.bioyond_config.update(config)
"debug_mode": False
}
# "material_type_mappings": MATERIAL_TYPE_MAPPINGS
# "warehouse_mapping": WAREHOUSE_MAPPING
@@ -72,12 +56,6 @@ class BioyondCellWorkstation(BioyondWorkstation):
self.http_service_started = self.debug_mode
self._device_id = "bioyond_cell_workstation" # 默认值后续会从_ros_node获取
super().__init__(bioyond_config=config, deck=deck)
self.transfer_target_device_id = self.bioyond_config.get("transfer_target_device_id", "BatteryStation")
self.transfer_target_parent = self.bioyond_config.get("transfer_target_parent", "YB_YH_Deck")
self.transfer_timeout = float(self.bioyond_config.get("transfer_timeout", 180.0))
self.coin_cell_workflow_config = self.bioyond_config.get("coin_cell_workflow_config", {})
self.pending_transfer_materials: List[Dict[str, Any]] = []
self.pending_transfer_plr: List[ResourcePLR] = []
self.update_push_ip() #直接修改奔耀端的报送ip地址
logger.info("已更新奔耀端推送 IP 地址")
@@ -346,7 +324,6 @@ class BioyondCellWorkstation(BioyondWorkstation):
"posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]),
"materialName": str(row[5]).strip(),
"quantity": float(row[6]) if pd.notna(row[6]) else 0.0,
"temperature": 0,
})
# 四号手套箱原液瓶面
for _, row in df.iloc[14:23, 2:9].iterrows():
@@ -358,7 +335,6 @@ class BioyondCellWorkstation(BioyondWorkstation):
"quantity": float(row[6]) if pd.notna(row[6]) else 0.0,
"materialType": str(row[7]).strip() if pd.notna(row[7]) else "",
"targetWH": str(row[8]).strip() if pd.notna(row[8]) else "",
"temperature": 0,
})
# 三号手套箱人工堆栈
for _, row in df.iloc[25:40, 2:7].iterrows():
@@ -368,12 +344,11 @@ class BioyondCellWorkstation(BioyondWorkstation):
"posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]),
"materialType": str(row[5]).strip() if pd.notna(row[5]) else "",
"materialId": str(row[6]).strip() if pd.notna(row[6]) else "",
"quantity": 1,
"temperature": 0,
"quantity": 1
})
else:
logger.warning(f"未找到 Excel 文件 {xlsx_path},自动切换到手动参数模式。")
# TODO: 温度下面手动模式没改,上面的改了
# ---------- 模式 2: 手动填写 ----------
if not items:
params = locals()
@@ -416,14 +391,10 @@ class BioyondCellWorkstation(BioyondWorkstation):
order_code = response.get("data", {}).get("orderCode")
if not order_code:
logger.error("上料任务未返回有效 orderCode")
return {"api_response": response, "order_finish": None}
# 等待完成报送
return response
# 等待完成报送
result = self.wait_for_order_finish(order_code)
return {
"api_response": response,
"order_finish": result,
"items": items,
}
return result
def auto_batch_outbound_from_xlsx(self, xlsx_path: str) -> Dict[str, Any]:
@@ -494,7 +465,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
return response
# 2.14 新建实验
def create_orders(self, xlsx_path: str, *, material_filter: Optional[str] = None) -> Dict[str, Any]:
def create_orders(self, xlsx_path: str) -> Dict[str, Any]:
"""
从 Excel 解析并创建实验2.14
约定:
@@ -649,36 +620,9 @@ class BioyondCellWorkstation(BioyondWorkstation):
if not order_code:
logger.error("上料任务未返回有效 orderCode")
return response
# 等待完成报送
# 等待完成报送
result = self.wait_for_order_finish(order_code)
report_data = result.get("report") if isinstance(result, dict) else None
materials_from_report = (
report_data.get("usedMaterials") if isinstance(report_data, dict) else None
)
if materials_from_report:
materials = materials_from_report
logger.info(
"[create_orders] 使用订单完成报送中的物料信息: "
f"{len(materials)}"
)
else:
materials = self._fetch_bioyond_materials(filter_keyword=material_filter)
logger.info(
"[create_orders] 未收到订单报送物料信息,回退到实时查询"
)
print("materials_from_report:", materials_from_report)
# TODO: 需要将 materials 字典转换为 ResourceSlot 对象后才能转运
# self.transfer_resource_to_another(
# resource=[materials],
# mount_resource=["YB_YH_Deck"],
# sites=[None],
# mount_device_id="BatteryStation"
# )
return {
"api_response": response,
"order_finish": result,
"materials": materials,
}
return result
# 2.7 启动调度
def scheduler_start(self) -> Dict[str, Any]:
@@ -750,7 +694,6 @@ class BioyondCellWorkstation(BioyondWorkstation):
return response
# 等待完成报送
result = self.wait_for_order_finish(order_code)
return result
# 2.5 批量查询实验报告(post过滤关键字查询)
@@ -1219,221 +1162,28 @@ class BioyondCellWorkstation(BioyondWorkstation):
})
return final_result
def _fetch_bioyond_materials(
self,
*,
filter_keyword: Optional[str] = None,
type_mode: int = 2,
) -> List[Dict[str, Any]]:
query: Dict[str, Any] = {
"typeMode": type_mode,
"includeDetail": True,
}
if filter_keyword:
query["filter"] = filter_keyword
response = self._post_lims("/api/lims/storage/stock-material", query)
raw_materials = response.get("data")
if not isinstance(raw_materials, list):
raw_materials = []
try:
resource_bioyond_to_plr(
raw_materials,
type_mapping=self.bioyond_config.get("material_type_mappings", MATERIAL_TYPE_MAPPINGS),
deck=self.deck,
)
except Exception as exc:
logger.warning(f"转换奔曜物料到 PLR 失败: {exc}", exc_info=True)
return raw_materials
def _convert_materials_to_plr(self, materials: List[Dict[str, Any]]) -> List[ResourcePLR]:
try:
return resource_bioyond_to_plr(
deepcopy(materials),
type_mapping=self.bioyond_config.get("material_type_mappings", MATERIAL_TYPE_MAPPINGS),
deck=self.deck,
)
except Exception as exc:
logger.error(f"物料转换为 PLR 失败: {exc}", exc_info=True)
return []
def _wait_for_future(self, future, stage: str, timeout: Optional[float] = None):
if future is None:
return None
timeout = timeout or self.transfer_timeout
start = time.time()
while not future.done():
if (time.time() - start) > timeout:
raise TimeoutError(f"{stage} 超时 {timeout}s")
time.sleep(0.05)
return future.result()
def _register_plr_resources(self, resources: List[ResourcePLR]) -> None:
if not resources or not hasattr(self, "_ros_node") or self._ros_node is None:
return
future = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, resources=resources)
self._wait_for_future(future, "update_resource")
def _get_target_resource(self, name: str) -> ResourcePLR:
if not hasattr(self, "_ros_node") or self._ros_node is None:
raise RuntimeError("ROS 节点未初始化,无法获取资源")
resource = self._ros_node.resource_tracker.figure_resource({"name": name}, try_mode=False) # type: ignore
if resource is None:
raise ValueError(f"未找到目标资源: {name}")
return resource
def _allocate_sites(self, parent_resource: ResourcePLR, count: int) -> List[str]:
if not hasattr(parent_resource, "get_free_sites"):
raise ValueError(f"资源 {parent_resource} 不支持自动分配站位")
free_indices = list(parent_resource.get_free_sites())
if len(free_indices) < count:
raise ValueError(f"{parent_resource.name} 可用站位不足 (need {count}, have {len(free_indices)})")
ordering = list(getattr(parent_resource, "_ordering", {}).keys())
sites: List[str] = []
for idx in free_indices[:count]:
if ordering and idx < len(ordering):
sites.append(ordering[idx])
else:
sites.append(str(idx))
return sites
def _invoke_coin_cell_workflow(self, material_payload: List[Dict[str, Any]]) -> Any:
timeout = float(self.bioyond_config.get("coin_cell_workflow_timeout", 300.0))
workflow_payload: Dict[str, Any] = {}
if isinstance(self.coin_cell_workflow_config, dict):
workflow_payload.update(deepcopy(self.coin_cell_workflow_config))
workflow_payload["materials"] = deepcopy(material_payload)
return self._call_remote_device_method(
self.transfer_target_device_id,
"run_coin_cell_assembly_workflow",
timeout=timeout,
workflow_config=workflow_payload,
)
def _call_remote_device_method(
self,
device_id: str,
method: str,
*,
timeout: Optional[float] = None,
**kwargs,
) -> Any:
if not hasattr(self, "_ros_node") or self._ros_node is None:
raise RuntimeError("ROS 节点未初始化,无法调用远程设备")
if not device_id:
raise ValueError("device_id 不能为空")
if not method:
raise ValueError("method 不能为空")
timeout = timeout or self.transfer_timeout
payload = json.dumps(
{
"function_name": method,
"function_args": kwargs,
},
ensure_ascii=False,
)
future = ROS2DeviceNode.run_async_func(
self._ros_node.execute_single_action,
True,
device_id=device_id,
action_name="_execute_driver_command_async",
action_kwargs={"string": payload},
)
result = self._wait_for_future(future, f"{device_id}.{method}", timeout)
if hasattr(result, "return_info"):
try:
return json.loads(result.return_info)
except Exception:
return result.return_info
return result
def run_feeding_stage(self) -> Dict[str, Any]:
self.create_sample(
board_type="配液瓶(小)板",
bottle_type="配液瓶(小)",
location_code="B01",
name="配液瓶",
warehouse_name="手动堆栈"
)
self.create_sample(
board_type="5ml分液瓶板",
bottle_type="5ml分液瓶",
location_code="B02",
name="分液瓶",
warehouse_name="手动堆栈"
)
self.scheduler_start()
feeding_task = self.auto_feeding4to3(
xlsx_path="/Users/sml/work/Unilab/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/material_template.xlsx"
)
feeding_materials = self._fetch_bioyond_materials()
return {
"feeding_materials": feeding_materials,
"feeding_items": feeding_task.get("items", []),
"feeding_task": feeding_task,
}
def run_liquid_preparation_stage(
self,
feeding_materials: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, List[Dict[str, Any]]]:
result = self.create_orders(
xlsx_path="/Users/sml/work/Unilab/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx"
)
filter_keyword = self.bioyond_config.get("mixing_material_filter") or None
materials = result.get("materials")
if materials is None:
materials = self._fetch_bioyond_materials(filter_keyword=filter_keyword)
return {
"feeding_materials": feeding_materials or [],
"liquid_materials": materials,
}
def run_transfer_stage(
self,
liquid_materials: Optional[List[Dict[str, Any]]] = None,
source_wh_id: Optional[str] = '3a19debc-84b4-0359-e2d4-b3beea49348b',
source_x: int = 1,
source_y: int = 1,
source_z: int = 1
) -> Dict[str, Any]:
"""转运阶段调用transfer_3_to_2_to_1执行3到2到1转运"""
logger.info("开始执行转运阶段 (run_transfer_stage)")
# 暂时注释掉物料转换和跨工站转运逻辑
# transfer_summary: Dict[str, Any] = {}
# try:
# source_materials = liquid_materials or self._fetch_bioyond_materials()
# transfer_plr = self._convert_materials_to_plr(source_materials)
# transfer_summary["plr_count"] = len(transfer_plr)
# ...
# except Exception as exc:
# transfer_summary["error"] = str(exc)
# logger.error(f"跨工站转运失败: {exc}", exc_info=True)
# 只执行核心的3到2到1转运
transfer_result = self.transfer_3_to_2_to_1(
source_wh_id=source_wh_id,
source_x=source_x,
source_y=source_y,
source_z=source_z
)
logger.info("转运阶段执行完成")
return {
"success": True,
"stage": "transfer",
"transfer_result": transfer_result
}
if __name__ == "__main__":
lab_registry.setup()
deck = BIOYOND_YB_Deck(setup=True)
w = BioyondCellWorkstation(deck=deck, address="172.16.28.102", port="502", debug_mode=False)
feeding = w.run_feeding_stage()
liquid = w.run_liquid_preparation_stage(feeding.get("feeding_materials"))
transfer = w.run_transfer_stage(liquid.get("liquid_materials"))
ws = BioyondCellWorkstation(deck=deck)
# ws.create_sample(name="test", board_type="配液瓶(小)板", bottle_type="配液瓶(小)", location_code="B01")
# logger.info(ws.scheduler_stop())
# logger.info(ws.scheduler_start())
# 继续后续流程
logger.info(ws.auto_feeding4to3()) #搬运物料到3号箱
# # # 使用正斜杠或 Path 对象来指定文件路径
# excel_path = Path("unilabos\\devices\\workstation\\bioyond_studio\\bioyond_cell\\2025092701.xlsx")
# logger.info(ws.create_orders(excel_path))
# logger.info(ws.transfer_3_to_2_to_1())
# logger.info(ws.transfer_1_to_2())
# logger.info(ws.scheduler_start())
while True:
time.sleep(1)
# re=ws.scheduler_stop()

View File

@@ -1,3 +1,4 @@
import csv
import inspect
import json
@@ -143,6 +144,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
else:
print("测试模式,跳过连接")
self.nodes, self.client = None, None
""" 工站的配置 """
self.success = False
@@ -159,27 +161,6 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
"resources": [self.deck]
})
def sync_transfer_resources(self) -> Dict[str, Any]:
"""
供跨工站转运完成后调用,强制将当前台面资源同步到云端/前端。
"""
if not hasattr(self, "_ros_node") or self._ros_node is None:
return {"status": "failed", "error": "ros_node_not_ready"}
if self.deck is None:
return {"status": "failed", "error": "deck_not_initialized"}
try:
future = ROS2DeviceNode.run_async_func(
self._ros_node.update_resource,
True,
resources=[self.deck],
)
if future:
future.result()
return {"status": "success"}
except Exception as exc:
logger.error(f"同步转运资源失败: {exc}", exc_info=True)
return {"status": "failed", "error": str(exc)}
# 批量操作在这里写
async def change_hole_sheet_to_2(self, hole: MaterialHole):
hole._unilabos_state["max_sheets"] = 2
@@ -1005,31 +986,6 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
#self.success = True
#return self.success
def run_packaging_workflow(self, workflow_config: Dict[str, Any]) -> "CoinCellAssemblyWorkstation":
config = workflow_config or {}
qiming_params = config.get("qiming") or {}
if qiming_params:
self.qiming_coin_cell_code(**qiming_params)
if config.get("init", True):
self.func_pack_device_init()
if config.get("auto", True):
self.func_pack_device_auto()
if config.get("start", True):
self.func_pack_device_start()
packaging_config = config.get("packaging") or {}
bottle_num = packaging_config.get("bottle_num")
if bottle_num is not None:
self.func_pack_send_bottle_num(bottle_num)
allpack_params = packaging_config.get("command") or {}
if allpack_params:
self.func_allpack_cmd(**allpack_params)
return self
def fun_wuliao_test(self) -> bool:
#找到data_init中构建的2个物料盘
liaopan3 = self.deck.get_resource("\u7535\u6c60\u6599\u76d8")
@@ -1243,95 +1199,20 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
"""移液枪头库存 (数量, INT16)"""
inventory, read_err = self.client.register_node_list(self.nodes).use_node('REG_DATA_TIPS_INVENTORY').read(1)
return inventory
'''
def run_coin_cell_assembly_workflow(
self,
workflow_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
config: Dict[str, Any]
if workflow_config is None:
config = {}
elif isinstance(workflow_config, list):
config = {"materials": workflow_config}
else:
config = workflow_config
qiming_defaults = {
"fujipian_panshu": 1,
"fujipian_juzhendianwei": 0,
"gemopanshu": 1,
"gemo_juzhendianwei": 0,
"lvbodian": True,
"battery_pressure_mode": True,
"battery_pressure": 4200,
"battery_clean_ignore": False,
}
qiming_params = {**qiming_defaults, **(config.get("qiming") or {})}
qiming_success = self.qiming_coin_cell_code(**qiming_params)
step_results: Dict[str, Any] = {}
try:
self.func_pack_device_init()
step_results["init"] = True
except Exception as exc:
step_results["init"] = f"error: {exc}"
try:
self.func_pack_device_auto()
step_results["auto"] = True
except Exception as exc:
step_results["auto"] = f"error: {exc}"
try:
self.func_pack_device_start()
step_results["start"] = True
except Exception as exc:
step_results["start"] = f"error: {exc}"
packaging_cfg = config.get("packaging") or {}
bottle_num = packaging_cfg.get("bottle_num", 1)
try:
self.func_pack_send_bottle_num(bottle_num)
step_results["send_bottle_num"] = True
except Exception as exc:
step_results["send_bottle_num"] = f"error: {exc}"
command_defaults = {
"elec_num": 1,
"elec_use_num": 1,
"elec_vol": 50,
"assembly_type": 7,
"assembly_pressure": 4200,
"file_path": "/Users/sml/work",
}
command_params = {**command_defaults, **(packaging_cfg.get("command") or {})}
packaging_result = self.func_allpack_cmd(**command_params)
finished_result = self.func_pack_send_finished_cmd()
stop_result = self.func_pack_device_stop()
return {
"qiming": {
"params": qiming_params,
"success": qiming_success,
},
"workflow_steps": step_results,
"packaging": {
"bottle_num": bottle_num,
"command": command_params,
"result": packaging_result,
},
"finish": {
"send_finished": finished_result,
"stop": stop_result,
},
}
if __name__ == "__main__":
deck = CoincellDeck(setup=True, name="coin_cell_deck")
w = CoinCellAssemblyWorkstation(deck=deck, address="172.16.28.102", port="502", debug_mode=False)
w.run_coin_cell_assembly_workflow()
# 简单测试
workstation = CoinCellAssemblyWorkstation(deck=CoincellDeck(setup=True, name="coin_cell_deck"))
# workstation.qiming_coin_cell_code(fujipian_panshu=1, fujipian_juzhendianwei=2, gemopanshu=3, gemo_juzhendianwei=4, lvbodian=False, battery_pressure_mode=False, battery_pressure=4200, battery_clean_ignore=False)
# print(f"工作站创建成功: {workstation.deck.name}")
# print(f"料盘数量: {len(workstation.deck.children)}")
workstation.func_pack_device_init()
workstation.func_pack_device_auto()
workstation.func_pack_device_start()
workstation.func_pack_send_bottle_num(16)
workstation.func_allpack_cmd(elec_num=16, elec_use_num=16, elec_vol=50, assembly_type=7, assembly_pressure=4200, file_path="/Users/calvincao/Desktop/work/Uni-Lab-OS-hhm")

View File

@@ -32,111 +32,6 @@ bioyond_cell:
feedback: {}
goal: {}
goal_default:
WH3_x1_y1_z3_1_materialId: ''
WH3_x1_y1_z3_1_materialType: ''
WH3_x1_y1_z3_1_quantity: 0
WH3_x1_y2_z3_4_materialId: ''
WH3_x1_y2_z3_4_materialType: ''
WH3_x1_y2_z3_4_quantity: 0
WH3_x1_y3_z3_7_materialId: ''
WH3_x1_y3_z3_7_materialType: ''
WH3_x1_y3_z3_7_quantity: 0
WH3_x1_y4_z3_10_materialId: ''
WH3_x1_y4_z3_10_materialType: ''
WH3_x1_y4_z3_10_quantity: 0
WH3_x1_y5_z3_13_materialId: ''
WH3_x1_y5_z3_13_materialType: ''
WH3_x1_y5_z3_13_quantity: 0
WH3_x2_y1_z3_2_materialId: ''
WH3_x2_y1_z3_2_materialType: ''
WH3_x2_y1_z3_2_quantity: 0
WH3_x2_y2_z3_5_materialId: ''
WH3_x2_y2_z3_5_materialType: ''
WH3_x2_y2_z3_5_quantity: 0
WH3_x2_y3_z3_8_materialId: ''
WH3_x2_y3_z3_8_materialType: ''
WH3_x2_y3_z3_8_quantity: 0
WH3_x2_y4_z3_11_materialId: ''
WH3_x2_y4_z3_11_materialType: ''
WH3_x2_y4_z3_11_quantity: 0
WH3_x2_y5_z3_14_materialId: ''
WH3_x2_y5_z3_14_materialType: ''
WH3_x2_y5_z3_14_quantity: 0
WH3_x3_y1_z3_3_materialId: ''
WH3_x3_y1_z3_3_materialType: ''
WH3_x3_y1_z3_3_quantity: 0
WH3_x3_y2_z3_6_materialId: ''
WH3_x3_y2_z3_6_materialType: ''
WH3_x3_y2_z3_6_quantity: 0
WH3_x3_y3_z3_9_materialId: ''
WH3_x3_y3_z3_9_materialType: ''
WH3_x3_y3_z3_9_quantity: 0
WH3_x3_y4_z3_12_materialId: ''
WH3_x3_y4_z3_12_materialType: ''
WH3_x3_y4_z3_12_quantity: 0
WH3_x3_y5_z3_15_materialId: ''
WH3_x3_y5_z3_15_materialType: ''
WH3_x3_y5_z3_15_quantity: 0
WH4_x1_y1_z1_1_materialName: ''
WH4_x1_y1_z1_1_quantity: 0.0
WH4_x1_y1_z2_1_materialName: ''
WH4_x1_y1_z2_1_materialType: ''
WH4_x1_y1_z2_1_quantity: 0.0
WH4_x1_y1_z2_1_targetWH: ''
WH4_x1_y2_z1_6_materialName: ''
WH4_x1_y2_z1_6_quantity: 0.0
WH4_x1_y2_z2_4_materialName: ''
WH4_x1_y2_z2_4_materialType: ''
WH4_x1_y2_z2_4_quantity: 0.0
WH4_x1_y2_z2_4_targetWH: ''
WH4_x1_y3_z1_11_materialName: ''
WH4_x1_y3_z1_11_quantity: 0.0
WH4_x1_y3_z2_7_materialName: ''
WH4_x1_y3_z2_7_materialType: ''
WH4_x1_y3_z2_7_quantity: 0.0
WH4_x1_y3_z2_7_targetWH: ''
WH4_x2_y1_z1_2_materialName: ''
WH4_x2_y1_z1_2_quantity: 0.0
WH4_x2_y1_z2_2_materialName: ''
WH4_x2_y1_z2_2_materialType: ''
WH4_x2_y1_z2_2_quantity: 0.0
WH4_x2_y1_z2_2_targetWH: ''
WH4_x2_y2_z1_7_materialName: ''
WH4_x2_y2_z1_7_quantity: 0.0
WH4_x2_y2_z2_5_materialName: ''
WH4_x2_y2_z2_5_materialType: ''
WH4_x2_y2_z2_5_quantity: 0.0
WH4_x2_y2_z2_5_targetWH: ''
WH4_x2_y3_z1_12_materialName: ''
WH4_x2_y3_z1_12_quantity: 0.0
WH4_x2_y3_z2_8_materialName: ''
WH4_x2_y3_z2_8_materialType: ''
WH4_x2_y3_z2_8_quantity: 0.0
WH4_x2_y3_z2_8_targetWH: ''
WH4_x3_y1_z1_3_materialName: ''
WH4_x3_y1_z1_3_quantity: 0.0
WH4_x3_y1_z2_3_materialName: ''
WH4_x3_y1_z2_3_materialType: ''
WH4_x3_y1_z2_3_quantity: 0.0
WH4_x3_y1_z2_3_targetWH: ''
WH4_x3_y2_z1_8_materialName: ''
WH4_x3_y2_z1_8_quantity: 0.0
WH4_x3_y2_z2_6_materialName: ''
WH4_x3_y2_z2_6_materialType: ''
WH4_x3_y2_z2_6_quantity: 0.0
WH4_x3_y2_z2_6_targetWH: ''
WH4_x3_y3_z2_9_materialName: ''
WH4_x3_y3_z2_9_materialType: ''
WH4_x3_y3_z2_9_quantity: 0.0
WH4_x3_y3_z2_9_targetWH: ''
WH4_x4_y1_z1_4_materialName: ''
WH4_x4_y1_z1_4_quantity: 0.0
WH4_x4_y2_z1_9_materialName: ''
WH4_x4_y2_z1_9_quantity: 0.0
WH4_x5_y1_z1_5_materialName: ''
WH4_x5_y1_z1_5_quantity: 0.0
WH4_x5_y2_z1_10_materialName: ''
WH4_x5_y2_z1_10_quantity: 0.0
xlsx_path: /Users/sml/work/Unilab/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/material_template.xlsx
handles: {}
placeholder_keys: {}
@@ -821,154 +716,6 @@ bioyond_cell:
title: resource_tree_transfer参数
type: object
type: UniLabJsonCommand
auto-run_feeding_stage:
feedback: {}
goal: {}
goal_default: {}
handles:
input: []
output:
- data_key: feeding_materials
data_source: executor
data_type: resource
handler_key: feeding_materials
label: Feeding Materials
placeholder_keys: {}
result:
properties:
feeding_materials:
items:
type: object
type: array
required:
- feeding_materials
type: object
schema:
description: ''
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result: {}
required:
- goal
title: run_feeding_stage参数
type: object
type: UniLabJsonCommand
auto-run_liquid_preparation_stage:
feedback: {}
goal: {}
goal_default: {}
handles:
input:
- data_key: feeding_materials
data_source: handle
data_type: resource
handler_key: feeding_materials
label: Feeding Materials
output:
- data_key: liquid_materials
data_source: executor
data_type: resource
handler_key: liquid_materials
label: Liquid Materials
placeholder_keys: {}
result:
properties:
feeding_materials:
items:
type: object
type: array
liquid_materials:
items:
type: object
type: array
required:
- liquid_materials
type: object
schema:
description: ''
properties:
feedback: {}
goal:
properties:
feeding_materials:
items:
type: object
type: array
required: []
type: object
result: {}
required:
- goal
title: run_liquid_preparation_stage参数
type: object
type: UniLabJsonCommand
auto-run_transfer_stage:
feedback: {}
goal: {}
goal_default: {}
handles:
input:
- data_key: liquid_materials
data_source: handle
data_type: resource
handler_key: liquid_materials
label: Liquid Materials
output:
- data_key: transfer_materials
data_source: executor
data_type: resource
handler_key: transfer_materials
label: Transfer Materials
placeholder_keys: {}
result:
properties:
liquid_materials:
items:
type: object
type: array
transfer_materials:
items:
type: object
type: array
transfer_summary:
type: object
required:
- transfer_materials
type: object
schema:
description: ''
properties:
feedback: {}
goal:
properties:
liquid_materials:
items:
type: object
type: array
required: []
type: object
result:
properties:
liquid_materials:
items:
type: object
type: array
transfer_materials:
items:
type: object
type: array
transfer_summary:
type: object
type: object
required:
- goal
title: run_transfer_stage参数
type: object
type: UniLabJsonCommand
auto-scheduler_continue:
feedback: {}
goal: {}
@@ -1260,7 +1007,7 @@ bioyond_cell:
device_id: String
type: python
config_info: []
description: 配液工站
description: ''
handles: []
icon: benyao2.webp
init_param_schema:

View File

@@ -477,171 +477,6 @@ coincellassemblyworkstation_device:
title: qiming_coin_cell_code参数
type: object
type: UniLabJsonCommand
auto-run_coin_cell_assembly_workflow:
feedback: {}
goal:
properties:
workflow_config:
type: object
required: []
type: object
goal_default:
workflow_config: {}
handles:
input:
- data_key: workflow_config
data_source: handle
data_type: resource
handler_key: WorkflowConfig
label: Workflow Config
output:
- data_key: qiming
data_source: executor
data_type: resource
handler_key: QimingResult
label: Qiming Result
- data_key: workflow_steps
data_source: executor
data_type: resource
handler_key: WorkflowSteps
label: Workflow Steps
- data_key: packaging
data_source: executor
data_type: resource
handler_key: PackagingResult
label: Packaging Result
- data_key: finish
data_source: executor
data_type: resource
handler_key: FinishResult
label: Finish Result
placeholder_keys: {}
result:
properties:
finish:
properties:
send_finished:
type: object
stop:
type: object
required:
- send_finished
- stop
type: object
packaging:
properties:
bottle_num:
type: integer
command:
type: object
result:
type: object
required:
- bottle_num
- command
- result
type: object
qiming:
properties:
params:
type: object
success:
type: boolean
required:
- params
- success
type: object
workflow_steps:
type: object
required:
- qiming
- workflow_steps
- packaging
- finish
type: object
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_config:
type: object
required: []
type: object
result:
properties:
finish:
properties:
send_finished:
type: object
stop:
type: object
required:
- send_finished
- stop
type: object
packaging:
properties:
bottle_num:
type: integer
command:
type: object
result:
type: object
required:
- bottle_num
- command
- result
type: object
qiming:
properties:
params:
type: object
success:
type: boolean
required:
- params
- success
type: object
workflow_steps:
type: object
required:
- qiming
- workflow_steps
- packaging
- finish
type: object
required:
- goal
title: run_coin_cell_assembly_workflow参数
type: object
type: UniLabJsonCommand
auto-run_packaging_workflow:
feedback: {}
goal: {}
goal_default:
workflow_config: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_config:
type: object
required:
- workflow_config
type: object
result: {}
required:
- goal
title: run_packaging_workflow参数
type: object
type: UniLabJsonCommand
module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinCellAssemblyWorkstation
status_types:
data_assembly_coin_cell_num: int
@@ -665,7 +500,7 @@ coincellassemblyworkstation_device:
sys_status: str
type: python
config_info: []
description: 扣电工站
description: ''
handles: []
icon: koudian.webp
init_param_schema:

View File

@@ -1,8 +1,6 @@
neware_battery_test_system:
category:
- neware_battery_test_system
- neware
- battery_test
class:
action_value_mappings:
auto-post_init:
@@ -72,38 +70,6 @@ neware_battery_test_system:
title: test_connection参数
type: object
type: UniLabJsonCommand
debug_resource_names:
feedback: {}
goal: {}
goal_default: {}
handles: {}
result:
return_info: return_info
success: success
schema:
description: 调试方法:显示所有资源的实际名称
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result:
properties:
return_info:
description: 资源调试信息
type: string
success:
description: 是否成功
type: boolean
required:
- return_info
- success
type: object
required:
- goal
type: object
type: UniLabJsonCommand
export_status_json:
feedback: {}
goal:
@@ -253,9 +219,7 @@ neware_battery_test_system:
goal_default:
string: ''
handles: {}
result:
return_info: return_info
success: success
result: {}
schema:
description: ''
properties:
@@ -288,56 +252,6 @@ neware_battery_test_system:
title: StrSingleInput
type: object
type: StrSingleInput
submit_from_csv:
feedback: {}
goal:
csv_path: string
output_dir: string
goal_default:
csv_path: ''
output_dir: .
handles: {}
result:
return_info: return_info
submitted_count: submitted_count
success: success
schema:
description: 从CSV文件批量提交Neware测试任务
properties:
feedback: {}
goal:
properties:
csv_path:
description: 输入CSV文件的绝对路径
type: string
output_dir:
description: 输出目录用于存储XML和备份文件默认当前目录
type: string
required:
- csv_path
type: object
result:
properties:
return_info:
description: 执行结果详细信息
type: string
submitted_count:
description: 成功提交的任务数量
type: integer
success:
description: 是否成功
type: boolean
total_count:
description: CSV文件中的总行数
type: integer
required:
- return_info
- success
type: object
required:
- goal
type: object
type: UniLabJsonCommand
test_connection_action:
feedback: {}
goal: {}
@@ -370,7 +284,7 @@ neware_battery_test_system:
- goal
type: object
type: UniLabJsonCommand
module: unilabos.devices.neware_battery_test_system.neware_battery_test_system:NewareBatteryTestSystem
module: unilabos.devices.battery.neware_battery_test_system:NewareBatteryTestSystem
status_types:
channel_status: dict
connection_info: dict
@@ -380,7 +294,7 @@ neware_battery_test_system:
total_channels: int
type: python
config_info: []
description: 新威电池测试系统驱动,提供720个通道的电池测试状态监控、物料管理和CSV批量提交功能。支持TCP通信实现远程控制包含完整的物料管理系统2盘电池状态映射以及从CSV文件批量提交测试任务的能力
description: 新威电池测试系统驱动,支持720个通道的电池测试状态监控和数据导出。通过TCP通信实现远程控制包含完整的物料管理系统,支持2盘电池状态映射和监控
handles: []
icon: ''
init_param_schema:
@@ -396,13 +310,13 @@ neware_battery_test_system:
port:
type: integer
size_x:
default: 50
default: 500.0
type: number
size_y:
default: 50
default: 500.0
type: number
size_z:
default: 20
default: 2000.0
type: number
timeout:
type: integer

View File

@@ -674,15 +674,10 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
for loc in material.get("locations", []):
if hasattr(deck, "warehouses") and loc.get("whName") in deck.warehouses:
warehouse = deck.warehouses[loc["whName"]]
num_x = getattr(warehouse, "num_items_x", 0) or 0
num_y = getattr(warehouse, "num_items_y", 0) or 0
num_z = getattr(warehouse, "num_items_z", 0) or 0
if num_x <= 0 or num_y <= 0 or num_z <= 0:
continue
idx = (
(loc.get("z", 0) - 1) * num_x * num_y
+ (loc.get("y", 0) - 1) * num_x
+ (loc.get("x", 0) - 1)
(loc.get("y", 0) - 1) * warehouse.num_items_x * warehouse.num_items_y
+ (loc.get("x", 0) - 1) * warehouse.num_items_x
+ (loc.get("z", 0) - 1)
)
if 0 <= idx < warehouse.capacity:
if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder):

View File

@@ -402,6 +402,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
return result_future.result
"""还没有改过的部分"""
def _setup_hardware_proxy(
self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method