mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 05:21:19 +00:00
Update runze pump format
This commit is contained in:
@@ -1,10 +1,9 @@
|
|||||||
import asyncio
|
|
||||||
from threading import Lock, Event
|
from threading import Lock, Event
|
||||||
from enum import Enum
|
|
||||||
from dataclasses import dataclass
|
|
||||||
import time
|
import time
|
||||||
import traceback
|
from dataclasses import dataclass
|
||||||
from typing import Any, Union, Optional, overload
|
from enum import Enum
|
||||||
|
from threading import Lock, Event
|
||||||
|
from typing import Union, Optional
|
||||||
|
|
||||||
import serial.tools.list_ports
|
import serial.tools.list_ports
|
||||||
from serial import Serial
|
from serial import Serial
|
||||||
@@ -100,10 +99,7 @@ class RunzeSyringePump:
|
|||||||
# baudrate=9600,
|
# baudrate=9600,
|
||||||
# port=port
|
# port=port
|
||||||
# )
|
# )
|
||||||
self.hardware_interface = Serial(
|
self.hardware_interface = Serial(baudrate=9600, port=port)
|
||||||
baudrate=9600,
|
|
||||||
port=port
|
|
||||||
)
|
|
||||||
|
|
||||||
except (OSError, SerialException) as e:
|
except (OSError, SerialException) as e:
|
||||||
# raise RunzeSyringePumpConnectionError from e
|
# raise RunzeSyringePumpConnectionError from e
|
||||||
@@ -120,7 +116,7 @@ class RunzeSyringePump:
|
|||||||
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
|
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
|
||||||
|
|
||||||
def send_command(self, full_command: str):
|
def send_command(self, full_command: str):
|
||||||
full_command_data = bytearray(full_command, 'ascii')
|
full_command_data = bytearray(full_command, "ascii")
|
||||||
response = self.hardware_interface.write(full_command_data)
|
response = self.hardware_interface.write(full_command_data)
|
||||||
time.sleep(0.05)
|
time.sleep(0.05)
|
||||||
output = self._receive(self.hardware_interface.read_until(b"\n"))
|
output = self._receive(self.hardware_interface.read_until(b"\n"))
|
||||||
@@ -131,7 +127,7 @@ class RunzeSyringePump:
|
|||||||
if self._closing:
|
if self._closing:
|
||||||
raise RunzeSyringePumpConnectionError
|
raise RunzeSyringePumpConnectionError
|
||||||
|
|
||||||
run = 'R' if not "?" in command else ''
|
run = "R" if "?" not in command else ""
|
||||||
full_command = f"/{self.address}{command}{run}\r\n"
|
full_command = f"/{self.address}{command}{run}\r\n"
|
||||||
|
|
||||||
output = self.send_command(full_command)[3:-3]
|
output = self.send_command(full_command)[3:-3]
|
||||||
@@ -161,7 +157,7 @@ class RunzeSyringePump:
|
|||||||
time.sleep(0.5) # Wait for 0.5 seconds before polling again
|
time.sleep(0.5) # Wait for 0.5 seconds before polling again
|
||||||
|
|
||||||
status = self.get_status()
|
status = self.get_status()
|
||||||
if status == 'Idle':
|
if status == "Idle":
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
pass
|
pass
|
||||||
@@ -275,11 +271,11 @@ class RunzeSyringePump:
|
|||||||
return self._valve_position
|
return self._valve_position
|
||||||
|
|
||||||
def set_valve_position(self, position: Union[int, str, float]):
|
def set_valve_position(self, position: Union[int, str, float]):
|
||||||
if type(position) == float:
|
if isinstance(position, float):
|
||||||
position = round(position / 120)
|
position = round(position / 120)
|
||||||
command = f"I{position}" if type(position) == int or ord(position) <= 57 else position.upper()
|
command = f"I{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
|
||||||
response = self._run(command)
|
response = self._run(command)
|
||||||
self._valve_position = f"{position}" if type(position) == int or ord(position) <= 57 else position.upper()
|
self._valve_position = f"{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def get_valve_position(self) -> str:
|
def get_valve_position(self) -> str:
|
||||||
|
|||||||
@@ -176,6 +176,9 @@ class RunzeMultiplePump:
|
|||||||
return output
|
return output
|
||||||
|
|
||||||
def _receive(self, data: bytes) -> str:
|
def _receive(self, data: bytes) -> str:
|
||||||
|
"""
|
||||||
|
Do not change this method.
|
||||||
|
"""
|
||||||
if not data:
|
if not data:
|
||||||
return ""
|
return ""
|
||||||
ascii_string = "".join(chr(byte) for byte in data)
|
ascii_string = "".join(chr(byte) for byte in data)
|
||||||
|
|||||||
Reference in New Issue
Block a user