diff --git a/unilabos/devices/cytomat/cytomat.py b/unilabos/devices/cytomat/cytomat.py new file mode 100644 index 00000000..454111de --- /dev/null +++ b/unilabos/devices/cytomat/cytomat.py @@ -0,0 +1,61 @@ +import serial +import time + +ser = serial.Serial( + port="COM18", + baudrate=9600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=15, + +def send_cmd(cmd: str, wait: float = 1.0) -> str: + """向 Cytomat 发送一行命令并打印/返回响应。""" + print(f">>> {cmd}") + ser.write((cmd + "\r").encode("ascii")) + time.sleep(wait) + resp = ser.read_all().decode("ascii", errors="ignore").strip() + print(f"<<< {resp or ''}") + return resp + +def initialize(): + """设备初始化 (ll:in)。""" + return send_cmd("ll:in") + +def wp_to_storage(pos: int): + """WP → 库位。pos: 1–9999 绝对地址。""" + return send_cmd(f"mv:ws {pos:04d}") + +def storage_to_tfs(stacker: int, level: int): + """库位 → TFS1。""" + return send_cmd(f"mv:st {stacker:02d} {level:02d}") + +def get_basic_state(): + """查询 Basic State Register。""" + return send_cmd("ch:bs") + +def set_pitch(stacker: int, pitch_mm: int): + """设置单个 stacker 的层间距(mm)。""" + return send_cmd(f"se:cs {stacker:02d} {pitch_mm}") + +def tfs_to_storage(stacker: int, level: int): + """TFS1 → 库位。""" + return send_cmd(f"mv:ts {stacker:02d} {level:02d}") + +# ---------- 示例工作流 ---------- +if __name__ == "__main__": + try: + if not ser.is_open: + ser.open() + initialize() + wp_to_storage(10) + storage_to_tfs(17, 3) + get_basic_state() + tfs_to_storage(7, 5) + + except Exception as exc: + print("Error:", exc) + finally: + ser.close() + print("Done.") + diff --git a/unilabos/devices/sealer/sealer.py b/unilabos/devices/sealer/sealer.py new file mode 100644 index 00000000..f2149279 --- /dev/null +++ b/unilabos/devices/sealer/sealer.py @@ -0,0 +1,100 @@ +import serial, time, re + +class SimpleSealer: + """ + It purposely skips CRC/ACK handling and sends raw commands of the form + '**00??[=xxxx]zz!'. Good enough for quick experiments or automation + scripts where robustness is less critical. + + Example + ------- + >>> sealer = SimpleSealer("COM24") + >>> sealer.set_temperature(160) # 160 °C + >>> sealer.set_time(2.0) # 2 s + >>> sealer.seal_cycle() # wait‑heat‑seal‑eject + >>> sealer.close() + """ + T_RE = re.compile(r"\*T\d\d:\d\d:\d\d=(\d+),(\d),(\d),") + + def __init__(self, port: str, baud: int = 19200, thresh_c: int = 150): + self.port = port + self.baud = baud + self.thresh_c = thresh_c + self.ser = serial.Serial(port, baud, timeout=0.3) + self.ser.reset_input_buffer() + + # ---------- low‑level helpers ---------- + def _send(self, raw: str): + """Write an already‑formed ASCII command, e.g. '**00DH=0160zz!'.""" + self.ser.write(raw.encode()) + print(">>>", raw) + + def _read_frame(self) -> str: + """Read one frame (ending at '!') and strip the terminator.""" + return self.ser.read_until(b'!').decode(errors='ignore').strip() + + # ---------- high‑level commands ---------- + def set_temperature(self, celsius: int): + self._send(f"**00DH={celsius:04d}zz!") + + def set_time(self, seconds: float): + units = int(round(seconds * 10)) + self._send(f"**00DT={units:04d}zz!") + + def open_drawer(self): + self._send("**00MOzz!") + + def close_drawer(self): + self._send("**00MCzz!") + + def seal(self): + self._send("**00GSzz!") + + # ---------- waits ---------- + def wait_temp(self): + print(f"[Waiting ≥{self.thresh_c}°C]") + while True: + frame = self._read_frame() + if frame.startswith("*T"): + m = self.T_RE.match(frame) + if not m: + continue + temp = int(m.group(1)) / 10 + blk = int(m.group(3)) # 1=Ready,4=Converging + print(f"\rTemp={temp:5.1f}°C | Block={blk}", end="") + if temp >= self.thresh_c and blk in (1, 0, 4): + print(" <-- OK") + return + + def wait_finish(self): + while True: + frame = self._read_frame() + if frame.startswith("*T"): + parts = frame.split('=')[1].split(',') + status = int(parts[1]) + cnt = int(parts[6][: -3] if parts[6].endswith("!") else parts[6]) + print(f"\rRemaining {cnt/10:4.1f}s", end="") + if status == 4: + print("\n[Seal Done]") + return + + # ---------- convenience helpers ---------- + def seal_cycle(self): + """Full cycle: wait heat, seal, wait finish, eject drawer.""" + time.sleep(10) + self.seal() + self.open_drawer() + + def close(self): + self.ser.close() + + +if __name__ == "__main__": + # Quick demo usage (modify COM port and parameters as needed) + sealer = SimpleSealer("COM24") + try: + sealer.set_temperature(160) # °C + sealer.set_time(2.0) # seconds + sealer.seal_cycle() + finally: + sealer.close() \ No newline at end of file diff --git a/unilabos/devices/xpeel/xpeel.py b/unilabos/devices/xpeel/xpeel.py new file mode 100644 index 00000000..a67cc1c3 --- /dev/null +++ b/unilabos/devices/xpeel/xpeel.py @@ -0,0 +1,59 @@ +import serial +import time + +class SealRemoverController: + def __init__(self, port='COM17', baudrate=9600, timeout=2): + self.ser = serial.Serial(port, baudrate, timeout=timeout) + + def send_command(self, command): + full_cmd = f"{command}\r\n".encode('ascii') + self.ser.write(full_cmd) + time.sleep(0.5) # 稍等设备响应 + return self.read_response() + + def read_response(self): + lines = [] + while self.ser.in_waiting: + line = self.ser.readline().decode('ascii').strip() + lines.append(line) + return lines + + def reset(self): + return self.send_command("*reset") + + def restart(self): + return self.send_command("*restart") + + def check_status(self): + return self.send_command("*stat") + + def move_in(self): + return self.send_command("*movein") + + def move_out(self): + return self.send_command("*moveout") + + def move_up(self): + return self.send_command("*moveup") + + def move_down(self): + return self.send_command("*movedown") + + def peel(self, param_set=1, adhere_time=1): + # param_set: 1~9, adhere_time: 1(2.5s) ~ 4(10s) + return self.send_command(f"*xpeel:{param_set}{adhere_time}") + + def check_tape(self): + return self.send_command("*tapeleft") + + def close(self): + self.ser.close() + +if __name__ == "__main__": + remover = SealRemoverController(port='COM17') + remover.restart # "restart" + remover.check_status() # "检查状态:" + remover.reset() # 复位设备 + remover.peel(param_set=4, adhere_time=1) #执行撕膜操作,慢速+2.5s + remover.move_out() # 送出板子 + remover.close() \ No newline at end of file