This commit is contained in:
h840473807
2025-06-24 01:03:06 +08:00
committed by GitHub
15 changed files with 1017 additions and 39 deletions

34
examples/dh_7000_eis.py Normal file
View File

@@ -0,0 +1,34 @@
import rclpy
# Initialize ROS communications for a given context
if(rclpy.ok() == False):
rclpy.init()
else:
print("rclpy already initiated")
from unilabos.devices.electrochem.dh_7000 import DH7000
from unilabos_msgs.action import SendCmd
from std_msgs.msg import Float64, String, Int16
from unilabos.ros.device_node_wrapper import ros2_device_node
import clr
import os
# dll_path = r'D:\UniLab\code\DHElecChem\release64'
# eccore_dll_path = os.path.join(dll_path, 'ECCore.dll')
# os.environ["PATH"] = dll_path + os.pathsep + os.environ["PATH"]
# clr.AddReference(eccore_dll_path)
# from ECCore import ElecMachines
ROS2_DH7000 = ros2_device_node(
DH7000,
status_types={'machine_id': Int16, 'status': String},
action_value_mappings={'dh_cmd': {
'type': SendCmd,
'goal': {'command': 'command'},
'feedback': {},
'result': {'success': 'success'}}
}
)
device = ROS2_DH7000(device_id='DH7000_1')
rclpy.spin(device.ros_node_instance)

View File

@@ -0,0 +1,31 @@
import rclpy
# Initialize ROS communications for a given context
if(rclpy.ok() == False):
rclpy.init()
else:
print("rclpy already initiated")
from unilabos.devices.raman_uv.opsky_ATR30007 import ATR30007
from unilabos_msgs.action import SendCmd
from unilabos.ros.device_node_wrapper import ros2_device_node
# dll_path = r'D:\UniLab\code\DHElecChem\release64'
# eccore_dll_path = os.path.join(dll_path, 'ECCore.dll')
# os.environ["PATH"] = dll_path + os.pathsep + os.environ["PATH"]
# clr.AddReference(eccore_dll_path)
# from ECCore import ElecMachines
ROS2_ATR30007 = ros2_device_node(
ATR30007,
#status_types={'machine_id': Int16, 'status': String},
action_value_mappings={'opsky_cmd': {
'type': SendCmd,
'goal': {'command': 'command'},
'feedback': {},
'result': {'success': 'success'}}
}
)
device = ROS2_ATR30007(device_id='ATR30007_1')
rclpy.spin(device.ros_node_instance)

View File

@@ -0,0 +1,61 @@
import os
import time
import pandas as pd
import json
import shutil
import sys
import time
# define class ATR30007
# this class is used to control the ATR30007 Raman workstation
class NMR_400:
def __init__(self):
self.machine = None
def start_NMR(self, user: str = "admin", holder: int = 1,
name: str = "Samplename1", expno: int = 1,
solvent: str = "D2O", experiment: str = "PROTON16",
title: str = "test_1H", save_root: str = None,):
#打开仪器
content = f"""USER {user}
HOLDER {holder}
NAME {name}
EXPNO {expno}
SOLVENT {solvent}
EXPERIMENT {experiment}
TITLE {title}
END
"""
# 生成文件名(当前日期)
filename = datetime.now().strftime("%Y%m%d%H%M") + ".txt"
# 创建输出目录(如果不存在)
os.makedirs(save_root, exist_ok=True)
# 写入文件
filepath = os.path.join(save_root, filename)
with open(filepath, "w") as f:
f.write(content)
print(f'文件{filename}保存到{save_root}')
def NMR_cmd(self, command: str):
print(f"接收到命令: {command}")
# replace !=! to "
command = command.replace("!=!", "\"")
command = command.replace('\\', '\\\\')
command = command.replace("True", "true").replace("False", "false")
try:
cmd_dict = json.loads(command)
print(f"命令参数: {cmd_dict}")
# 解析命令参数
save_root = cmd_dict.get("save_root", r"D:\UniLab\results\250414")
# FIXME: use EIS for test. Add parameter for other tests
self.start_NMR(**cmd_dict) # , file_name=file_name, save_root=save_root
print(f"实验完成,数据已保存到 {save_root}")
except Exception as e:
print(f"命令执行失败: {e}")
raise f"error: {e}"

View File

View File

@@ -0,0 +1,109 @@
### test Ao long AL-Y3500 XRD device #####
'''
注意: 复制前删除同名文件
(1) 首先要在D盘创建Aolong Save文件夹。
在D盘Aolong Save文件夹里创建Hardware.txt参数控制文件, 即D:\Aolong Save\Hardware.txt。
(2) 除了停止和开始命令,其它参数要在命令符后面输入一位空格字符(同一行不能输入多位空格)。
所有命令符英文都要大写。
在D盘Aolong Save文件夹里创建Jade文件夹测量结果自动保存在D:\Aolong Save\Jade里面。
(3) `远程控制.exe`的主界面要勾选远程控制,才能实现远程操作功能。
输入参数`Hardware.txt`解读:
命令符 参数
USER 用户名
SAMPLE 样品编号
TIME 采集时间
STEP 步进角度
START 起始角度
C_END 结束角度
RUN 开始命令
STOP 停止命令
END
示例:
建议采用(1)的写法XRD机器保持开启状态参数RUN启动`远程控制.exe`, 保持界面中的高压电源30 kV开启状态
(1). 测量Aolong 513 样品。在20-50度范围内以0.2秒速度采集0.02步进角度的样品数据!
USER Aolong
SAMPLE 513
TIME 0.2
STEP 0.02
START 20
C_END 50
RUN
END
(2). 停止测量
USER Aolong
SAMPLE 513
STOP
END
'''
class XRD_Aolong_AL_Y3500:
def __init__(self, name:str, time:float, step_angle:float, start_angle:float, end_angle:float, status:str):
self.name = name
self.time = time
self.step_angle = step_angle
self.start_angle = start_angle
self.end_angle = end_angle
self.status = status
def setup_run_XRD(self, name:str, time:float, step_angle:float, start_angle:float, end_angle:float, status:str):
'''
setup input parameter of `aolong XRD AL-Y3500`
'''
### default input parameters ##
parameter_path = "D:/Aolong Save/Hardware.txt"
ret = "USER Aolong"
ret += "\n"
ret += f"SAMPLE {name}"
ret += "\n"
ret += f"TIME {time}"
ret += "\n"
ret += f"STEP {step_angle}"
ret += "\n"
ret += f"START {start_angle}"
ret += "\n"
ret += f"C_END {end_angle}"
ret += "\n"
ret += f"{status}"
ret += "\n"
ret += "END"
ret += "\n"
fp = open(parameter_path, "w")
fp.write(ret)
fp.close()
## 文件`parameter_path = "D:/Aolong Save/Hardware.txt"` 一旦发生改动仪器开启自动开始XRD测试
## 并将测试结果自动保存到 `default save path` 中
save_path = "D:/Aolong Save/Jade"
#if __name__ == "__main__":
# setup_run_XRD("smc-20250515-test", 0.2, 0.02, 20, 40, "RUN" )

View File

View File

View File

@@ -0,0 +1,552 @@
import clr
import os
import time
import pandas as pd
import json
import shutil
import sys
import time
from System import Environment
# define class DH7000
# this class is used to control the DH7000 electrochemical workstation
class DH7000:
def __init__(self, dll_path: str = r'D:\DH_release64'):
self.dll_path = dll_path
self.machine = None
self._load_dll()
self._initialize_machine()
self._status = "PowerOff" # otherwise it will be "Running", "Overload", "Idle"
self._machine_id = None
@property
def status(self) -> str:
return self._status
@property
def machine_id(self) -> int:
return self._machine_id
def _load_dll(self):
# Check if ECCore.dll exists
eccore_dll_path = os.path.join(self.dll_path, 'ECCore.dll')
if not os.path.exists(eccore_dll_path):
raise FileNotFoundError(f"ECCore.dll不存在于路径 {eccore_dll_path}")
# Prepend DLL path to system PATH
os.environ["PATH"] = self.dll_path + os.pathsep + os.environ["PATH"]
# Add reference
try:
clr.AddReference(eccore_dll_path)
from ECCore import ElecMachines
self.machine = ElecMachines()
print("成功添加ECCore.dll引用导入ElecMachines类并创建实例")
except Exception as e:
raise RuntimeError(f"添加DLL引用或导入ElecMachines类时出错: {e}")
def _initialize_machine(self):
interface_dir = os.path.join(self.dll_path, 'DHInterface')
if not os.path.exists(interface_dir):
raise FileNotFoundError(f"DHInterface文件夹不存在于路径 {interface_dir}")
init_success = self.machine.Init(interface_dir)
if not init_success:
raise RuntimeError("工作站初始化失败")
print("工作站初始化成功")
self.check_status()
def check_status(self):
Flag_isexp = self.machine.IsExperimenting()
if Flag_isexp:
Flag_isover = self.machine.IsOverLoad()
if Flag_isover:
self._status = "Overload"
else:
self._status = "Running"
else:
self._status = "Idle"
def set_current_machineid(self, machine_id: int):
machineid_list = list(self.machine.GetMachineId())
if machine_id not in machineid_list:
raise ValueError(f"工作站ID {machine_id} 不在可用ID列表中: {machineid_list}")
self.machine.SetCurrentMachineId(machine_id)
self._machine_id = machine_id
print(f"选择的工作站ID: {self._machine_id}")
def start_eis(
self,
isVolEIS: bool = True,
StartFreq: float = 1E5,
EndFreq: float = 1E-1,
Amplitude: float = 10,
IntervalType: int = 1,
PointCount: int = 10,
Voltage: float = 0.0,
VoltageVSType: float = 1.0,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
file_name: str = 'test_eis',
save_root: str = r"D:\UniLab\results"
):
"""
EIS test
"""
self.machine.Start_EIS(
isVolEIS,
StartFreq,
EndFreq,
Amplitude,
IntervalType,
PointCount,
Voltage,
VoltageVSType,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand
)
self._status = "Running"
# Wait for experiment to finish
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 6
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
ZreData = list(self.machine.SplitData(data_result, splitCount, 1))
ZimData = list(self.machine.SplitData(data_result, splitCount, 2))
ZData = list(self.machine.SplitData(data_result, splitCount, 3))
FreqData = list(self.machine.SplitData(data_result, splitCount, 4))
PhaseData = list(self.machine.SplitData(data_result, splitCount, 5))
data_to_dataframe = {
"TimeData": timeData,
"ZreData": ZreData,
"ZimData": ZimData,
"ZData": ZData,
"FreqData": FreqData,
"PhaseData": PhaseData,
}
# Convert to DataFrame
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def start_lsv(
self,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 1,
FinallyPotential: float = 1.0,
FinallyPotentialVSType: int = 1,
ScanRate: float = 0.05,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
file_name: str = 'lsv_test',
save_root: str = r"D:\UniLab\results"
):
"""
LSV test
"""
self.machine.Start_Linear_Scan_Voltammetry(
InitialPotential,
InitialPotentialVSType,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand
)
self._status = "Running"
# Wait for experiment to finish
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
VolData = list(self.machine.SplitData(data_result, splitCount, 1))
CurData = list(self.machine.SplitData(data_result, splitCount, 2))
data_to_dataframe = {
"TimeData": timeData,
"VolData": VolData,
"CurData": CurData,
}
# Convert to DataFrame
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def start_cv_single(
self,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
TopPotential: float = 1.0,
TopPotentialVSType: int = 0,
FinallyPotential: float = 0.0,
FinallyPotentialVSType: int = 0,
ScanRate: float = 0.1,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
delayTime: float = 0.0,
file_name: str = 'cv_single_test',
save_root: str = r"D:\UniLab\results"
):
"""
Single cyclic voltammetry (Single CV)
"""
# Call underlying DLL function
self.machine.Start_Circle_Voltammetry_Single(
InitialPotential,
InitialPotentialVSType,
TopPotential,
TopPotentialVSType,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
delayTime
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Retrieve and parse data
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
# Same as LSV, usually three columns: Time, Voltage, Current
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
data_to_dataframe = {
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
df = pd.DataFrame(data_to_dataframe)
# Save data
self.save_data(df, file_name, save_root)
self.check_status()
def start_cv_multi(
self,
IsUseInitialPotential: bool = True,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
TopPotential1: float = 1.0,
TopPotential1VSType: int = 0,
TopPotential2: float = -1.0,
TopPotential2VSType: int = 0,
IsUseFinallyPotential: bool = True,
FinallyPotential: float = 0.0,
FinallyPotentialVSType: int = 0,
ScanRate: float = 0.1,
cycleCount: int = 3,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
delayTime: float = 0.0,
file_name: str = "cv_multi_test",
save_root: str = r"D:\UniLab\results",
):
"""
Multiple cyclic voltammetry (Multiple CV).
"""
# Call underlying DLL
self.machine.Start_Circle_Voltammetry_Multi(
IsUseInitialPotential,
InitialPotential,
InitialPotentialVSType,
TopPotential1,
TopPotential1VSType,
TopPotential2,
TopPotential2VSType,
IsUseFinallyPotential,
FinallyPotential,
FinallyPotentialVSType,
ScanRate,
cycleCount,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
delayTime,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch data and split
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3 # Assume still three columns: Time/Voltage/Current
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def start_ca(
self,
timePerPoint: float = 0.1,
continueTime: float = 100.0,
InitialPotential: float = 0.0,
InitialPotentialVSType: int = 0,
isVoltageRandAuto: int = 1,
VoltageRand: str = "10",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
file_name: str = "ca_test",
save_root: str = r"D:\UniLab\results",
):
"""
ChronoAmperometry (CA) — constant potential
"""
# Call DLL
self.machine.Start_ChronoamperonetryParam(
timePerPoint,
continueTime,
InitialPotential,
InitialPotentialVSType,
isVoltageRandAuto,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch and split data (commonly three columns: Time / Voltage / Current)
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def start_cp(
self,
timePerPoint: float = 0.1,
continueTime: float = 100.0,
current: float = 1.0,
VoltageRand: str = "100",
isCurrentRandAuto: int = 1,
CurrentRand: str = "10",
isVoltageFilterAuto: int = 1,
VoltageFilter: str = "",
isCurrentFilterAuto: int = 1,
currentFilter: str = "",
machineId: int = 0,
file_name: str = "cp_test",
save_root: str = r"D:\UniLab\results",
):
"""
ChronoPotentiometry (CP) — constant current
"""
# Call DLL
self.machine.Start_ChronopotentiometryParam(
timePerPoint,
continueTime,
current,
VoltageRand,
isCurrentRandAuto,
CurrentRand,
isVoltageFilterAuto,
VoltageFilter,
isCurrentFilterAuto,
currentFilter,
machineId,
)
# Wait for experiment to finish
self._status = "Running"
while self._status == "Running":
time.sleep(5)
self.check_status()
print(f"当前状态: {self._status}")
# Fetch and split data (Time / Voltage / Current)
datatype = self.machine.GetResultDataType()
data_result = self.machine.GetData(datatype)
splitCount = 3
timeData = list(self.machine.SplitData(data_result, splitCount, 0))
volData = list(self.machine.SplitData(data_result, splitCount, 1))
curData = list(self.machine.SplitData(data_result, splitCount, 2))
df = pd.DataFrame(
{
"TimeData": timeData,
"VolData": volData,
"CurData": curData,
}
)
# Save
self.save_data(df, file_name, save_root)
self.check_status()
def stop_experiment(self):
self.machine.StopExperiment()
self.check_status()
print("实验已停止")
def save_data(self, data: pd.DataFrame, file_name: str, save_root: str):
if not os.path.exists(save_root):
os.makedirs(save_root)
data.to_csv(os.path.join(save_root, f"{file_name}.csv"), index=False)
print(f"数据已保存到 {save_root}")
# === Core: a unified dh_cmd method that calls start_eis or start_lsv according to methods ===
def dh_cmd(self, method: str, command: str, resource: dict):
"""
Unified handler for different commands such as EIS / LSV / CV / CA.
In the incoming command JSON, use the key "methods" to specify the measurement type: "eis", "lsv", etc.
"""
self.success = False
print(f"接收到命令: {command}")
# Replace "!=!" with quotes and fix True/False and path separators
command = command.replace("!=!", "\"")
command = command.replace('\\', '\\\\')
command = command.replace("True", "true").replace("False", "false")
try:
cmd_dict = json.loads(command)
print(f"命令参数: {cmd_dict}")
# Extract measurement method, default to "EIS" if not specified
# method = cmd_dict.pop("methods", "eis").lower()
# Extract file save path (may not be provided)
save_root = cmd_dict.get("save_root", r"D:\UniLab\results\test")
if method in ("cv_single", "cvs"):
# CV (cyclic voltammetry, single cycle)
print("执行 EIS 测试...")
self.start_cv_single(**cmd_dict)
elif method in ("cv_multi", "cvm"):
# CV (cyclic voltammetry, multiple cycles)
print("执行多循环伏安测试...")
self.start_cv_multi(**cmd_dict)
elif method == "lsv":
# LSV
print("执行 LSV 测试...")
self.start_lsv(**cmd_dict)
elif method == "ca":
# CA
self.start_ca(**cmd_dict)
elif method == "cp":
# CP
self.start_cp(**cmd_dict)
else:
# Default to EIS
print("执行 EIS 测试...")
self.start_eis(**cmd_dict)
print(f"实验完成,数据已保存到 {save_root}")
self.success = True
except Exception as e:
print(f"命令执行失败: {e}")
raise RuntimeError(f"error: {e}")

View File

@@ -0,0 +1,115 @@
import json
import os
import clr
import pandas as pd
# define class ATR30007
# this class is used to control the ATR30007 Raman workstation
class ATR30007:
def __init__(self, dll_path: str = r'D:\Raman_RS'):
self.dll_path = dll_path
self.machine = None
self.status = "Idle"
self._load_dll()
def _load_dll(self):
# 检查ECCore.dll是否存在
eccore_dll_path = os.path.join(self.dll_path, 'ATRWrapper.dll')
if not os.path.exists(eccore_dll_path):
raise FileNotFoundError(f"ATRWrapper.dll不存在于路径 {eccore_dll_path}")
# 将DLL路径添加到系统PATH的最前面
os.environ["PATH"] = self.dll_path + os.pathsep + os.environ["PATH"]
# 添加引用
try:
clr.AddReference(eccore_dll_path)
from Optosky.Wrapper import ATRWrapper
self.machine = ATRWrapper()
print("成功添加ATRWrapper.dll引用导入ATRWrapper类并创建实例")
except Exception as e:
raise RuntimeError(f"添加DLL引用或导入ATRWrapper类时出错: {e}")
def get_machineSn(self):
machineSn = self.machine.GetSn()
print(f"选择的工作站ID: {machineSn}")
def start_Raman(self, IntegTime: int = 5000, LdPower: int = 200,
Ldwave: int = 1, CCDTemp: int = -5,
file_name: str = 'test_raman', save_root: str = None):
#打开仪器
On_flag = self.machine.OpenDevice()
print(f"On_flag: {On_flag}")
#获取仪器SN
wrapper_Sn = self.machine.GetSn()
print(f"wrapper_Sn: {wrapper_Sn}")
#设置当前设备的积分时间, 单位为毫秒
Integ_flag = self.machine.SetIntegrationTime(IntegTime)
print(f"Integ_flag:{Integ_flag}")
#设置激光功率, 单位mW
LdP_flag = self.machine.SetLdPower(LdPower,Ldwave)
print(f"LdP_flag:{LdP_flag}")
#设置 CCD 制冷温度
SetC_flag = self.machine.SetCool(CCDTemp)
print(f"SetC_flag:{SetC_flag}")
#开始采集光谱
self.status = "Running"
Spect = self.machine.AcquireSpectrum()
#开始采集光谱谱图数据转换
Spect_data = list(Spect.get_Data())
Spect_suss_flag = Spect.get_Success()
print(f"Spect_suss_flag:{Spect_suss_flag}")
#获取波数
WaveNum = list(self.machine.GetWaveNum())
#光谱数据基线校正
Spect_bLC = list(self.machine.BaseLineCorrect(Spect_data))
#对数据进行boxcar 平滑
Spect_StB = list(self.machine.SmoothBoxcar(Spect_bLC, 10))
self.status = "Idle"
#关闭仪器
OFF_flag = wrapper.CloseDevice()
print(f"OFF_flag: {OFF_flag}")
data_to_dataframe = {
"WaveNum": WaveNum,
"Spect_data": Spect_data,
"Spect_bLC": Spect_bLC,
"Spect_StB": Spect_StB
}
# 将数据转换为DataFrame格式
df = pd.DataFrame(data_to_dataframe)
self.save_data(df, file_name, save_root)
self.check_status()
def save_data(self, data: pd.DataFrame, file_name: str, save_root: str):
if not os.path.exists(save_root):
os.makedirs(save_root)
data.to_csv(os.path.join(save_root, f"{file_name}.csv"), index=False)
print(f"数据已保存到 {save_root}")
def opsky_cmd(self, command: str):
print(f"接收到命令: {command}")
# replace !=! to "
command = command.replace("!=!", "\"")
command = command.replace('\\', '\\\\')
command = command.replace("True", "true").replace("False", "false")
try:
cmd_dict = json.loads(command)
print(f"命令参数: {cmd_dict}")
# 解析命令参数
# file_name = cmd_dict.get("file_name", "test")
save_root = cmd_dict.get("save_root", r"D:\UniLab\results\250414")
# FIXME: use EIS for test. Add parameter for other tests
self.start_Raman(**cmd_dict) # , file_name=file_name, save_root=save_root
print(f"实验完成,数据已保存到 {save_root}")
except Exception as e:
print(f"命令执行失败: {e}")
raise f"error: {e}"

View File

@@ -0,0 +1,20 @@
# 电化学表征设备:电化学工作站、电池测试柜
XRD_station.Aolong:
description: Aolong XRD_station test
class:
module: unilabos.devices.XRD.XRD_test:XRD_Aolong_AL_Y3500
type: python
status_types:
samplein_status: Bool
sampleout_status: Bool
action_value_mappings:
NMR_cmd:
type: SendCmd
goal:
#command: wf_name
params: command
#resource: resource
feedback:
status: status
result:
success: success

View File

@@ -0,0 +1,44 @@
# 色谱表征设备
hplc.agilent:
description: HPLC device
class:
module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver
type: python
status_types:
device_status: String
could_run: Bool
driver_init_ok: Bool
is_running: Bool
finish_status: String
status_text: String
action_value_mappings:
execute_command_from_outer:
type: SendCmd
goal:
command: command
feedback: {}
result:
success: success
schema:
properties:
device_status:
type: string
could_run:
type: boolean
driver_init_ok:
type: boolean
is_running:
type: boolean
finish_status:
type: string
status_text:
type: string
required:
- device_status
- could_run
- driver_init_ok
- is_running
- finish_status
- status_text
additionalProperties: false
type: object

View File

@@ -0,0 +1,19 @@
# 电化学表征设备:电化学工作站、电池测试柜
electrochem_station.DongHua:
description: Donghua electrochem_station
class:
module: unilabos.devices.electrochem.dh_7000:DH7000
type: python
status_types:
status: String
action_value_mappings:
dh_cmd:
type: WorkStationRun
goal:
wf_name: method
params: command
resource: resource
feedback:
status: status
result:
success: success

View File

@@ -1,5 +1,5 @@
# 光学表征设备:红外、紫外可见、拉曼等
raman_home_made:
raman.home_made:
description: Raman spectroscopy device
class:
module: unilabos.devices.raman_uv.home_made_raman:RamanObj
@@ -22,46 +22,24 @@ raman_home_made:
- status
additionalProperties: false
type: object
hplc.agilent:
description: HPLC device
raman.opsky_ATR30007:
description: Raman spectroscopy device
class:
module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver
module: unilabos.devices.raman_uv.opsky_raman:ATR30007
type: python
status_types:
device_status: String
could_run: Bool
driver_init_ok: Bool
is_running: Bool
finish_status: String
status_text: String
status: String
action_value_mappings:
execute_command_from_outer:
type: SendCmd
start_Raman:
type: GetRaman
goal:
command: command
feedback: {}
result:
success: success
schema:
properties:
device_status:
type: string
could_run:
type: boolean
driver_init_ok:
type: boolean
is_running:
type: boolean
finish_status:
type: string
status_text:
type: string
required:
- device_status
- could_run
- driver_init_ok
- is_running
- finish_status
- status_text
additionalProperties: false
type: object
integtime: IntegTime
ldpower: LdPower
ldwave: LdWave
ccdtemp: CCDTemp
filename: filename
saveroot: saveroot
feedback:
status: status
result: {}

View File

@@ -90,6 +90,8 @@ set(action_files
"action/Evaporate.action"
"action/EvacuateAndRefill.action"
"action/GetRaman.action"
"action/WorkStationRun.action"
"action/AGVTransfer.action"
)

View File

@@ -0,0 +1,13 @@
# Goal
int32 integtime
int32 ldpower
bool ldwave
float64 ccdtemp
string filename
string saveroot
---
# Result
string success
---
# Feedback
string status