mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-14 13:14:39 +00:00
封膜仪、撕膜仪、耗材站接口
This commit is contained in:
61
unilabos/devices/cytomat/cytomat.py
Normal file
61
unilabos/devices/cytomat/cytomat.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import serial
|
||||
import time
|
||||
|
||||
ser = serial.Serial(
|
||||
port="COM18",
|
||||
baudrate=9600,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
timeout=15,
|
||||
|
||||
def send_cmd(cmd: str, wait: float = 1.0) -> str:
|
||||
"""向 Cytomat 发送一行命令并打印/返回响应。"""
|
||||
print(f">>> {cmd}")
|
||||
ser.write((cmd + "\r").encode("ascii"))
|
||||
time.sleep(wait)
|
||||
resp = ser.read_all().decode("ascii", errors="ignore").strip()
|
||||
print(f"<<< {resp or '<no response>'}")
|
||||
return resp
|
||||
|
||||
def initialize():
|
||||
"""设备初始化 (ll:in)。"""
|
||||
return send_cmd("ll:in")
|
||||
|
||||
def wp_to_storage(pos: int):
|
||||
"""WP → 库位。pos: 1–9999 绝对地址。"""
|
||||
return send_cmd(f"mv:ws {pos:04d}")
|
||||
|
||||
def storage_to_tfs(stacker: int, level: int):
|
||||
"""库位 → TFS1。"""
|
||||
return send_cmd(f"mv:st {stacker:02d} {level:02d}")
|
||||
|
||||
def get_basic_state():
|
||||
"""查询 Basic State Register。"""
|
||||
return send_cmd("ch:bs")
|
||||
|
||||
def set_pitch(stacker: int, pitch_mm: int):
|
||||
"""设置单个 stacker 的层间距(mm)。"""
|
||||
return send_cmd(f"se:cs {stacker:02d} {pitch_mm}")
|
||||
|
||||
def tfs_to_storage(stacker: int, level: int):
|
||||
"""TFS1 → 库位。"""
|
||||
return send_cmd(f"mv:ts {stacker:02d} {level:02d}")
|
||||
|
||||
# ---------- 示例工作流 ----------
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
if not ser.is_open:
|
||||
ser.open()
|
||||
initialize()
|
||||
wp_to_storage(10)
|
||||
storage_to_tfs(17, 3)
|
||||
get_basic_state()
|
||||
tfs_to_storage(7, 5)
|
||||
|
||||
except Exception as exc:
|
||||
print("Error:", exc)
|
||||
finally:
|
||||
ser.close()
|
||||
print("Done.")
|
||||
|
||||
100
unilabos/devices/sealer/sealer.py
Normal file
100
unilabos/devices/sealer/sealer.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import serial, time, re
|
||||
|
||||
class SimpleSealer:
|
||||
"""
|
||||
It purposely skips CRC/ACK handling and sends raw commands of the form
|
||||
'**00??[=xxxx]zz!'. Good enough for quick experiments or automation
|
||||
scripts where robustness is less critical.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> sealer = SimpleSealer("COM24")
|
||||
>>> sealer.set_temperature(160) # 160 °C
|
||||
>>> sealer.set_time(2.0) # 2 s
|
||||
>>> sealer.seal_cycle() # wait‑heat‑seal‑eject
|
||||
>>> sealer.close()
|
||||
"""
|
||||
T_RE = re.compile(r"\*T\d\d:\d\d:\d\d=(\d+),(\d),(\d),")
|
||||
|
||||
def __init__(self, port: str, baud: int = 19200, thresh_c: int = 150):
|
||||
self.port = port
|
||||
self.baud = baud
|
||||
self.thresh_c = thresh_c
|
||||
self.ser = serial.Serial(port, baud, timeout=0.3)
|
||||
self.ser.reset_input_buffer()
|
||||
|
||||
# ---------- low‑level helpers ----------
|
||||
def _send(self, raw: str):
|
||||
"""Write an already‑formed ASCII command, e.g. '**00DH=0160zz!'."""
|
||||
self.ser.write(raw.encode())
|
||||
print(">>>", raw)
|
||||
|
||||
def _read_frame(self) -> str:
|
||||
"""Read one frame (ending at '!') and strip the terminator."""
|
||||
return self.ser.read_until(b'!').decode(errors='ignore').strip()
|
||||
|
||||
# ---------- high‑level commands ----------
|
||||
def set_temperature(self, celsius: int):
|
||||
self._send(f"**00DH={celsius:04d}zz!")
|
||||
|
||||
def set_time(self, seconds: float):
|
||||
units = int(round(seconds * 10))
|
||||
self._send(f"**00DT={units:04d}zz!")
|
||||
|
||||
def open_drawer(self):
|
||||
self._send("**00MOzz!")
|
||||
|
||||
def close_drawer(self):
|
||||
self._send("**00MCzz!")
|
||||
|
||||
def seal(self):
|
||||
self._send("**00GSzz!")
|
||||
|
||||
# ---------- waits ----------
|
||||
def wait_temp(self):
|
||||
print(f"[Waiting ≥{self.thresh_c}°C]")
|
||||
while True:
|
||||
frame = self._read_frame()
|
||||
if frame.startswith("*T"):
|
||||
m = self.T_RE.match(frame)
|
||||
if not m:
|
||||
continue
|
||||
temp = int(m.group(1)) / 10
|
||||
blk = int(m.group(3)) # 1=Ready,4=Converging
|
||||
print(f"\rTemp={temp:5.1f}°C | Block={blk}", end="")
|
||||
if temp >= self.thresh_c and blk in (1, 0, 4):
|
||||
print(" <-- OK")
|
||||
return
|
||||
|
||||
def wait_finish(self):
|
||||
while True:
|
||||
frame = self._read_frame()
|
||||
if frame.startswith("*T"):
|
||||
parts = frame.split('=')[1].split(',')
|
||||
status = int(parts[1])
|
||||
cnt = int(parts[6][: -3] if parts[6].endswith("!") else parts[6])
|
||||
print(f"\rRemaining {cnt/10:4.1f}s", end="")
|
||||
if status == 4:
|
||||
print("\n[Seal Done]")
|
||||
return
|
||||
|
||||
# ---------- convenience helpers ----------
|
||||
def seal_cycle(self):
|
||||
"""Full cycle: wait heat, seal, wait finish, eject drawer."""
|
||||
time.sleep(10)
|
||||
self.seal()
|
||||
self.open_drawer()
|
||||
|
||||
def close(self):
|
||||
self.ser.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Quick demo usage (modify COM port and parameters as needed)
|
||||
sealer = SimpleSealer("COM24")
|
||||
try:
|
||||
sealer.set_temperature(160) # °C
|
||||
sealer.set_time(2.0) # seconds
|
||||
sealer.seal_cycle()
|
||||
finally:
|
||||
sealer.close()
|
||||
59
unilabos/devices/xpeel/xpeel.py
Normal file
59
unilabos/devices/xpeel/xpeel.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import serial
|
||||
import time
|
||||
|
||||
class SealRemoverController:
|
||||
def __init__(self, port='COM17', baudrate=9600, timeout=2):
|
||||
self.ser = serial.Serial(port, baudrate, timeout=timeout)
|
||||
|
||||
def send_command(self, command):
|
||||
full_cmd = f"{command}\r\n".encode('ascii')
|
||||
self.ser.write(full_cmd)
|
||||
time.sleep(0.5) # 稍等设备响应
|
||||
return self.read_response()
|
||||
|
||||
def read_response(self):
|
||||
lines = []
|
||||
while self.ser.in_waiting:
|
||||
line = self.ser.readline().decode('ascii').strip()
|
||||
lines.append(line)
|
||||
return lines
|
||||
|
||||
def reset(self):
|
||||
return self.send_command("*reset")
|
||||
|
||||
def restart(self):
|
||||
return self.send_command("*restart")
|
||||
|
||||
def check_status(self):
|
||||
return self.send_command("*stat")
|
||||
|
||||
def move_in(self):
|
||||
return self.send_command("*movein")
|
||||
|
||||
def move_out(self):
|
||||
return self.send_command("*moveout")
|
||||
|
||||
def move_up(self):
|
||||
return self.send_command("*moveup")
|
||||
|
||||
def move_down(self):
|
||||
return self.send_command("*movedown")
|
||||
|
||||
def peel(self, param_set=1, adhere_time=1):
|
||||
# param_set: 1~9, adhere_time: 1(2.5s) ~ 4(10s)
|
||||
return self.send_command(f"*xpeel:{param_set}{adhere_time}")
|
||||
|
||||
def check_tape(self):
|
||||
return self.send_command("*tapeleft")
|
||||
|
||||
def close(self):
|
||||
self.ser.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
remover = SealRemoverController(port='COM17')
|
||||
remover.restart # "restart"
|
||||
remover.check_status() # "检查状态:"
|
||||
remover.reset() # 复位设备
|
||||
remover.peel(param_set=4, adhere_time=1) #执行撕膜操作,慢速+2.5s
|
||||
remover.move_out() # 送出板子
|
||||
remover.close()
|
||||
Reference in New Issue
Block a user