更新电化学工作站与拉曼光谱的驱动

This commit is contained in:
Hua Haiming
2025-05-06 16:09:04 +08:00
parent d1fbea3b7d
commit 2fed44e92b
6 changed files with 733 additions and 0 deletions

34
examples/dh_7000_eis.py Normal file
View File

@@ -0,0 +1,34 @@
import rclpy
# Initialize ROS communications for a given context
if(rclpy.ok() == False):
rclpy.init()
else:
print("rclpy already initiated")
from unilabos.devices.dh_electrochem.dh_7000 import DH7000
from unilabos_msgs.action import SendCmd
from std_msgs.msg import Float64, String, Int16
from unilabos.ros.device_node_wrapper import ros2_device_node
import clr
import os
# dll_path = r'D:\UniLab\code\DHElecChem\release64'
# eccore_dll_path = os.path.join(dll_path, 'ECCore.dll')
# os.environ["PATH"] = dll_path + os.pathsep + os.environ["PATH"]
# clr.AddReference(eccore_dll_path)
# from ECCore import ElecMachines
ROS2_DH7000 = ros2_device_node(
DH7000,
status_types={'machine_id': Int16, 'status': String},
action_value_mappings={'dh_cmd': {
'type': SendCmd,
'goal': {'command': 'command'},
'feedback': {},
'result': {'success': 'success'}}
}
)
device = ROS2_DH7000(device_id='DH7000_1')
rclpy.spin(device.ros_node_instance)

View File

@@ -0,0 +1,33 @@
import rclpy
# Initialize ROS communications for a given context
if(rclpy.ok() == False):
rclpy.init()
else:
print("rclpy already initiated")
from unilabos.devices.opsky_Raman.opsky_ATR30007 import ATR30007
from unilabos_msgs.action import SendCmd
from std_msgs.msg import Float64, String, Int16
from unilabos.ros.device_node_wrapper import ros2_device_node
import clr
import os
# dll_path = r'D:\UniLab\code\DHElecChem\release64'
# eccore_dll_path = os.path.join(dll_path, 'ECCore.dll')
# os.environ["PATH"] = dll_path + os.pathsep + os.environ["PATH"]
# clr.AddReference(eccore_dll_path)
# from ECCore import ElecMachines
ROS2_ATR30007 = ros2_device_node(
ATR30007,
#status_types={'machine_id': Int16, 'status': String},
action_value_mappings={'opsky_cmd': {
'type': SendCmd,
'goal': {'command': 'command'},
'feedback': {},
'result': {'success': 'success'}}
}
)
device = ROS2_ATR30007(device_id='ATR30007_1')
rclpy.spin(device.ros_node_instance)

View File

@@ -0,0 +1,552 @@
import clr
import os
import time
import pandas as pd
import json
import shutil
import sys
import time
from System import Environment
# define class DH7000
# this class is used to control the DH7000 electrochemical workstation
class DH7000:
def __init__(self, dll_path: str = r'D:\DH_release64'):
self.dll_path = dll_path
self.machine = None
self._load_dll()
self._initialize_machine()
self._status = "PowerOff" # otherwise it will be "Running", "Overload", "Idle"
self._machine_id = None
@property
def status(self) -> str:
return self._status
@property
def machine_id(self) -> int:
return self._machine_id
def _load_dll(self):
# Check if ECCore.dll exists
eccore_dll_path = os.path.join(self.dll_path, 'ECCore.dll')
if not os.path.exists(eccore_dll_path):
raise FileNotFoundError(f"ECCore.dll不存在于路径 {eccore_dll_path}")
# Prepend DLL path to system PATH
os.environ["PATH"] = self.dll_path + os.pathsep + os.environ["PATH"]
# Add reference
try:
clr.AddReference(eccore_dll_path)
from ECCore import ElecMachines
self.machine = ElecMachines()
print("成功添加ECCore.dll引用导入ElecMachines类并创建实例")
except Exception as e:
raise RuntimeError(f"添加DLL引用或导入ElecMachines类时出错: {e}")
def _initialize_machine(self):
interface_dir = os.path.join(self.dll_path, 'DHInterface')
if not os.path.exists(interface_dir):
raise FileNotFoundError(f"DHInterface文件夹不存在于路径 {interface_dir}")
init_success = self.machine.Init(interface_dir)
if not init_success:
raise RuntimeError("工作站初始化失败")
print("工作站初始化成功")
self.check_status()
def check_status(self):
Flag_isexp = self.machine.IsExperimenting()
if Flag_isexp:
Flag_isover = self.machine.IsOverLoad()
if Flag_isover:
self._status = "Overload"
else:
self._status = "Running"
else:
self._status = "Idle"
def set_current_machineid(self, machine_id: int):
machineid_list = list(self.machine.GetMachineId())
if machine_id not in machineid_list:
raise ValueError(f"工作站ID {machine_id} 不在可用ID列表中: {machineid_list}")
self.machine.SetCurrentMachineId(machine_id)
self._machine_id = machine_id
print(f"选择的工作站ID: {self._machine_id}")
def start_eis(
self,
isVolEIS: bool = True,
StartFreq: float = 1E5,
EndFreq: float = 1E-1,
Amplitude: float = 10,
IntervalType: int = 1,
PointCount: int = 10,
Voltage: float = 0.0,
VoltageVSType: float = 1.0,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
file_name: str = 'test_eis',
save_root: str = r"D:\UniLab\results"
):
"""
EIS test
"""
self.machine.Start_EIS(
isVolEIS,
StartFreq,
EndFreq,
Amplitude,
IntervalType,
PointCount,
Voltage,
VoltageVSType,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand
)
self._status = "Running"
# Wait for experiment to finish
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 6
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
ZreData = list(self.machine.SplitData(data_result, splitCount, 1))
ZimData = list(self.machine.SplitData(data_result, splitCount, 2))
ZData = list(self.machine.SplitData(data_result, splitCount, 3))
FreqData = list(self.machine.SplitData(data_result, splitCount, 4))
PhaseData = list(self.machine.SplitData(data_result, splitCount, 5))
data_to_dataframe = {
"TimeData": timeData,
"ZreData": ZreData,
"ZimData": ZimData,
"ZData": ZData,
"FreqData": FreqData,
"PhaseData": PhaseData,
}
# Convert to DataFrame
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def start_lsv(
self,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 1,
FinallyPotential: float = 1.0,
FinallyPotentialVSType: int = 1,
ScanRate: float = 0.05,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
file_name: str = 'lsv_test',
save_root: str = r"D:\UniLab\results"
):
"""
LSV test
"""
self.machine.Start_Linear_Scan_Voltammetry(
InitialPotential,
InitialPotentialVSType,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand
)
self._status = "Running"
# Wait for experiment to finish
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
VolData = list(self.machine.SplitData(data_result, splitCount, 1))
CurData = list(self.machine.SplitData(data_result, splitCount, 2))
data_to_dataframe = {
"TimeData": timeData,
"VolData": VolData,
"CurData": CurData,
}
# Convert to DataFrame
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def start_cv_single(
self,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
TopPotential: float = 1.0,
TopPotentialVSType: int = 0,
FinallyPotential: float = 0.0,
FinallyPotentialVSType: int = 0,
ScanRate: float = 0.1,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
delayTime: float = 0.0,
file_name: str = 'cv_single_test',
save_root: str = r"D:\UniLab\results"
):
"""
Single cyclic voltammetry (Single CV)
"""
# Call underlying DLL function
self.machine.Start_Circle_Voltammetry_Single(
InitialPotential,
InitialPotentialVSType,
TopPotential,
TopPotentialVSType,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
delayTime
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve and parse data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
# Same as LSV, usually three columns: Time, Voltage, Current
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
data_to_dataframe = {
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
df = pd.DataFrame(data_to_dataframe)
# Save data
self.save_data(df, file_name, save_root)
self.check_status()
def start_cv_multi(
self,
IsUseInitialPotential: bool = True,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
TopPotential1: float = 1.0,
TopPotential1VSType: int = 0,
TopPotential2: float = -1.0,
TopPotential2VSType: int = 0,
IsUseFinallyPotential: bool = True,
FinallyPotential: float = 0.0,
FinallyPotentialVSType: int = 0,
ScanRate: float = 0.1,
cycleCount: int = 3,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
delayTime: float = 0.0,
file_name: str = "cv_multi_test",
save_root: str = r"D:\UniLab\results",
):
"""
Multiple cyclic voltammetry (Multiple CV).
"""
# Call underlying DLL
self.machine.Start_Circle_Voltammetry_Multi(
IsUseInitialPotential,
InitialPotential,
InitialPotentialVSType,
TopPotential1,
TopPotential1VSType,
TopPotential2,
TopPotential2VSType,
IsUseFinallyPotential,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
cycleCount,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
delayTime,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch data and split
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3 # Assume still three columns: Time/Voltage/Current
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def start_ca(
self,
timePerPoint: float = 0.1,
continueTime: float = 100.0,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
file_name: str = "ca_test",
save_root: str = r"D:\UniLab\results",
):
"""
ChronoAmperometry (CA) — constant potential
"""
# Call DLL
self.machine.Start_ChronoamperonetryParam(
timePerPoint,
continueTime,
InitialPotential,
InitialPotentialVSType,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch and split data (commonly three columns: Time / Voltage / Current)
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def start_cp(
self,
timePerPoint: float = 0.1,
continueTime: float = 100.0,
current: float = 1.0,
VoltageRand: str = "100",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
file_name: str = "cp_test",
save_root: str = r"D:\UniLab\results",
):
"""
ChronoPotentiometry (CP) — constant current
"""
# Call DLL
self.machine.Start_ChronopotentiometryParam(
timePerPoint,
continueTime,
current,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch and split data (Time / Voltage / Current)
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def stop_experiment(self):
self.machine.StopExperiment()
self.check_status()
print("实验已停止")
def save_data(self, data: pd.DataFrame, file_name: str, save_root: str):
if not os.path.exists(save_root):
os.makedirs(save_root)
data.to_csv(os.path.join(save_root, f"{file_name}.csv"), index=False)
print(f"数据已保存到 {save_root}")
# === Core: a unified dh_cmd method that calls start_eis or start_lsv according to methods ===
def dh_cmd(self, command: str):
"""
Unified handler for different commands such as EIS / LSV / CV / CA.
In the incoming command JSON, use the key "methods" to specify the measurement type: "eis", "lsv", etc.
"""
self.success = False
print(f"接收到命令: {command}")
# Replace "!=!" with quotes and fix True/False and path separators
command = command.replace("!=!", "\"")
command = command.replace('\\', '\\\\')
command = command.replace("True", "true").replace("False", "false")
try:
cmd_dict = json.loads(command)
print(f"命令参数: {cmd_dict}")
# Extract measurement method, default to "EIS" if not specified
method = cmd_dict.pop("methods", "eis").lower()
# Extract file save path (may not be provided)
save_root = cmd_dict.get("save_root", r"D:\UniLab\results\test")
if method in ("cv_single", "cvs"):
# CV (cyclic voltammetry, single cycle)
print("执行 EIS 测试...")
self.start_cv_single(**cmd_dict)
elif method in ("cv_multi", "cvm"):
# CV (cyclic voltammetry, multiple cycles)
print("执行多循环伏安测试...")
self.start_cv_multi(**cmd_dict)
elif method == "lsv":
# LSV
print("执行 LSV 测试...")
self.start_lsv(**cmd_dict)
elif method == "ca":
# CA
self.start_ca(**cmd_dict)
elif method == "cp":
# CP
self.start_cp(**cmd_dict)
else:
# Default to EIS
print("执行 EIS 测试...")
self.start_eis(**cmd_dict)
print(f"实验完成,数据已保存到 {save_root}")
self.success = True
except Exception as e:
print(f"命令执行失败: {e}")
raise RuntimeError(f"error: {e}")

View File

View File

@@ -0,0 +1,114 @@
import clr
import os
import time
import pandas as pd
import json
import shutil
import sys
import time
from System import Environment
# define class ATR30007
# this class is used to control the ATR30007 Raman workstation
class ATR30007:
def __init__(self, dll_path: str = r'D:\Raman_RS'):
self.dll_path = dll_path
self.machine = None
self._load_dll()
def _load_dll(self):
# 检查ECCore.dll是否存在
eccore_dll_path = os.path.join(self.dll_path, 'ATRWrapper.dll')
if not os.path.exists(eccore_dll_path):
raise FileNotFoundError(f"ATRWrapper.dll不存在于路径 {eccore_dll_path}")
# 将DLL路径添加到系统PATH的最前面
os.environ["PATH"] = self.dll_path + os.pathsep + os.environ["PATH"]
# 添加引用
try:
clr.AddReference(eccore_dll_path)
from Optosky.Wrapper import ATRWrapper
self.machine = ATRWrapper()
print("成功添加ATRWrapper.dll引用导入ATRWrapper类并创建实例")
except Exception as e:
raise RuntimeError(f"添加DLL引用或导入ATRWrapper类时出错: {e}")
def get_machineSn(self):
machineSn = self.machine.GetSn()
print(f"选择的工作站ID: {machineSn}")
def start_Raman(self, IntegTime: int = 5000, LdPower: int = 200,
Ldwave: int = 1, CCDTemp: int = -5,
file_name: str = 'test_raman', save_root: str = None):
#打开仪器
On_flag = self.machine.OpenDevice()
print(f"On_flag: {On_flag}")
#获取仪器SN
wrapper_Sn = self.machine.GetSn()
print(f"wrapper_Sn: {wrapper_Sn}")
#设置当前设备的积分时间, 单位为毫秒
Integ_flag = self.machine.SetIntegrationTime(IntegTime)
print(f"Integ_flag:{Integ_flag}")
#设置激光功率, 单位mW
LdP_flag = self.machine.SetLdPower(LdPower,Ldwave)
print(f"LdP_flag:{LdP_flag}")
#设置 CCD 制冷温度
SetC_flag = self.machine.SetCool(CCDTemp)
print(f"SetC_flag:{SetC_flag}")
#开始采集光谱
Spect = self.machine.AcquireSpectrum()
#开始采集光谱谱图数据转换
Spect_data = list(Spect.get_Data())
Spect_suss_flag = Spect.get_Success()
print(f"Spect_suss_flag:{Spect_suss_flag}")
#获取波数
WaveNum = list(self.machine.GetWaveNum())
#光谱数据基线校正
Spect_bLC = list(self.machine.BaseLineCorrect(Spect_data))
#对数据进行boxcar 平滑
Spect_StB = list(self.machine.SmoothBoxcar(Spect_bLC, 10))
#关闭仪器
OFF_flag = wrapper.CloseDevice()
print(f"OFF_flag: {OFF_flag}")
data_to_dataframe = {
"WaveNum": WaveNum,
"Spect_data": Spect_data,
"Spect_bLC": Spect_bLC,
"Spect_StB": Spect_StB
}
# 将数据转换为DataFrame格式
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def save_data(self, data: pd.DataFrame, file_name: str, save_root: str):
if not os.path.exists(save_root):
os.makedirs(save_root)
data.to_csv(os.path.join(save_root, f"{file_name}.csv"), index=False)
print(f"数据已保存到 {save_root}")
def opsky_cmd(self, command: str):
print(f"接收到命令: {command}")
# replace !=! to "
command = command.replace("!=!", "\"")
command = command.replace('\\', '\\\\')
command = command.replace("True", "true").replace("False", "false")
try:
cmd_dict = json.loads(command)
print(f"命令参数: {cmd_dict}")
# 解析命令参数
# file_name = cmd_dict.get("file_name", "test")
save_root = cmd_dict.get("save_root", r"D:\UniLab\results\250414")
# FIXME: use EIS for test. Add parameter for other tests
self.start_Raman(**cmd_dict) # , file_name=file_name, save_root=save_root
print(f"实验完成,数据已保存到 {save_root}")
except Exception as e:
print(f"命令执行失败: {e}")
raise f"error: {e}"