Files
Uni-Lab-OS/unilabos/devices/cnc/grbl_sync.py
Junhan Chang c78ac482d8 Initial commit
2025-04-17 15:19:47 +08:00

206 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
from threading import Event, Lock
from enum import Enum
from dataclasses import dataclass
import re
import time
from typing import Any, Union, Optional, overload
import serial.tools.list_ports
from serial import Serial
from serial.serialutil import SerialException
from unilabos.messages import Point3D
class GrblCNCConnectionError(Exception):
pass
@dataclass(frozen=True, kw_only=True)
class GrblCNCInfo:
port: str
address: str = "1"
limits: tuple[int, int, int, int, int, int] = (-150, 150, -200, 0, -80, 0)
def create(self):
return GrblCNC(self.port, self.address, self.limits)
class GrblCNC:
_status: str = "Offline"
_position: Point3D = Point3D(x=0.0, y=0.0, z=0.0)
_spindle_speed: float = 0.0
def __init__(self, port: str, address: str = "1", limits: tuple[int, int, int, int, int, int] = (-150, 150, -200, 0, -80, 0)):
self.port = port
self.address = address
self.limits = limits
try:
self._serial = Serial(
baudrate=115200,
port=port
)
except (OSError, SerialException) as e:
raise GrblCNCConnectionError from e
self._busy = False
self._closing = False
self._pose_number = self.pose_number_remaining = -1
self._query_lock = Lock()
self._run_lock = Lock()
self._error_event = Event()
def _read_all(self):
data = self._serial.read_until(b"\n")
data_decoded = data.decode()
while not "ok" in data_decoded and not "Grbl" in data_decoded:
data += self._serial.read_until(b"\n")
data_decoded = data.decode()
return data
@overload
def _query(self, command: str, dtype: type[bool]) -> bool:
pass
@overload
def _query(self, command: str, dtype: type[int]) -> int:
pass
@overload
def _query(self, command: str, dtype = None) -> str:
pass
def _query(self, command: str, dtype: Optional[type] = None):
with self._query_lock:
if self._closing or self._error_event.is_set():
raise GrblCNCConnectionError
self._read_extra_line = command.startswith("?")
run = ''
full_command = f"{command}{run}\n"
full_command_data = bytearray(full_command, 'ascii')
try:
# await asyncio.to_thread(lambda: self._serial.write(full_command_data))
self._serial.write(full_command_data)
time.sleep(0.1)
return self._receive(self._read_all())
except (SerialException, asyncio.TimeoutError) as e:
self._error_event.set()
raise GrblCNCConnectionError from e
def _receive(self, data: bytes):
ascii_string = "".join(chr(byte) for byte in data)
was_busy = self._busy
self._busy = "Idle" not in ascii_string
return ascii_string
def _run(self, command: str):
with self._run_lock:
try:
self._query(command)
while True:
time.sleep(0.2) # Wait for 0.5 seconds before polling again
status = self.get_status()
if "Idle" in status:
break
except:
self._error_event.set()
def initialize(self):
time.sleep(0.5)
self._run("G0X0Y0Z0")
status = self.get_status()
return status
# Operations
# Status Queries
@property
def status(self) -> str:
return self._status
def get_status(self):
__pos_pattern__ = re.compile('.Pos:(\-?\d+\.\d+),(\-?\d+\.\d+),(\-?\d+\.\d+)')
__status_pattern__ = re.compile('<([a-zA-Z]+),')
response = self._query("?")
pat = re.search(__pos_pattern__, response)
if pat is not None:
pos = pat.group().split(":")[1].split(",")
self._status = re.search(__status_pattern__, response).group(1).lstrip("<").rstrip(",")
self._position = Point3D(x=float(pos[0]), y=float(pos[1]), z=float(pos[2]))
return self.status
# Position Setpoint and Queries
@property
def position(self) -> Point3D:
# 由于此时一定调用过 get_status所以 position 一定是被更新过的
return self._position
def get_position(self):
return self.position
def set_position(self, position: Point3D):
"""
Move to absolute position (unit: mm)
Args:
x, y, z: float
Returns:
None
"""
x = max(self.limits[0], min(self.limits[1], position.x))
y = max(self.limits[2], min(self.limits[3], position.y))
z = max(self.limits[4], min(self.limits[5], position.z))
return self._run(f"G0X{x:.3f}Y{y:.3f}Z{z:.3f}")
def move_through_points(self, positions: list[Point3D]):
for i, point in enumerate(positions):
self._pose_number = i
self.pose_number_remaining = len(positions) - i
self.set_position(point)
time.sleep(0.5)
self._pose_number = -1
@property
def spindle_speed(self) -> float:
return self._spindle_speed
# def get_spindle_speed(self):
# self._spindle_speed = float(self._query("M3?"))
# return self.spindle_speed
def set_spindle_speed(self, spindle_speed: float, max_velocity: float = 500):
if spindle_speed < 0:
spindle_speed = 0
self._run("M5")
else:
spindle_speed = min(max_velocity, spindle_speed)
self._run(f"M3 S{spindle_speed}")
self._spindle_speed = spindle_speed
def stop_operation(self):
return self._run("T")
# Queries
async def wait_error(self):
await self._error_event.wait()
@staticmethod
def list():
for item in serial.tools.list_ports.comports():
yield GrblCNCInfo(port=item.device)