添加Raman和xrd相关代码

This commit is contained in:
WenzheG
2025-11-05 19:58:51 +08:00
committed by Xuwznln
parent 5fc7eb7586
commit 8807865649
9 changed files with 2486 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
{
"nodes": [
{
"id": "opsky_ATR30007",
"name": "opsky_ATR30007",
"parent": null,
"type": "device",
"class": "opsky_ATR30007",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {},
"data": {},
"children": []
}
],
"links": []
}

View File

@@ -0,0 +1,71 @@
import socket
import time
import csv
from datetime import datetime
import threading
csv_lock = threading.Lock() # 防止多线程写CSV冲突
def scan_once(ip="192.168.1.50", port_in=2001, port_out=2002,
csv_file="scan_results.csv", timeout=5, retries=3):
"""
改进版扫码函数:
- 自动重试
- 全程超时保护
- 更安全的socket关闭
- 文件写入加锁
"""
def save_result(qrcode):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with csv_lock:
with open(csv_file, mode="a", newline="") as f:
writer = csv.writer(f)
writer.writerow([timestamp, qrcode])
print(f"✅ 已保存结果: {timestamp}, {qrcode}")
result = None
for attempt in range(1, retries + 1):
print(f"\n🟡 扫码尝试 {attempt}/{retries} ...")
try:
# -------- Step 1: 触发拍照 --------
with socket.create_connection((ip, port_in), timeout=2) as client_in:
cmd = "start"
client_in.sendall(cmd.encode("ascii")) #把字符串转为byte字节流规则是ascii码
print(f"→ 已发送触发指令: {cmd}")
# -------- Step 2: 等待识别结果 --------
with socket.create_connection((ip, port_out), timeout=timeout) as client_out:
print(f" 已连接相机输出端口 {port_out},等待结果...")
# recv最多阻塞timeout秒
client_out.settimeout(timeout)
data = client_out.recv(2048).decode("ascii", errors="ignore").strip() #结果输出为ascii字符串遇到无法解析的字节则忽略
# .strip():去掉字符串头尾的空白字符(包括 \n, \r, 空格等),便于后续判断是否为空或写入 CSV。
if data:
print(f"📷 识别结果: {data}")
save_result(data) #调用 save_result(data) 把时间戳 + 识别字符串写入 CSV线程安全
result = data #把局部变量 result 设为 data用于函数返回值
break #如果读取成功跳出重试循环for attempt in ...),不再进行后续重试。
else:
print("⚠️ 相机返回空数据,重试中...")
except socket.timeout:
print("⏰ 超时未收到识别结果,重试中...")
except ConnectionRefusedError:
print("❌ 无法连接到扫码器端口,请检查设备是否在线。")
except OSError as e:
print(f"⚠️ 网络错误: {e}")
except Exception as e:
print(f"❌ 未知异常: {e}")
time.sleep(0.5) # 两次扫描之间稍作延时
# -------- Step 3: 返回最终结果 --------
if result:
print(f"✅ 扫码成功:{result}")
else:
print("❌ 多次尝试后仍未获取二维码结果")
return result

View File

@@ -0,0 +1,398 @@
# opsky_atr30007.py
import logging
import time as time_mod
import csv
from datetime import datetime
from typing import Optional, Dict, Any
# 兼容 pymodbus 在不同版本中的位置与 API
try:
from pymodbus.client import ModbusTcpClient
except Exception:
ModbusTcpClient = None
# 导入 run_raman_test假定与本文件同目录
# 如果你的项目是包结构且原先使用相对导入,请改回 `from .raman_module import run_raman_test`
try:
from .raman_module import run_raman_test
except Exception:
# 延迟导入失败不会阻止主流程(在 run 时会再尝试)
run_raman_test = None
logger = logging.getLogger("opsky")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%y-%m-%d %H:%M:%S")
ch.setFormatter(formatter)
logger.addHandler(ch)
class opsky_ATR30007:
"""
封装 UniLabOS 设备动作逻辑,兼容 pymodbus 2.x / 3.x。
放在独立文件中opsky_atr30007.py
"""
def __init__(
self,
plc_ip: str = "192.168.1.88",
plc_port: int = 502,
robot_ip: str = "192.168.1.200",
robot_port: int = 502,
scan_csv_file: str = "scan_results.csv",
):
self.plc_ip = plc_ip
self.plc_port = plc_port
self.robot_ip = robot_ip
self.robot_port = robot_port
self.scan_csv_file = scan_csv_file
# ----------------- 参数字符串转换 helpers -----------------
@staticmethod
def _str_to_int(s, default):
try:
return int(float(str(s).strip()))
except Exception:
return int(default)
@staticmethod
def _str_to_float(s, default):
try:
return float(str(s).strip())
except Exception:
return float(default)
@staticmethod
def _str_to_bool(s, default):
try:
v = str(s).strip().lower()
if v in ("true", "1", "yes", "y", "t"):
return True
if v in ("false", "0", "no", "n", "f"):
return False
return default
except Exception:
return default
# ----------------- Modbus / 安全读写 -----------------
@staticmethod
def _adapt_req_kwargs_for_read(func_name: str, args: tuple, kwargs: dict):
# 如果调用方传的是 (address, count) positional在新版接口可能是 address=..., count=...
if len(args) == 2 and func_name.startswith("read_"):
address, count = args
args = ()
kwargs.setdefault("address", address)
kwargs.setdefault("count", count)
return args, kwargs
@staticmethod
def _adapt_req_kwargs_for_write(func_name: str, args: tuple, kwargs: dict):
if len(args) == 2 and func_name.startswith("write_"):
address, value = args
args = ()
kwargs.setdefault("address", address)
kwargs.setdefault("value", value)
return args, kwargs
def ensure_connected(self, client, name, ip, port):
"""确保连接存在,失败则尝试重连并返回新的 client 或 None"""
if client is None:
return None
try:
# 不同 pymodbus 版本可能有不同方法检测 socket
is_open = False
try:
is_open = bool(client.is_socket_open())
except Exception:
# fallback: try to read nothing or attempt connection test
try:
# 轻试一次
is_open = client.connected if hasattr(client, "connected") else False
except Exception:
is_open = False
if not is_open:
logger.warning("%s 掉线,尝试重连...", name)
try:
client.close()
except Exception:
pass
time_mod.sleep(0.5)
if ModbusTcpClient:
new_client = ModbusTcpClient(ip, port=port)
try:
if new_client.connect():
logger.info("%s 重新连接成功 (%s:%s)", name, ip, port)
return new_client
except Exception:
pass
logger.warning("%s 重连失败", name)
time_mod.sleep(1)
return None
return client
except Exception as e:
logger.exception("%s 连接检查异常: %s", name, e)
return None
def safe_read(self, client, name, func, *args, retries=3, delay=0.3, **kwargs):
"""兼容 pymodbus 2.x/3.x 的读函数,返回 response 或 None"""
if client is None:
return None
for attempt in range(1, retries + 1):
try:
# adapt args/kwargs for different API styles
args, kwargs = self._adapt_req_kwargs_for_read(func.__name__, args, kwargs)
# unit->slave compatibility
if "unit" in kwargs:
kwargs["slave"] = kwargs.pop("unit")
res = func(*args, **kwargs)
# pymodbus Response 在不同版本表现不同,尽量检测错误
if res is None:
raise RuntimeError("返回 None")
if hasattr(res, "isError") and res.isError():
raise RuntimeError("Modbus 返回 isError()")
return res
except Exception as e:
logger.warning("%s 读异常 (尝试 %d/%d): %s", name, attempt, retries, e)
time_mod.sleep(delay)
logger.error("%s 连续读取失败 %d", name, retries)
return None
def safe_write(self, client, name, func, *args, retries=3, delay=0.3, **kwargs):
"""兼容 pymodbus 2.x/3.x 的写函数,返回 True/False"""
if client is None:
return False
for attempt in range(1, retries + 1):
try:
args, kwargs = self._adapt_req_kwargs_for_write(func.__name__, args, kwargs)
if "unit" in kwargs:
kwargs["slave"] = kwargs.pop("unit")
res = func(*args, **kwargs)
if res is None:
raise RuntimeError("返回 None")
if hasattr(res, "isError") and res.isError():
raise RuntimeError("Modbus 返回 isError()")
return True
except Exception as e:
logger.warning("%s 写异常 (尝试 %d/%d): %s", name, attempt, retries, e)
time_mod.sleep(delay)
logger.error("%s 连续写入失败 %d", name, retries)
return False
def wait_with_quit_check(self, robot, seconds, addr_quit=270):
"""等待指定时间,同时每 0.2s 检查 R270 是否为 1立即退出"""
if robot is None:
time_mod.sleep(seconds)
return False
checks = max(1, int(seconds / 0.2))
for _ in range(checks):
rr = self.safe_read(robot, "机器人", robot.read_holding_registers, address=addr_quit, count=1)
if rr and getattr(rr, "registers", [None])[0] == 1:
logger.info("检测到 R270=1立即退出等待")
return True
time_mod.sleep(0.2)
return False
# ----------------- 主流程 run_once -----------------
def run_once(
self,
integration_time: str = "5000",
laser_power: str = "200",
save_csv: str = "true",
save_plot: str = "true",
normalize: str = "true",
norm_max: str = "1.0",
**_: Any,
) -> Dict[str, Any]:
result: Dict[str, Any] = {"success": False, "event": "none", "details": {}}
integration_time_v = self._str_to_int(integration_time, 5000)
laser_power_v = self._str_to_int(laser_power, 200)
save_csv_v = self._str_to_bool(save_csv, True)
save_plot_v = self._str_to_bool(save_plot, True)
normalize_v = self._str_to_bool(normalize, True)
norm_max_v = None if norm_max in (None, "", "none", "null") else self._str_to_float(norm_max, 1.0)
if ModbusTcpClient is None:
result["details"]["error"] = "未安装 pymodbus无法执行连接"
logger.error(result["details"]["error"])
return result
# 建立连接
plc = ModbusTcpClient(self.plc_ip, port=self.plc_port)
robot = ModbusTcpClient(self.robot_ip, port=self.robot_port)
try:
if not plc.connect():
result["details"]["error"] = "无法连接 PLC"
logger.error(result["details"]["error"])
return result
if not robot.connect():
plc.close()
result["details"]["error"] = "无法连接 机器人"
logger.error(result["details"]["error"])
return result
logger.info("✅ PLC 与 机器人连接成功")
time_mod.sleep(0.2)
# 伺服使能 (coil 写示例)
if self.safe_write(plc, "PLC", plc.write_coil, 10, True):
logger.info("✅ 伺服使能成功 (M10=True)")
else:
logger.warning("⚠️ 伺服使能失败")
# 初始化 CSV 文件
try:
with open(self.scan_csv_file, "w", newline="", encoding="utf-8") as f:
csv.writer(f).writerow(["Bottle_No", "Scan_Result", "Time"])
except Exception as e:
logger.warning("⚠️ 初始化CSV失败: %s", e)
bottle_count = 0
logger.info("🟢 等待机器人触发信号... (R260=1扫码 / R256=1拉曼 / R270=1退出)")
# 主循环:仅响应事件(每次循环后短暂 sleep
while True:
plc = self.ensure_connected(plc, "PLC", self.plc_ip, self.plc_port) or plc
robot = self.ensure_connected(robot, "机器人", self.robot_ip, self.robot_port) or robot
# 检查退出寄存器
quit_signal = self.safe_read(robot, "机器人", robot.read_holding_registers, 270, 1)
if quit_signal and getattr(quit_signal, "registers", [None])[0] == 1:
logger.info("🟥 检测到 R270=1准备退出...")
result["event"] = "quit"
result["success"] = True
break
# 读取关键寄存器256..260
rr = self.safe_read(robot, "机器人", robot.read_holding_registers, 256, 5)
if not rr or not hasattr(rr, "registers"):
time_mod.sleep(0.3)
continue
r256, r257, r258, r259, r260 = (rr.registers + [0, 0, 0, 0, 0])[:5]
# ---------- 扫码逻辑 ----------
if r260 == 1:
bottle_count += 1
logger.info("📸 第 %d 瓶触发扫码 (R260=1)", bottle_count)
try:
# 调用外部扫码函数(用户实现)
from .dmqfengzhuang import scan_once as scan_once_local
scan_result = scan_once_local(ip="192.168.1.50", port_in=2001, port_out=2002)
if scan_result:
logger.info("✅ 扫码成功: %s", scan_result)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(self.scan_csv_file, "a", newline="", encoding="utf-8") as f:
csv.writer(f).writerow([bottle_count, scan_result, timestamp])
else:
logger.warning("⚠️ 扫码失败或无返回")
except Exception as e:
logger.exception("❌ 扫码异常: %s", e)
# 写 R260->0, R261->1
self.safe_write(robot, "机器人", robot.write_register, 260, 0)
time_mod.sleep(0.15)
self.safe_write(robot, "机器人", robot.write_register, 261, 1)
logger.info("➡️ 扫码完成 (R260→0, R261→1)")
result["event"] = "scan"
result["success"] = True
# ---------- 拉曼逻辑 ----------
if r256 == 1:
logger.info("⚙️ 检测到 R256=1放瓶完成")
# PLC 电机右转指令
self.safe_write(plc, "PLC", plc.write_register, 1199, 1)
self.safe_write(plc, "PLC", plc.write_register, 1200, 1)
logger.info("➡️ 电机右转中...")
if self.wait_with_quit_check(robot, 3):
result["event"] = "quit"
break
self.safe_write(plc, "PLC", plc.write_register, 1199, 0)
logger.info("✅ 电机右转完成")
# 调用拉曼测试(尽量捕获异常并记录)
logger.info("🧪 开始拉曼测试...")
try:
# 尝试使用模块导入好的 run_raman_test否则再动态导入
rr_func = run_raman_test
if rr_func is None:
from raman_module import run_raman_test as rr_func
success, file_prefix, df = rr_func(
integration_time=integration_time_v,
laser_power=laser_power_v,
save_csv=save_csv_v,
save_plot=save_plot_v,
normalize=normalize_v,
norm_max=norm_max_v,
)
if success:
logger.info("✅ 拉曼测试完成: %s", file_prefix)
result["event"] = "raman"
result["success"] = True
else:
logger.warning("⚠️ 拉曼测试失败")
except Exception as e:
logger.exception("❌ 拉曼模块异常: %s", e)
# 电机左转回位
self.safe_write(plc, "PLC", plc.write_register, address=1299, value=1)
self.safe_write(plc, "PLC", plc.write_register, address=1300, value=1)
logger.info("⬅️ 电机左转中...")
if self.wait_with_quit_check(robot, 3):
result["event"] = "quit"
break
self.safe_write(plc, "PLC", plc.write_register, address=1299, value=0)
logger.info("✅ 电机左转完成")
# 通知机器人拉曼完成 R257=1
self.safe_write(robot, "机器人", robot.write_register, address=257, value=1)
logger.info("✅ 已写入 R257=1拉曼完成")
# 延迟后清零 R256
logger.info("⏳ 延迟4秒后清零 R256")
if self.wait_with_quit_check(robot, 4):
result["event"] = "quit"
break
self.safe_write(robot, "机器人", robot.write_register, address=256, value=0)
logger.info("✅ 已清零 R256")
# 等待机器人清 R257
logger.info("等待 R257 清零中...")
while True:
rr2 = self.safe_read(robot, "机器人", robot.read_holding_registers, address=257, count=1)
if rr2 and getattr(rr2, "registers", [None])[0] == 0:
logger.info("✅ 检测到 R257=0准备下一循环")
break
if self.wait_with_quit_check(robot, 1):
result["event"] = "quit"
break
time_mod.sleep(0.2)
time_mod.sleep(0.25)
finally:
logger.info("🧹 开始清理...")
try:
self.safe_write(plc, "PLC", plc.write_coil, address=10, value=False)
except Exception:
pass
for addr in [256, 257, 260, 261, 270]:
try:
self.safe_write(robot, "机器人", robot.write_register, address=addr, value=0)
except Exception:
pass
try:
if plc:
plc.close()
except Exception:
pass
try:
if robot:
robot.close()
except Exception:
pass
logger.info("🔚 已关闭所有连接")
return result

View File

@@ -0,0 +1,180 @@
# raman_module.py
import os
import time as time_mod
import numpy as np
import pandas as pd
# clr / ATRWrapper 依赖:在真实环境中使用 Windows + .NET wrapper
# 本模块对缺少 clr 或 Wrapper 的情况提供“仿真”回退,方便离线/调试运行。
try:
import clr
has_clr = True
except Exception:
clr = None
has_clr = False
# 本函数返回 (success: bool, file_prefix: str|None, df: pandas.DataFrame|None)
def run_raman_test(integration_time=5000, laser_power=200,
save_csv=True, save_plot=True,
normalize=False, norm_max=None,
max_wavenum=1300):
"""
拉曼测试流程(更稳健的实现):
- 若能加载 ATRWrapper 则使用之
- 否则生成模拟光谱(方便调试)
返回 (success, file_prefix, df)
"""
timestamp = time_mod.strftime("%Y%m%d_%H%M%S")
file_prefix = f"raman_{timestamp}"
wrapper = None
used_real_device = False
try:
if has_clr:
try:
# 请根据你的 DLL 路径调整
dll_path = r"D:\Raman\Raman_RS\ATRWrapper\ATRWrapper.dll"
if os.path.exists(dll_path):
clr.AddReference(dll_path)
else:
# 试图直接 AddReference 名称(若已在 PATH
try:
clr.AddReference("ATRWrapper")
except Exception:
pass
from Optosky.Wrapper import ATRWrapper # May raise
wrapper = ATRWrapper()
used_real_device = True
except Exception as e:
# 无法加载真实 wrapper -> fallback
print("⚠️ 未能加载 ATRWrapper使用模拟数据。详细:", e)
wrapper = None
if wrapper is None:
# 生成模拟光谱(方便调试)
# 模拟波数轴 50..1300
WaveNum = np.linspace(50, max_wavenum, 1024)
# 合成几条高斯峰 + 噪声
def gauss(x, mu, sig, A):
return A * np.exp(-0.5 * ((x - mu) / sig) ** 2)
Spect_data = (gauss(WaveNum, 200, 8, 1000) +
gauss(WaveNum, 520, 12, 600) +
gauss(WaveNum, 810, 20, 400) +
50 * np.random.normal(scale=1.0, size=WaveNum.shape))
Spect_bLC = Spect_data - np.min(Spect_data) * 0.05 # 简单 baseline
Spect_smooth = np.convolve(Spect_bLC, np.ones(3) / 3, mode="same")
df = pd.DataFrame({
"WaveNum": WaveNum,
"Raw_Spect": Spect_data,
"BaseLineCorrected": Spect_bLC,
"Smooth_Spect": Spect_smooth
})
success = True
file_prefix = f"raman_sim_{timestamp}"
# 保存 CSV / 绘图 等同真实设备
else:
# 使用真实设备 API根据你提供的 wrapper 调用)
On_flag = wrapper.OpenDevice()
print("通讯连接状态:", On_flag)
if not On_flag:
wrapper.CloseDevice()
return False, None, None
wrapper.SetIntegrationTime(int(integration_time))
wrapper.SetLdPower(int(laser_power), 1)
# 可能的冷却设置(如果 wrapper 支持)
try:
wrapper.SetCool(-5)
except Exception:
pass
Spect = wrapper.AcquireSpectrum()
Spect_data = np.array(Spect.get_Data())
if not Spect.get_Success():
print("光谱采集失败")
try:
wrapper.CloseDevice()
except Exception:
pass
return False, None, None
WaveNum = np.array(wrapper.GetWaveNum())
Spect_bLC = np.array(wrapper.BaseLineCorrect(Spect_data))
Spect_smooth = np.array(wrapper.SmoothBoxcar(Spect_bLC, 3))
df = pd.DataFrame({
"WaveNum": WaveNum,
"Raw_Spect": Spect_data,
"BaseLineCorrected": Spect_bLC,
"Smooth_Spect": Spect_smooth
})
wrapper.CloseDevice()
success = True
# 如果需要限定波数范围
mask = df["WaveNum"] <= max_wavenum
df = df[mask].reset_index(drop=True)
# 可选归一化
if normalize:
arr = df["Smooth_Spect"].values
mn, mx = arr.min(), arr.max()
if mx == mn:
df["Smooth_Spect"] = 0.0
else:
scale = 1.0 if norm_max is None else float(norm_max)
df["Smooth_Spect"] = (arr - mn) / (mx - mn) * scale
# 同时处理其它列(可选)
arr_raw = df["Raw_Spect"].values
mn_r, mx_r = arr_raw.min(), arr_raw.max()
if mx_r == mn_r:
df["Raw_Spect"] = 0.0
else:
scale = 1.0 if norm_max is None else float(norm_max)
df["Raw_Spect"] = (arr_raw - mn_r) / (mx_r - mn_r) * scale
# 保存 CSV
if save_csv:
csv_filename = f"{file_prefix}.csv"
df.to_csv(csv_filename, index=False)
print("✅ CSV 文件已生成:", csv_filename)
# 绘图(使用 matplotlib注意不要启用 GUI 后台
if save_plot:
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.plot(df["WaveNum"], df["Raw_Spect"], linestyle='-', alpha=0.6, label="原始")
plt.plot(df["WaveNum"], df["BaseLineCorrected"], linestyle='--', alpha=0.8, label="基线校正")
plt.plot(df["WaveNum"], df["Smooth_Spect"], linewidth=1.2, label="平滑")
plt.xlabel("WaveNum (cm^-1)")
plt.ylabel("Intensity (a.u.)")
plt.title(f"Raman {file_prefix}")
plt.grid(True)
plt.legend()
plt.tight_layout()
plot_filename = f"{file_prefix}.png"
plt.savefig(plot_filename, dpi=300, bbox_inches="tight")
plt.close()
# 小短暂等待以确保文件系统刷新
time_mod.sleep(0.2)
print("✅ 图像已生成:", plot_filename)
except Exception as e:
print("⚠️ 绘图失败:", e)
return success, file_prefix, df
except Exception as e:
print("拉曼测试异常:", e)
try:
if wrapper is not None:
try:
wrapper.CloseDevice()
except Exception:
pass
except Exception:
pass
return False, None, None

View File

@@ -0,0 +1,209 @@
import time
import csv
from datetime import datetime
from pymodbus.client import ModbusTcpClient
from dmqfengzhuang import scan_once
from raman_module import run_raman_test
# =================== 配置 ===================
PLC_IP = "192.168.1.88"
PLC_PORT = 502
ROBOT_IP = "192.168.1.200"
ROBOT_PORT = 502
SCAN_CSV_FILE = "scan_results.csv"
# =================== 通用函数 ===================
def ensure_connected(client, name, ip, port):
if not client.is_socket_open():
print(f"{name} 掉线,正在重连...")
client.close()
time.sleep(1)
new_client = ModbusTcpClient(ip, port=port)
if new_client.connect():
print(f"{name} 重新连接成功 ({ip}:{port})")
return new_client
else:
print(f"{name} 重连失败,稍后重试...")
time.sleep(3)
return None
return client
def safe_read(client, name, func, *args, retries=3, delay=0.3, **kwargs):
for _ in range(retries):
try:
res = func(*args, **kwargs)
if res and not (hasattr(res, "isError") and res.isError()):
return res
except Exception as e:
print(f"{name} 读异常: {e}")
time.sleep(delay)
print(f"{name} 连续读取失败 {retries}")
return None
def safe_write(client, name, func, *args, retries=3, delay=0.3, **kwargs):
for _ in range(retries):
try:
res = func(*args, **kwargs)
if res and not (hasattr(res, "isError") and res.isError()):
return True
except Exception as e:
print(f"{name} 写异常: {e}")
time.sleep(delay)
print(f"{name} 连续写入失败 {retries}")
return False
def wait_with_quit_check(robot, seconds, addr_quit=270):
for _ in range(int(seconds / 0.2)):
rr = safe_read(robot, "机器人", robot.read_holding_registers,
address=addr_quit, count=1)
if rr and rr.registers[0] == 1:
print("检测到 R270=1立即退出循环")
return True
time.sleep(0.2)
return False
# =================== 初始化 ===================
plc = ModbusTcpClient(PLC_IP, port=PLC_PORT)
robot = ModbusTcpClient(ROBOT_IP, port=ROBOT_PORT)
if not plc.connect():
print("无法连接 PLC")
exit()
if not robot.connect():
print("无法连接 机器人")
plc.close()
exit()
print("✅ PLC 与 机器人连接成功")
time.sleep(0.5)
# 伺服使能
if safe_write(plc, "PLC", plc.write_coil, address=10, value=True):
print("✅ 伺服使能成功 (M10=True)")
else:
print("⚠️ 伺服使能失败")
# 初始化扫码 CSV
with open(SCAN_CSV_FILE, "w", newline="", encoding="utf-8") as f:
csv.writer(f).writerow(["Bottle_No", "Scan_Result", "Time"])
bottle_count = 0
print("🟢 等待机器人触发信号... (R260=1扫码 / R256=1拉曼 / R270=1退出)")
# =================== 主监听循环 ===================
while True:
plc = ensure_connected(plc, "PLC", PLC_IP, PLC_PORT) or plc
robot = ensure_connected(robot, "机器人", ROBOT_IP, ROBOT_PORT) or robot
# 退出命令检测
quit_signal = safe_read(robot, "机器人", robot.read_holding_registers,
address=270, count=1)
if quit_signal and quit_signal.registers[0] == 1:
print("🟥 检测到 R270=1准备退出程序...")
break
# 读取关键寄存器
rr = safe_read(robot, "机器人", robot.read_holding_registers,
address=256, count=5)
if not rr:
time.sleep(0.3)
continue
r256, _, r258, r259, r260 = rr.registers[:5]
# ----------- 扫码部分 (R260=1) -----------
if r260 == 1:
bottle_count += 1
print(f"📸 第 {bottle_count} 瓶触发扫码 (R260=1)")
try:
result = scan_once(ip="192.168.1.50", port_in=2001, port_out=2002)
if result:
print(f"✅ 扫码成功: {result}")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(SCAN_CSV_FILE, "a", newline="", encoding="utf-8") as f:
csv.writer(f).writerow([bottle_count, result, timestamp])
else:
print("⚠️ 扫码失败或无返回")
except Exception as e:
print(f"❌ 扫码异常: {e}")
safe_write(robot, "机器人", robot.write_register, address=260, value=0)
time.sleep(0.2)
safe_write(robot, "机器人", robot.write_register, address=261, value=1)
print("➡️ 扫码完成 (R260→0, R261→1)")
# ----------- 拉曼 + 电机部分 (R256=1) -----------
if r256 == 1:
print("⚙️ 检测到 R256=1放瓶完成")
# 电机右转
safe_write(plc, "PLC", plc.write_register, address=1199, value=1)
safe_write(plc, "PLC", plc.write_register, address=1200, value=1)
print("➡️ 电机右转中...")
if wait_with_quit_check(robot, 3):
break
safe_write(plc, "PLC", plc.write_register, address=1199, value=0)
print("✅ 电机右转完成")
# 拉曼测试
print("🧪 开始拉曼测试...")
try:
success, file_prefix, df = run_raman_test(
integration_time=5000,
laser_power=200,
save_csv=True,
save_plot=True,
normalize=True,
norm_max=1.0
)
if success:
print(f"✅ 拉曼完成:{file_prefix}.csv / .png")
else:
print("⚠️ 拉曼失败")
except Exception as e:
print(f"❌ 拉曼测试异常: {e}")
# 电机左转
safe_write(plc, "PLC", plc.write_register, address=1299, value=1)
safe_write(plc, "PLC", plc.write_register, address=1300, value=1)
print("⬅️ 电机左转中...")
if wait_with_quit_check(robot, 3):
break
safe_write(plc, "PLC", plc.write_register, address=1299, value=0)
print("✅ 电机左转完成")
# 写入拉曼完成信号
safe_write(robot, "机器人", robot.write_register, address=257, value=1)
print("✅ 已写入 R257=1拉曼完成")
# 延迟清零 R256
print("⏳ 延迟4秒后清零 R256")
if wait_with_quit_check(robot, 4):
break
safe_write(robot, "机器人", robot.write_register, address=256, value=0)
print("✅ 已清零 R256")
# 等待机器人清零 R257
print("等待 R257 清零中...")
while True:
rr2 = safe_read(robot, "机器人", robot.read_holding_registers, address=257, count=1)
if rr2 and rr2.registers[0] == 0:
print("✅ 检测到 R257=0准备下一循环")
break
if wait_with_quit_check(robot, 1):
break
time.sleep(0.2)
time.sleep(0.2)
# =================== 程序退出清理 ===================
print("🧹 开始清理...")
safe_write(plc, "PLC", plc.write_coil, address=10, value=False)
for addr in [256, 257, 260, 261, 270]:
safe_write(robot, "机器人", robot.write_register, address=addr, value=0)
plc.close()
robot.close()
print("✅ 程序已退出,设备全部复位。")