Merge branch 'workstation_dev_YB3' into workstation_dev_YB3

This commit is contained in:
Calvin Cao
2025-11-08 15:21:25 +08:00
committed by GitHub
34 changed files with 1344 additions and 881 deletions

View File

@@ -3,7 +3,8 @@
"""
import asyncio
from typing import Dict, Any, Optional, List
from typing import Dict, Any, List
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class SmartPumpController:
@@ -14,6 +15,8 @@ class SmartPumpController:
适用于实验室自动化系统中的液体处理任务。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = "smart_pump_01", port: str = "/dev/ttyUSB0"):
"""
初始化智能泵控制器
@@ -30,6 +33,9 @@ class SmartPumpController:
self.calibration_factor = 1.0
self.pump_mode = "continuous" # continuous, volume, rate
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def connect_device(self, timeout: int = 10) -> bool:
"""
连接到泵设备
@@ -90,7 +96,7 @@ class SmartPumpController:
pump_time = (volume / flow_rate) * 60 # 转换为秒
self.current_flow_rate = flow_rate
await asyncio.sleep(min(pump_time, 3.0)) # 模拟泵送过程
await self._ros_node.sleep(min(pump_time, 3.0)) # 模拟泵送过程
self.total_volume_pumped += volume
self.current_flow_rate = 0.0
@@ -170,6 +176,8 @@ class AdvancedTemperatureController:
适用于需要精确温度控制的化学反应和材料处理过程。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, controller_id: str = "temp_controller_01"):
"""
初始化温度控制器
@@ -185,6 +193,9 @@ class AdvancedTemperatureController:
self.pid_enabled = True
self.temperature_history: List[Dict] = []
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def set_target_temperature(self, temperature: float, rate: float = 10.0) -> bool:
"""
设置目标温度
@@ -238,7 +249,7 @@ class AdvancedTemperatureController:
}
)
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
# 保持历史记录不超过100条
if len(self.temperature_history) > 100:
@@ -330,6 +341,8 @@ class MultiChannelAnalyzer:
常用于光谱分析、电化学测量等应用场景。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, analyzer_id: str = "analyzer_01", channels: int = 8):
"""
初始化多通道分析仪
@@ -344,6 +357,9 @@ class MultiChannelAnalyzer:
self.is_measuring = False
self.sample_rate = 1000 # Hz
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def configure_channel(self, channel: int, enabled: bool = True, unit: str = "V") -> bool:
"""
配置通道
@@ -376,7 +392,7 @@ class MultiChannelAnalyzer:
# 模拟数据采集
measurements = []
for second in range(duration):
for _ in range(duration):
timestamp = asyncio.get_event_loop().time()
frame_data = {}
@@ -391,7 +407,7 @@ class MultiChannelAnalyzer:
measurements.append({"timestamp": timestamp, "data": frame_data})
await asyncio.sleep(1.0) # 每秒采集一次
await self._ros_node.sleep(1.0) # 每秒采集一次
self.is_measuring = False
@@ -465,6 +481,8 @@ class AutomatedDispenser:
集成称重功能,确保分配精度和重现性。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, dispenser_id: str = "dispenser_01"):
"""
初始化自动分配器
@@ -479,6 +497,9 @@ class AutomatedDispenser:
self.container_capacity = 1000.0 # mL
self.precision_mode = True
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def move_to_position(self, x: float, y: float, z: float) -> bool:
"""
移动到指定位置
@@ -517,7 +538,7 @@ class AutomatedDispenser:
if viscosity == "high":
dispense_time *= 2 # 高粘度液体需要更长时间
await asyncio.sleep(min(dispense_time, 5.0)) # 最多等待5秒
await self._ros_node.sleep(min(dispense_time, 5.0)) # 最多等待5秒
self.dispensed_total += volume

View File

@@ -13,7 +13,7 @@ def start_backend(
graph=None,
controllers_config: dict = {},
bridges=[],
without_host: bool = False,
is_slave: bool = False,
visual: str = "None",
resources_mesh_config: dict = {},
**kwargs,
@@ -32,7 +32,7 @@ def start_backend(
raise ValueError(f"Unsupported backend: {backend}")
backend_thread = threading.Thread(
target=main if not without_host else slave,
target=main if not is_slave else slave,
args=(
devices_config,
resources_config,

View File

@@ -375,22 +375,23 @@ def main():
args_dict["bridges"] = []
# 获取通信客户端仅支持WebSocket
comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client)
if "fastapi" in args_dict["app_bridges"]:
args_dict["bridges"].append(http_client)
if "websocket" in args_dict["app_bridges"]:
# 获取通信客户端仅支持WebSocket
if BasicConfig.is_host_mode:
comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client)
def _exit(signum, frame):
comm_client.stop()
sys.exit(0)
def _exit(signum, frame):
comm_client.stop()
sys.exit(0)
signal.signal(signal.SIGINT, _exit)
signal.signal(signal.SIGTERM, _exit)
comm_client.start()
else:
print_status("SlaveMode跳过Websocket连接")
signal.signal(signal.SIGINT, _exit)
signal.signal(signal.SIGTERM, _exit)
comm_client.start()
args_dict["resources_mesh_config"] = {}
args_dict["resources_edge_config"] = resource_edge_info
# web visiualize 2D

View File

@@ -1,11 +1,12 @@
import json
import time
from typing import Optional, Tuple, Dict, Any
from unilabos.utils.log import logger
from unilabos.utils.type_check import TypeEncoder
def register_devices_and_resources(lab_registry):
def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
"""
注册设备和资源到服务器仅支持HTTP
"""
@@ -28,6 +29,8 @@ def register_devices_and_resources(lab_registry):
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
if gather_only:
return devices_to_register, resources_to_register
# 注册设备
if devices_to_register:
try:

View File

@@ -12,6 +12,7 @@ from serial import Serial
from serial.serialutil import SerialException
from unilabos.messages import Point3D
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class GrblCNCConnectionError(Exception):
@@ -32,6 +33,7 @@ class GrblCNCInfo:
class GrblCNCAsync:
_status: str = "Offline"
_position: Point3D = Point3D(x=0.0, y=0.0, z=0.0)
_ros_node: BaseROS2DeviceNode
def __init__(self, port: str, address: str = "1", limits: tuple[int, int, int, int, int, int] = (-150, 150, -200, 0, 0, 60)):
self.port = port
@@ -58,6 +60,9 @@ class GrblCNCAsync:
self._run_future: Optional[Future[Any]] = None
self._run_lock = Lock()
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def _read_all(self):
data = self._serial.read_until(b"\n")
data_decoded = data.decode()
@@ -148,7 +153,7 @@ class GrblCNCAsync:
try:
await self._query(command)
while True:
await asyncio.sleep(0.2) # Wait for 0.5 seconds before polling again
await self._ros_node.sleep(0.2) # Wait for 0.5 seconds before polling again
status = await self.get_status()
if "Idle" in status:
@@ -214,7 +219,7 @@ class GrblCNCAsync:
self._pose_number = i
self.pose_number_remaining = len(points) - i
await self.set_position(point)
await asyncio.sleep(0.5)
await self._ros_node.sleep(0.5)
self._step_number = -1
async def stop_operation(self):
@@ -235,7 +240,7 @@ class GrblCNCAsync:
async def open(self):
if self._read_task:
raise GrblCNCConnectionError
self._read_task = asyncio.create_task(self._read_loop())
self._read_task = self._ros_node.create_task(self._read_loop())
try:
await self.get_status()

View File

@@ -2,6 +2,8 @@ import time
import asyncio
from pydantic import BaseModel
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class Point3D(BaseModel):
x: float
@@ -14,9 +16,14 @@ def d(a: Point3D, b: Point3D) -> float:
class MockCNCAsync:
_ros_node: BaseROS2DeviceNode["MockCNCAsync"]
def __init__(self):
self._position: Point3D = Point3D(x=0.0, y=0.0, z=0.0)
self._status = "Idle"
def post_create(self, ros_node):
self._ros_node = ros_node
@property
def position(self) -> Point3D:
@@ -38,5 +45,5 @@ class MockCNCAsync:
self._position.x = current_pos.x + (position.x - current_pos.x) / 20 * (i+1)
self._position.y = current_pos.y + (position.y - current_pos.y) / 20 * (i+1)
self._position.z = current_pos.z + (position.z - current_pos.z) / 20 * (i+1)
await asyncio.sleep(move_time / 20)
await self._ros_node.sleep(move_time / 20)
self._status = "Idle"

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,11 @@
from __future__ import annotations
import re
import traceback
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from collections import Counter
import asyncio
import time
import pprint as pp
import traceback
from collections import Counter
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
from pylabrobot.liquid_handling.standard import GripDirection
@@ -25,6 +25,8 @@ from pylabrobot.resources import (
Tip,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8):
@@ -536,6 +538,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
"""Extended LiquidHandler with additional operations."""
support_touch_tip = True
_ros_node: BaseROS2DeviceNode
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8):
"""Initialize a LiquidHandler.
@@ -548,8 +551,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
self.group_info = dict()
super().__init__(backend, deck, simulator, channel_num)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
@classmethod
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]):
"""Set the liquid in a well."""
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
@@ -1081,7 +1087,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
print(f"Waiting time: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")
print(f"Time to finish: {time.strftime('%H:%M:%S', time.localtime(time.time() + seconds))}")
await asyncio.sleep(seconds)
await self._ros_node.sleep(seconds)
if msg:
print(f"Done: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")

View File

@@ -30,6 +30,7 @@ from pylabrobot.liquid_handling.standard import (
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class PRCXIError(RuntimeError):
@@ -162,6 +163,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
def post_init(self, ros_node: BaseROS2DeviceNode):
super().post_init(ros_node)
self._unilabos_backend.post_init(ros_node)
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
return super().set_liquid(wells, liquid_names, volumes)
@@ -424,6 +429,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
_num_channels = 8 # 默认通道数为 8
_is_reset_ok = False
_ros_node: BaseROS2DeviceNode
@property
def is_reset_ok(self) -> bool:
@@ -456,6 +462,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._execute_setup = setup
self.debug = debug
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def create_protocol(self, protocol_name):
self.protocol_name = protocol_name
self.steps_todo_list = []
@@ -500,7 +509,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.api_client.call("IAutomation", "Reset")
while not self.is_reset_ok:
print("Waiting for PRCXI9300 to reset...")
await asyncio.sleep(1)
await self._ros_node.sleep(1)
print("PRCXI9300 reset successfully.")
except ConnectionRefusedError as e:
raise RuntimeError(
@@ -533,7 +542,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1:
raise ValueError("All pickups must be from the same tip column. Found different columns: " + str(tip_columns))
raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
)
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
hole_row = 1
@@ -1109,12 +1120,15 @@ class PRCXI9300Api:
"LiquidDispensingMethod": liquid_method,
}
class DefaultLayout:
def __init__(self, product_name: str = "PRCXI9300"):
self.labresource = {}
if product_name not in ["PRCXI9300", "PRCXI9320"]:
raise ValueError(f"Unsupported product_name: {product_name}. Only 'PRCXI9300' and 'PRCXI9320' are supported.")
raise ValueError(
f"Unsupported product_name: {product_name}. Only 'PRCXI9300' and 'PRCXI9320' are supported."
)
if product_name == "PRCXI9300":
self.rows = 2
@@ -1129,25 +1143,93 @@ class DefaultLayout:
self.layout = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
self.trash_slot = 16
self.waste_liquid_slot = 12
self.default_layout = {"MatrixId":f"{time.time()}","MatrixName":f"{time.time()}","MatrixCount":16,"WorkTablets":
[{"Number": 1, "Code": "T1", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 2, "Code": "T2", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 3, "Code": "T3", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 4, "Code": "T4", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 5, "Code": "T5", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 6, "Code": "T6", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 7, "Code": "T7", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 8, "Code": "T8", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 9, "Code": "T9", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 10, "Code": "T10", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 11, "Code": "T11", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 12, "Code": "T12", "Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0}}, # 这个设置成废液槽,用储液槽表示
{"Number": 13, "Code": "T13", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 14, "Code": "T14", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 15, "Code": "T15", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 16, "Code": "T16", "Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0}} # 这个设置成垃圾桶,用储液槽表示
]
}
self.default_layout = {
"MatrixId": f"{time.time()}",
"MatrixName": f"{time.time()}",
"MatrixCount": 16,
"WorkTablets": [
{
"Number": 1,
"Code": "T1",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 2,
"Code": "T2",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 3,
"Code": "T3",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 4,
"Code": "T4",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 5,
"Code": "T5",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 6,
"Code": "T6",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 7,
"Code": "T7",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 8,
"Code": "T8",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 9,
"Code": "T9",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 10,
"Code": "T10",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 11,
"Code": "T11",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 12,
"Code": "T12",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成废液槽,用储液槽表示
{
"Number": 13,
"Code": "T13",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 14,
"Code": "T14",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 15,
"Code": "T15",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 16,
"Code": "T16",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成垃圾桶,用储液槽表示
],
}
def get_layout(self) -> Dict[str, Any]:
return {
@@ -1155,7 +1237,7 @@ class DefaultLayout:
"columns": self.columns,
"layout": self.layout,
"trash_slot": self.trash_slot,
"waste_liquid_slot": self.waste_liquid_slot
"waste_liquid_slot": self.waste_liquid_slot,
}
def get_trash_slot(self) -> int:
@@ -1178,17 +1260,19 @@ class DefaultLayout:
reserved_positions = {12, 16}
available_positions = [i for i in range(1, 17) if i not in reserved_positions]
# 计算总需求
# 计算总需求
total_needed = sum(count for _, _, count in needs)
if total_needed > len(available_positions):
raise ValueError(f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置排除位置12和16")
raise ValueError(
f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置排除位置12和16"
)
# 依次分配位置
current_pos = 0
for reagent_name, material_name, count in needs:
material_uuid = self.labresource[material_name]['uuid']
material_enum = self.labresource[material_name]['materialEnum']
material_uuid = self.labresource[material_name]["uuid"]
material_enum = self.labresource[material_name]["materialEnum"]
for _ in range(count):
if current_pos >= len(available_positions):
@@ -1196,17 +1280,18 @@ class DefaultLayout:
position = available_positions[current_pos]
# 找到对应的tablet并更新
for tablet in self.default_layout['WorkTablets']:
if tablet['Number'] == position:
tablet['Material']['uuid'] = material_uuid
tablet['Material']['materialEnum'] = material_enum
layout_list.append(dict(reagent_name=reagent_name, material_name=material_name, positions=position))
for tablet in self.default_layout["WorkTablets"]:
if tablet["Number"] == position:
tablet["Material"]["uuid"] = material_uuid
tablet["Material"]["materialEnum"] = material_enum
layout_list.append(
dict(reagent_name=reagent_name, material_name=material_name, positions=position)
)
break
current_pos += 1
return self.default_layout, layout_list
if __name__ == "__main__":
# Example usage
# 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
@@ -1302,10 +1387,7 @@ if __name__ == "__main__":
# # # plate2.set_well_liquids(plate_2_liquids)
# handler = PRCXI9300Handler(deck=deck, host="10.181.214.132", port=9999,
# handler = PRCXI9300Handler(deck=deck, host="10.181.214.132", port=9999,
# timeout=10.0, setup=False, debug=False,
# simulator=True,
# matrix_id="71593",
@@ -1391,10 +1473,7 @@ if __name__ == "__main__":
# # input("Press Enter to continue...") # Wait for user input before proceeding
# # print("PRCXI9300Handler initialized with deck and host settings.")
### 9320 ###
### 9320 ###
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
@@ -1412,12 +1491,15 @@ if __name__ == "__main__":
new_plate: PRCXI9300Container = PRCXI9300Container.deserialize(well_containers)
return new_plate
def get_tip_rack(name: str, child_prefix: str="tip") -> PRCXI9300Container:
def get_tip_rack(name: str, child_prefix: str = "tip") -> PRCXI9300Container:
tip_racks = opentrons_96_tiprack_10ul(name).serialize()
tip_rack = PRCXI9300Container(
name=name, size_x=50, size_y=50, size_z=10, category="tip_rack", ordering=collections.OrderedDict({
k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()
})
name=name,
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict({k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()}),
)
tip_rack_serialized = tip_rack.serialize()
tip_rack_serialized["parent_name"] = deck.name
@@ -1629,6 +1711,7 @@ if __name__ == "__main__":
)
backend: PRCXI9300Backend = handler.backend
from pylabrobot.resources import set_volume_tracking
set_volume_tracking(enabled=True)
# res = backend.api_client.get_all_materials()
asyncio.run(handler.setup()) # Initialize the handler and setup the connection
@@ -1640,10 +1723,10 @@ if __name__ == "__main__":
for well in plate13.get_all_items():
# well_pos = well.name.split("_")[1] # 走一行
# if well_pos.startswith("A"):
if well.name.startswith("PlateT13"): # 走整个Plate
# if well_pos.startswith("A"):
if well.name.startswith("PlateT13"): # 走整个Plate
asyncio.run(handler.dispense([well], [0.01], [0]))
# asyncio.run(handler.dispense([plate10.get_item("H12")], [1], [0]))
# asyncio.run(handler.dispense([plate13.get_item("A1")], [1], [0]))
# asyncio.run(handler.dispense([plate14.get_item("C5")], [1], [0]))
@@ -1652,26 +1735,25 @@ if __name__ == "__main__":
asyncio.run(handler.run_protocol())
time.sleep(5)
os._exit(0)
# 第一种情景:一个孔往多个孔加液
# 第一种情景:一个孔往多个孔加液
# plate_2_liquids = handler.set_group("water", [plate2.children[0]], [300])
# plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100]*23)
# 第二个情景:多个孔往多个孔加液(但是个数得对应)
plate_2_liquids = handler.set_group("water", plate2.children[:23], [300]*23)
plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100]*23)
# 第二个情景:多个孔往多个孔加液(但是个数得对应)
plate_2_liquids = handler.set_group("water", plate2.children[:23], [300] * 23)
plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100] * 23)
# plate11.set_well_liquids([("Water", 100) if (i % 8 == 0 and i // 8 < 6) else (None, 100) for i in range(96)]) # Set liquids for every 8 wells in plate8
# plate11.set_well_liquids([("Water", 100) if (i % 8 == 0 and i // 8 < 6) else (None, 100) for i in range(96)]) # Set liquids for every 8 wells in plate8
# A = tree_to_list([resource_plr_to_ulab(deck)])
# # with open("deck.json", "w", encoding="utf-8") as f:
# # json.dump(A, f, indent=4, ensure_ascii=False)
# A = tree_to_list([resource_plr_to_ulab(deck)])
# # with open("deck.json", "w", encoding="utf-8") as f:
# # json.dump(A, f, indent=4, ensure_ascii=False)
# print(plate11.get_well(0).tracker.get_used_volume())
# Initialize the backend and setup the connection
# print(plate11.get_well(0).tracker.get_used_volume())
# Initialize the backend and setup the connection
asyncio.run(handler.transfer_group("water", "master_mix", 10)) # Reset tip tracking
# asyncio.run(handler.pick_up_tips([plate8.children[8]],[0]))
# print(plate8.children[8])
# asyncio.run(handler.run_protocol())
@@ -1685,121 +1767,118 @@ if __name__ == "__main__":
# print(plate1.children[0])
# asyncio.run(handler.discard_tips([0]))
# asyncio.run(handler.add_liquid(
# asp_vols=[10]*7,
# dis_vols=[10]*7,
# reagent_sources=plate11.children[:7],
# targets=plate1.children[2:9],
# use_channels=[0],
# flow_rates=[None] * 7,
# offsets=[Coordinate(0, 0, 0)] * 7,
# liquid_height=[None] * 7,
# blow_out_air_volume=[None] * 2,
# delays=None,
# mix_time=3,
# mix_vol=5,
# spread="custom",
# ))
# asyncio.run(handler.add_liquid(
# asp_vols=[10]*7,
# dis_vols=[10]*7,
# reagent_sources=plate11.children[:7],
# targets=plate1.children[2:9],
# use_channels=[0],
# flow_rates=[None] * 7,
# offsets=[Coordinate(0, 0, 0)] * 7,
# liquid_height=[None] * 7,
# blow_out_air_volume=[None] * 2,
# delays=None,
# mix_time=3,
# mix_vol=5,
# spread="custom",
# ))
# asyncio.run(handler.run_protocol()) # Run the protocol
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[10]*2,
# # # dis_vols=[10]*2,
# # # sources=plate11.children[:2],
# # # targets=plate11.children[-2:],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # mix_times=3,
# # # mix_vol=5,
# # # spread="wide",
# # # tip_racks=[plate8]
# # # ))
# # # asyncio.run(handler.remove_liquid(
# # # vols=[10]*2,
# # # sources=plate11.children[:2],
# # # waste_liquid=plate11.children[43],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # spread="wide"
# # # ))
# # asyncio.run(handler.run_protocol())
# # # asyncio.run(handler.discard_tips())
# # # asyncio.run(handler.mix(well_containers.children[:8
# # # ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# # #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[10]*2,
# # # dis_vols=[10]*2,
# # # sources=plate11.children[:2],
# # # targets=plate11.children[-2:],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # mix_times=3,
# # # mix_vol=5,
# # # spread="wide",
# # # tip_racks=[plate8]
# # # ))
# # # asyncio.run(handler.remove_liquid(
# # # vols=[10]*2,
# # # sources=plate11.children[:2],
# # # waste_liquid=plate11.children[43],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # spread="wide"
# # # ))
# # asyncio.run(handler.run_protocol())
# # # asyncio.run(handler.discard_tips())
# # # asyncio.run(handler.mix(well_containers.children[:8
# # # ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# # #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # asyncio.run(handler.remove_liquid(
# # # vols=[100]*16,
# # # sources=well_containers.children[-16:],
# # # waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # flow_rates=[None] * 32,
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # spread="wide",
# # # ))
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[100]*16,
# # # dis_vols=[100]*16,
# # # tip_racks=[tip_rack],
# # # sources=well_containers.children[-16:],
# # # targets=well_containers.children[:16],
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # asp_flow_rates=[None] * 16,
# # # dis_flow_rates=[None] * 16,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # mix_times=3,
# # # mix_vol=50,
# # # spread="wide",
# # # ))
# # print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # input("pick_up_tips add step")
#asyncio.run(handler.run_protocol()) # Run the protocol
# # # input("Running protocol...")
# # # input("Press Enter to continue...") # Wait for user input before proceeding
# # # print("PRCXI9300Handler initialized with deck and host settings.")
# 一些推荐版位组合的测试样例:
# 一些推荐版位组合的测试样例:
# # # asyncio.run(handler.remove_liquid(
# # # vols=[100]*16,
# # # sources=well_containers.children[-16:],
# # # waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # flow_rates=[None] * 32,
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # spread="wide",
# # # ))
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[100]*16,
# # # dis_vols=[100]*16,
# # # tip_racks=[tip_rack],
# # # sources=well_containers.children[-16:],
# # # targets=well_containers.children[:16],
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # asp_flow_rates=[None] * 16,
# # # dis_flow_rates=[None] * 16,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # mix_times=3,
# # # mix_vol=50,
# # # spread="wide",
# # # ))
# # print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # input("pick_up_tips add step")
# asyncio.run(handler.run_protocol()) # Run the protocol
# # # input("Running protocol...")
# # # input("Press Enter to continue...") # Wait for user input before proceeding
# # # print("PRCXI9300Handler initialized with deck and host settings.")
# 一些推荐版位组合的测试样例:
# 一些推荐版位组合的测试样例:
with open("prcxi_material.json", "r") as f:
material_info = json.load(f)
layout = DefaultLayout("PRCXI9320")
layout.add_lab_resource(material_info)
MatrixLayout_1, dict_1 = layout.recommend_layout([
("reagent_1", "96 细胞培养皿", 3),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 7),
("reagent_4", "10μL加长 Tip头", 1),
])
MatrixLayout_1, dict_1 = layout.recommend_layout(
[
("reagent_1", "96 细胞培养皿", 3),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 7),
("reagent_4", "10μL加长 Tip头", 1),
]
)
print(dict_1)
MatrixLayout_2, dict_2 = layout.recommend_layout([
("reagent_1", "96深孔板", 4),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 1),
("reagent_4", "10μL加长 Tip头", 1),
])
MatrixLayout_2, dict_2 = layout.recommend_layout(
[
("reagent_1", "96深孔板", 4),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 1),
("reagent_4", "10μL加长 Tip头", 1),
]
)
# with open("prcxi_material.json", "r") as f:
# material_info = json.load(f)

View File

@@ -8,6 +8,8 @@ import serial.tools.list_ports
from serial import Serial
from serial.serialutil import SerialException
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class RunzeSyringePumpMode(Enum):
Normal = 0
@@ -77,6 +79,8 @@ class RunzeSyringePumpInfo:
class RunzeSyringePumpAsync:
_ros_node: BaseROS2DeviceNode
def __init__(self, port: str, address: str = "1", volume: float = 25000, mode: RunzeSyringePumpMode = None):
self.port = port
self.address = address
@@ -102,6 +106,9 @@ class RunzeSyringePumpAsync:
self._run_future: Optional[Future[Any]] = None
self._run_lock = Lock()
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def _adjust_total_steps(self):
self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
@@ -182,7 +189,7 @@ class RunzeSyringePumpAsync:
try:
await self._query(command)
while True:
await asyncio.sleep(0.5) # Wait for 0.5 seconds before polling again
await self._ros_node.sleep(0.5) # Wait for 0.5 seconds before polling again
status = await self.query_device_status()
if status == '`':
@@ -364,7 +371,7 @@ class RunzeSyringePumpAsync:
if self._read_task:
raise RunzeSyringePumpConnectionError
self._read_task = asyncio.create_task(self._read_loop())
self._read_task = self._ros_node.create_task(self._read_loop())
try:
await self.query_device_status()

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualCentrifuge:
"""Virtual centrifuge device - 简化版,只保留核心功能"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
@@ -32,6 +36,9 @@ class VirtualCentrifuge:
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual centrifuge"""
@@ -132,7 +139,7 @@ class VirtualCentrifuge:
break
# 每秒更新一次
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 离心完成
self.data.update({

View File

@@ -2,9 +2,13 @@ import asyncio
import logging
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualColumn:
"""Virtual column device for RunColumn protocol 🏛️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -28,6 +32,9 @@ class VirtualColumn:
print(f"🏛️ === 虚拟色谱柱 {self.device_id} 已创建 === ✨")
print(f"📏 柱参数: 流速={self._max_flow_rate}mL/min | 长度={self._column_length}cm | 直径={self._column_diameter}cm 🔬")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual column 🚀"""
self.logger.info(f"🔧 初始化虚拟色谱柱 {self.device_id}")
@@ -101,7 +108,7 @@ class VirtualColumn:
step_time = separation_time / steps
for i in range(steps):
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
progress = (i + 1) / steps * 100
volume_processed = (i + 1) * 5.0 # 假设每步处理5mL

View File

@@ -4,70 +4,76 @@ import time as time_module
from typing import Dict, Any, Optional
from unilabos.compile.utils.vessel_parser import get_vessel
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualFilter:
"""Virtual filter device - 完全按照 Filter.action 规范 🌊"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
if device_id is None and 'id' in kwargs:
device_id = kwargs.pop('id')
if config is None and 'config' in kwargs:
config = kwargs.pop('config')
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
if config is None and "config" in kwargs:
config = kwargs.pop("config")
self.device_id = device_id or "unknown_filter"
self.config = config or {}
self.logger = logging.getLogger(f"VirtualFilter.{self.device_id}")
self.data = {}
# 从config或kwargs中获取配置参数
self.port = self.config.get('port') or kwargs.get('port', 'VIRTUAL')
self._max_temp = self.config.get('max_temp') or kwargs.get('max_temp', 100.0)
self._max_stir_speed = self.config.get('max_stir_speed') or kwargs.get('max_stir_speed', 1000.0)
self._max_volume = self.config.get('max_volume') or kwargs.get('max_volume', 500.0)
self.port = self.config.get("port") or kwargs.get("port", "VIRTUAL")
self._max_temp = self.config.get("max_temp") or kwargs.get("max_temp", 100.0)
self._max_stir_speed = self.config.get("max_stir_speed") or kwargs.get("max_stir_speed", 1000.0)
self._max_volume = self.config.get("max_volume") or kwargs.get("max_volume", 500.0)
# 处理其他kwargs参数
skip_keys = {'port', 'max_temp', 'max_stir_speed', 'max_volume'}
skip_keys = {"port", "max_temp", "max_stir_speed", "max_volume"}
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual filter 🚀"""
self.logger.info(f"🔧 初始化虚拟过滤器 {self.device_id}")
# 按照 Filter.action 的 feedback 字段初始化
self.data.update({
"status": "Idle",
"progress": 0.0, # Filter.action feedback
"current_temp": 25.0, # Filter.action feedback
"filtered_volume": 0.0, # Filter.action feedback
"message": "Ready for filtration"
})
self.data.update(
{
"status": "Idle",
"progress": 0.0, # Filter.action feedback
"current_temp": 25.0, # Filter.action feedback
"filtered_volume": 0.0, # Filter.action feedback
"message": "Ready for filtration",
}
)
self.logger.info(f"✅ 过滤器 {self.device_id} 初始化完成 🌊")
return True
async def cleanup(self) -> bool:
"""Cleanup virtual filter 🧹"""
self.logger.info(f"🧹 清理虚拟过滤器 {self.device_id} 🔚")
self.data.update({
"status": "Offline"
})
self.data.update({"status": "Offline"})
self.logger.info(f"✅ 过滤器 {self.device_id} 清理完成 💤")
return True
async def filter(
self,
self,
vessel: dict,
filtrate_vessel: dict = {},
stir: bool = False,
stir_speed: float = 300.0,
temp: float = 25.0,
continue_heatchill: bool = False,
volume: float = 0.0
stir: bool = False,
stir_speed: float = 300.0,
temp: float = 25.0,
continue_heatchill: bool = False,
volume: float = 0.0,
) -> bool:
"""Execute filter action - 完全按照 Filter.action 参数 🌊"""
vessel_id, _ = get_vessel(vessel)
@@ -79,59 +85,52 @@ class VirtualFilter:
temp = 25.0 # 0度自动设置为室温
self.logger.info(f"🌡️ 温度自动调整: {original_temp}°C → {temp}°C (室温) 🏠")
elif temp < 4.0:
temp = 4.0 # 小于4度自动设置为4度
temp = 4.0 # 小于4度自动设置为4度
self.logger.info(f"🌡️ 温度自动调整: {original_temp}°C → {temp}°C (最低温度) ❄️")
self.logger.info(f"🌊 开始过滤操作: {vessel_id}{filtrate_vessel_id} 🚰")
self.logger.info(f" 🌪️ 搅拌: {stir} ({stir_speed} RPM)")
self.logger.info(f" 🌡️ 温度: {temp}°C")
self.logger.info(f" 💧 体积: {volume}mL")
self.logger.info(f" 🔥 保持加热: {continue_heatchill}")
# 验证参数
if temp > self._max_temp or temp < 4.0:
error_msg = f"🌡️ 温度 {temp}°C 超出范围 (4-{self._max_temp}°C) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error: 温度超出范围 ⚠️",
"message": error_msg
})
self.data.update({"status": f"Error: 温度超出范围 ⚠️", "message": error_msg})
return False
if stir and stir_speed > self._max_stir_speed:
error_msg = f"🌪️ 搅拌速度 {stir_speed} RPM 超出范围 (0-{self._max_stir_speed} RPM) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error: 搅拌速度超出范围 ⚠️",
"message": error_msg
})
self.data.update({"status": f"Error: 搅拌速度超出范围 ⚠️", "message": error_msg})
return False
if volume > self._max_volume:
error_msg = f"💧 过滤体积 {volume} mL 超出范围 (0-{self._max_volume} mL) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error",
"message": error_msg
})
self.data.update({"status": f"Error", "message": error_msg})
return False
# 开始过滤
filter_volume = volume if volume > 0 else 50.0
self.logger.info(f"🚀 开始过滤 {filter_volume}mL 液体 💧")
self.data.update({
"status": f"Running",
"current_temp": temp,
"filtered_volume": 0.0,
"progress": 0.0,
"message": f"🚀 Starting filtration: {vessel_id}{filtrate_vessel_id}"
})
self.data.update(
{
"status": f"Running",
"current_temp": temp,
"filtered_volume": 0.0,
"progress": 0.0,
"message": f"🚀 Starting filtration: {vessel_id}{filtrate_vessel_id}",
}
)
try:
# 过滤过程 - 实时更新进度
start_time = time_module.time()
# 根据体积和搅拌估算过滤时间
base_time = filter_volume / 5.0 # 5mL/s 基础速度
if stir:
@@ -140,78 +139,79 @@ class VirtualFilter:
if temp > 50.0:
base_time *= 0.7 # 高温加速过滤
self.logger.info(f"🔥 高温加速过滤预计时间减少30% ⚡")
filter_time = max(base_time, 10.0) # 最少10秒
self.logger.info(f"⏱️ 预计过滤时间: {filter_time:.1f}秒 ⌛")
while True:
current_time = time_module.time()
elapsed = current_time - start_time
remaining = max(0, filter_time - elapsed)
progress = min(100.0, (elapsed / filter_time) * 100)
current_filtered = (progress / 100.0) * filter_volume
# 更新状态 - 按照 Filter.action feedback 字段
status_msg = f"🌊 过滤中: {vessel}"
if stir:
status_msg += f" | 🌪️ 搅拌: {stir_speed} RPM"
status_msg += f" | 🌡️ {temp}°C | 📊 {progress:.1f}% | 💧 已过滤: {current_filtered:.1f}mL"
self.data.update({
"progress": progress, # Filter.action feedback
"current_temp": temp, # Filter.action feedback
"filtered_volume": current_filtered, # Filter.action feedback
"status": "Running",
"message": f"🌊 Filtering: {progress:.1f}% complete, {current_filtered:.1f}mL filtered"
})
self.data.update(
{
"progress": progress, # Filter.action feedback
"current_temp": temp, # Filter.action feedback
"filtered_volume": current_filtered, # Filter.action feedback
"status": "Running",
"message": f"🌊 Filtering: {progress:.1f}% complete, {current_filtered:.1f}mL filtered",
}
)
# 进度日志每25%打印一次)
if progress >= 25 and progress % 25 < 1:
self.logger.info(f"📊 过滤进度: {progress:.0f}% | 💧 {current_filtered:.1f}mL 完成 ✨")
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 过滤完成
final_temp = temp if continue_heatchill else 25.0
final_status = f"✅ 过滤完成: {vessel} | 💧 {filter_volume}mL → {filtrate_vessel}"
if continue_heatchill:
final_status += " | 🔥 继续加热搅拌"
self.logger.info(f"🔥 继续保持加热搅拌状态 🌪️")
self.data.update({
"status": final_status,
"progress": 100.0, # Filter.action feedback
"current_temp": final_temp, # Filter.action feedback
"filtered_volume": filter_volume, # Filter.action feedback
"message": f"✅ Filtration completed: {filter_volume}mL filtered from {vessel_id}"
})
self.data.update(
{
"status": final_status,
"progress": 100.0, # Filter.action feedback
"current_temp": final_temp, # Filter.action feedback
"filtered_volume": filter_volume, # Filter.action feedback
"message": f"✅ Filtration completed: {filter_volume}mL filtered from {vessel_id}",
}
)
self.logger.info(f"🎉 过滤完成! 💧 {filter_volume}mL 从 {vessel_id} 过滤到 {filtrate_vessel_id}")
self.logger.info(f"📊 最终状态: 温度 {final_temp}°C | 进度 100% | 体积 {filter_volume}mL 🏁")
return True
except Exception as e:
error_msg = f"过滤过程中发生错误: {str(e)} 💥"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error",
"message": f"❌ Filtration failed: {str(e)}"
})
self.data.update({"status": f"Error", "message": f"❌ Filtration failed: {str(e)}"})
return False
# === 核心状态属性 - 按照 Filter.action feedback 字段 ===
@property
def status(self) -> str:
return self.data.get("status", "❓ Unknown")
@property
def progress(self) -> float:
"""Filter.action feedback 字段 📊"""
return self.data.get("progress", 0.0)
@property
def current_temp(self) -> float:
"""Filter.action feedback 字段 🌡️"""
@@ -230,15 +230,15 @@ class VirtualFilter:
@property
def message(self) -> str:
return self.data.get("message", "")
@property
def max_temp(self) -> float:
return self._max_temp
@property
def max_stir_speed(self) -> float:
return self._max_stir_speed
@property
def max_volume(self) -> float:
return self._max_volume
return self._max_volume

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module # 重命名time模块避免与参数冲突
from typing import Dict, Any
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualHeatChill:
"""Virtual heat chill device for HeatChillProtocol testing 🌡️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -35,6 +39,9 @@ class VirtualHeatChill:
print(f"🌡️ === 虚拟温控设备 {self.device_id} 已创建 === ✨")
print(f"🔥 温度范围: {self._min_temp}°C ~ {self._max_temp}°C | 🌪️ 最大搅拌: {self._max_stir_speed} RPM")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual heat chill 🚀"""
self.logger.info(f"🔧 初始化虚拟温控设备 {self.device_id}")
@@ -177,7 +184,7 @@ class VirtualHeatChill:
break
# 等待1秒后再次检查
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 操作完成
final_stir_info = f" | 🌪️ 搅拌: {stir_speed} RPM" if stir else ""

View File

@@ -3,13 +3,19 @@ import logging
import time as time_module
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
def debug_print(message):
"""调试输出 🔍"""
print(f"🌪️ [ROTAVAP] {message}", flush=True)
class VirtualRotavap:
"""Virtual rotary evaporator device - 简化版,只保留核心功能 🌪️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and "id" in kwargs:
@@ -38,56 +44,65 @@ class VirtualRotavap:
print(f"🌪️ === 虚拟旋转蒸发仪 {self.device_id} 已创建 === ✨")
print(f"🔥 温度范围: 10°C ~ {self._max_temp}°C | 🌀 转速范围: 10 ~ {self._max_rotation_speed} RPM")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual rotary evaporator 🚀"""
self.logger.info(f"🔧 初始化虚拟旋转蒸发仪 {self.device_id}")
# 只保留核心状态
self.data.update({
"status": "🏠 待机中",
"rotavap_state": "Ready", # Ready, Evaporating, Completed, Error
"current_temp": 25.0,
"target_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0, # 大气压
"evaporated_volume": 0.0,
"progress": 0.0,
"remaining_time": 0.0,
"message": "🌪️ Ready for evaporation"
})
self.data.update(
{
"status": "🏠 待机中",
"rotavap_state": "Ready", # Ready, Evaporating, Completed, Error
"current_temp": 25.0,
"target_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0, # 大气压
"evaporated_volume": 0.0,
"progress": 0.0,
"remaining_time": 0.0,
"message": "🌪️ Ready for evaporation",
}
)
self.logger.info(f"✅ 旋转蒸发仪 {self.device_id} 初始化完成 🌪️")
self.logger.info(f"📊 设备规格: 温度范围 10°C ~ {self._max_temp}°C | 转速范围 10 ~ {self._max_rotation_speed} RPM")
self.logger.info(
f"📊 设备规格: 温度范围 10°C ~ {self._max_temp}°C | 转速范围 10 ~ {self._max_rotation_speed} RPM"
)
return True
async def cleanup(self) -> bool:
"""Cleanup virtual rotary evaporator 🧹"""
self.logger.info(f"🧹 清理虚拟旋转蒸发仪 {self.device_id} 🔚")
self.data.update({
"status": "💤 离线",
"rotavap_state": "Offline",
"current_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": "💤 System offline"
})
self.data.update(
{
"status": "💤 离线",
"rotavap_state": "Offline",
"current_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": "💤 System offline",
}
)
self.logger.info(f"✅ 旋转蒸发仪 {self.device_id} 清理完成 💤")
return True
async def evaporate(
self,
vessel: str,
pressure: float = 0.1,
temp: float = 60.0,
self,
vessel: str,
pressure: float = 0.1,
temp: float = 60.0,
time: float = 180.0,
stir_speed: float = 100.0,
solvent: str = "",
**kwargs
**kwargs,
) -> bool:
"""Execute evaporate action - 简化版 🌪️"""
# 🔧 新增确保time参数是数值类型
if isinstance(time, str):
try:
@@ -98,31 +113,31 @@ class VirtualRotavap:
elif not isinstance(time, (int, float)):
self.logger.error(f"❌ 时间参数类型无效: {type(time)}使用默认值180.0秒")
time = 180.0
# 确保time是float类型; 并加速
time = float(time) / 10.0
# 🔧 简化处理如果vessel就是设备自己直接操作
if vessel == self.device_id:
debug_print(f"🎯 在设备 {self.device_id} 上直接执行蒸发操作")
actual_vessel = self.device_id
else:
actual_vessel = vessel
# 参数预处理
if solvent:
self.logger.info(f"🧪 识别到溶剂: {solvent}")
# 根据溶剂调整参数
solvent_lower = solvent.lower()
if any(s in solvent_lower for s in ['water', 'aqueous']):
if any(s in solvent_lower for s in ["water", "aqueous"]):
temp = max(temp, 80.0)
pressure = max(pressure, 0.2)
self.logger.info(f"💧 水系溶剂:调整参数 → 温度 {temp}°C, 压力 {pressure} bar")
elif any(s in solvent_lower for s in ['ethanol', 'methanol', 'acetone']):
elif any(s in solvent_lower for s in ["ethanol", "methanol", "acetone"]):
temp = min(temp, 50.0)
pressure = min(pressure, 0.05)
self.logger.info(f"⚡ 易挥发溶剂:调整参数 → 温度 {temp}°C, 压力 {pressure} bar")
self.logger.info(f"🌪️ 开始蒸发操作: {actual_vessel}")
self.logger.info(f" 🥽 容器: {actual_vessel}")
self.logger.info(f" 🌡️ 温度: {temp}°C")
@@ -131,126 +146,140 @@ class VirtualRotavap:
self.logger.info(f" 🌀 转速: {stir_speed} RPM")
if solvent:
self.logger.info(f" 🧪 溶剂: {solvent}")
# 验证参数
if temp > self._max_temp or temp < 10.0:
error_msg = f"🌡️ 温度 {temp}°C 超出范围 (10-{self._max_temp}°C) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 温度超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 温度超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
if stir_speed > self._max_rotation_speed or stir_speed < 10.0:
error_msg = f"🌀 旋转速度 {stir_speed} RPM 超出范围 (10-{self._max_rotation_speed} RPM) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 转速超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 转速超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
if pressure < 0.01 or pressure > 1.0:
error_msg = f"💨 真空度 {pressure} bar 超出范围 (0.01-1.0 bar) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 压力超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 压力超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
# 开始蒸发 - 🔧 现在time已经确保是float类型
self.logger.info(f"🚀 启动蒸发程序! 预计用时 {time/60:.1f}分钟 ⏱️")
self.data.update({
"status": f"🌪️ 蒸发中: {actual_vessel}",
"rotavap_state": "Evaporating",
"current_temp": temp,
"target_temp": temp,
"rotation_speed": stir_speed,
"vacuum_pressure": pressure,
"remaining_time": time,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": f"🌪️ Evaporating {actual_vessel} at {temp}°C, {pressure} bar, {stir_speed} RPM"
})
self.data.update(
{
"status": f"🌪️ 蒸发中: {actual_vessel}",
"rotavap_state": "Evaporating",
"current_temp": temp,
"target_temp": temp,
"rotation_speed": stir_speed,
"vacuum_pressure": pressure,
"remaining_time": time,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": f"🌪️ Evaporating {actual_vessel} at {temp}°C, {pressure} bar, {stir_speed} RPM",
}
)
try:
# 蒸发过程 - 实时更新进度
start_time = time_module.time()
total_time = time
last_logged_progress = 0
while True:
current_time = time_module.time()
elapsed = current_time - start_time
remaining = max(0, total_time - elapsed)
progress = min(100.0, (elapsed / total_time) * 100)
# 模拟蒸发体积 - 根据溶剂类型调整
if solvent and any(s in solvent.lower() for s in ['water', 'aqueous']):
if solvent and any(s in solvent.lower() for s in ["water", "aqueous"]):
evaporated_vol = progress * 0.6 # 水系溶剂蒸发慢
elif solvent and any(s in solvent.lower() for s in ['ethanol', 'methanol', 'acetone']):
elif solvent and any(s in solvent.lower() for s in ["ethanol", "methanol", "acetone"]):
evaporated_vol = progress * 1.0 # 易挥发溶剂蒸发快
else:
evaporated_vol = progress * 0.8 # 默认蒸发量
# 🔧 更新状态 - 确保包含所有必需字段
status_msg = f"🌪️ 蒸发中: {actual_vessel} | 🌡️ {temp}°C | 💨 {pressure} bar | 🌀 {stir_speed} RPM | 📊 {progress:.1f}% | ⏰ 剩余: {remaining:.0f}s"
self.data.update({
"remaining_time": remaining,
"progress": progress,
"evaporated_volume": evaporated_vol,
"current_temp": temp,
"status": status_msg,
"message": f"🌪️ Evaporating: {progress:.1f}% complete, 💧 {evaporated_vol:.1f}mL evaporated, ⏰ {remaining:.0f}s remaining"
})
self.data.update(
{
"remaining_time": remaining,
"progress": progress,
"evaporated_volume": evaporated_vol,
"current_temp": temp,
"status": status_msg,
"message": f"🌪️ Evaporating: {progress:.1f}% complete, 💧 {evaporated_vol:.1f}mL evaporated, ⏰ {remaining:.0f}s remaining",
}
)
# 进度日志每25%打印一次)
if progress >= 25 and int(progress) % 25 == 0 and int(progress) != last_logged_progress:
self.logger.info(f"📊 蒸发进度: {progress:.0f}% | 💧 已蒸发: {evaporated_vol:.1f}mL | ⏰ 剩余: {remaining:.0f}s ✨")
self.logger.info(
f"📊 蒸发进度: {progress:.0f}% | 💧 已蒸发: {evaporated_vol:.1f}mL | ⏰ 剩余: {remaining:.0f}s ✨"
)
last_logged_progress = int(progress)
# 时间到了,退出循环
if remaining <= 0:
break
# 每秒更新一次
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 蒸发完成
if solvent and any(s in solvent.lower() for s in ['water', 'aqueous']):
if solvent and any(s in solvent.lower() for s in ["water", "aqueous"]):
final_evaporated = 60.0 # 水系溶剂
elif solvent and any(s in solvent.lower() for s in ['ethanol', 'methanol', 'acetone']):
elif solvent and any(s in solvent.lower() for s in ["ethanol", "methanol", "acetone"]):
final_evaporated = 100.0 # 易挥发溶剂
else:
final_evaporated = 80.0 # 默认
self.data.update({
"status": f"✅ 蒸发完成: {actual_vessel} | 💧 蒸发量: {final_evaporated:.1f}mL",
"rotavap_state": "Completed",
"evaporated_volume": final_evaporated,
"progress": 100.0,
"current_temp": temp,
"remaining_time": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"✅ Evaporation completed: {final_evaporated}mL evaporated from {actual_vessel}"
})
self.data.update(
{
"status": f"✅ 蒸发完成: {actual_vessel} | 💧 蒸发量: {final_evaporated:.1f}mL",
"rotavap_state": "Completed",
"evaporated_volume": final_evaporated,
"progress": 100.0,
"current_temp": temp,
"remaining_time": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"✅ Evaporation completed: {final_evaporated}mL evaporated from {actual_vessel}",
}
)
self.logger.info(f"🎉 蒸发操作完成! ✨")
self.logger.info(f"📊 蒸发结果:")
@@ -262,24 +291,26 @@ class VirtualRotavap:
self.logger.info(f" ⏱️ 总用时: {total_time:.0f}s")
if solvent:
self.logger.info(f" 🧪 处理溶剂: {solvent} 🏁")
return True
except Exception as e:
# 出错处理
error_msg = f"蒸发过程中发生错误: {str(e)} 💥"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 蒸发错误: {str(e)}",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"❌ Evaporation failed: {str(e)}"
})
self.data.update(
{
"status": f"❌ 蒸发错误: {str(e)}",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"❌ Evaporation failed: {str(e)}",
}
)
return False
# === 核心状态属性 ===

View File

@@ -2,9 +2,13 @@ import asyncio
import logging
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSeparator:
"""Virtual separator device for SeparateProtocol testing"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
@@ -35,6 +39,9 @@ class VirtualSeparator:
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual separator"""
@@ -119,14 +126,14 @@ class VirtualSeparator:
for repeat in range(repeats):
# 搅拌阶段
for progress in range(0, 51, 10):
await asyncio.sleep(simulation_time / (repeats * 10))
await self._ros_node.sleep(simulation_time / (repeats * 10))
overall_progress = ((repeat * 100) + (progress * 0.5)) / repeats
self.data["progress"] = overall_progress
self.data["message"] = f"{repeat+1}次分离 - 搅拌中 ({progress}%)"
# 静置分相阶段
for progress in range(50, 101, 10):
await asyncio.sleep(simulation_time / (repeats * 10))
await self._ros_node.sleep(simulation_time / (repeats * 10))
overall_progress = ((repeat * 100) + (progress * 0.5)) / repeats
self.data["progress"] = overall_progress
self.data["message"] = f"{repeat+1}次分离 - 静置分相中 ({progress}%)"

View File

@@ -2,11 +2,16 @@ import time
import asyncio
from typing import Union
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSolenoidValve:
"""
虚拟电磁阀门 - 简单的开关型阀门,只有开启和关闭两个状态
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: dict = None, **kwargs):
# 从配置中获取参数,提供默认值
if config is None:
@@ -21,6 +26,9 @@ class VirtualSolenoidValve:
self._status = "Idle"
self._valve_state = "Closed" # "Open" or "Closed"
self._is_open = False
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化设备"""
@@ -63,7 +71,7 @@ class VirtualSolenoidValve:
self._status = "Busy"
# 模拟阀门响应时间
await asyncio.sleep(self.response_time)
await self._ros_node.sleep(self.response_time)
# 处理不同的命令格式
if isinstance(command, str):

View File

@@ -3,6 +3,8 @@ import logging
import re
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSolidDispenser:
"""
虚拟固体粉末加样器 - 用于处理 Add Protocol 中的固体试剂添加 ⚗️
@@ -13,6 +15,8 @@ class VirtualSolidDispenser:
- 简单反馈:成功/失败 + 消息 📊
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
self.device_id = device_id or "virtual_solid_dispenser"
self.config = config or {}
@@ -32,6 +36,9 @@ class VirtualSolidDispenser:
print(f"⚗️ === 虚拟固体分配器 {self.device_id} 创建成功! === ✨")
print(f"📊 设备规格: 最大容量 {self.max_capacity}g | 精度 {self.precision}g 🎯")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化固体加样器 🚀"""
self.logger.info(f"🔧 初始化固体分配器 {self.device_id}")
@@ -263,7 +270,7 @@ class VirtualSolidDispenser:
for i in range(steps):
progress = (i + 1) / steps * 100
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
if i % 2 == 0: # 每隔一步显示进度
self.logger.debug(f"📊 加样进度: {progress:.0f}% | {amount_emoji} 正在分配 {reagent}...")

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module
from typing import Dict, Any
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualStirrer:
"""Virtual stirrer device for StirProtocol testing - 功能完整版 🌪️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -34,6 +38,9 @@ class VirtualStirrer:
print(f"🌪️ === 虚拟搅拌器 {self.device_id} 已创建 === ✨")
print(f"🔧 速度范围: {self._min_speed} ~ {self._max_speed} RPM | 📱 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual stirrer 🚀"""
self.logger.info(f"🔧 初始化虚拟搅拌器 {self.device_id}")
@@ -134,7 +141,7 @@ class VirtualStirrer:
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
self.logger.info(f"✅ 搅拌阶段完成! 🌪️ {stir_speed} RPM × {stir_time}s")
@@ -176,7 +183,7 @@ class VirtualStirrer:
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
self.logger.info(f"✅ 沉降阶段完成! 🛑 静置 {settling_time}s")

View File

@@ -4,6 +4,8 @@ from enum import Enum
from typing import Union, Optional
import logging
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualPumpMode(Enum):
Normal = 0
@@ -14,6 +16,8 @@ class VirtualPumpMode(Enum):
class VirtualTransferPump:
"""虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: dict = None, **kwargs):
"""
初始化虚拟转移泵
@@ -53,6 +57,9 @@ class VirtualTransferPump:
print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s")
print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化虚拟泵 🚀"""
self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id}")
@@ -104,7 +111,7 @@ class VirtualTransferPump:
async def _simulate_operation(self, duration: float):
"""模拟操作延时 ⏱️"""
self._status = "Busy"
await asyncio.sleep(duration)
await self._ros_node.sleep(duration)
self._status = "Idle"
def _calculate_duration(self, volume: float, velocity: float = None) -> float:
@@ -223,7 +230,7 @@ class VirtualTransferPump:
# 等待一小步时间
if i < steps and step_duration > 0:
await asyncio.sleep(step_duration)
await self._ros_node.sleep(step_duration)
else:
# 移动距离很小,直接完成
self._position = target_position
@@ -341,7 +348,7 @@ class VirtualTransferPump:
# 短暂停顿
self.logger.debug("⏸️ 短暂停顿...")
await asyncio.sleep(0.1)
await self._ros_node.sleep(0.1)
# 排液
await self.dispense(volume, dispense_velocity)

View File

@@ -3,6 +3,7 @@ from cgi import print_arguments
from doctest import debug
from typing import Dict, Any, List, Optional
import requests
from pylabrobot.resources.resource import Resource as ResourcePLR
from pathlib import Path
import pandas as pd
import time
@@ -254,7 +255,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
def auto_feeding4to3(
self,
# ★ 修改点:默认模板路径
xlsx_path: Optional[str] = "C:/ML/GitHub/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/material_template.xlsx",
xlsx_path: Optional[str] = "/Users/sml/work/Unilab/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/material_template.xlsx",
# ---------------- WH4 - 加样头面 (Z=1, 12个点位) ----------------
WH4_x1_y1_z1_1_materialName: str = "", WH4_x1_y1_z1_1_quantity: float = 0.0,
WH4_x2_y1_z1_2_materialName: str = "", WH4_x2_y1_z1_2_quantity: float = 0.0,
@@ -306,7 +307,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
# ---------- 模式 1: Excel 导入 ----------
if xlsx_path:
path = Path(xlsx_path)
path = Path(__file__).parent / Path(xlsx_path)
if path.exists(): # ★ 修改点:路径存在才加载
try:
df = pd.read_excel(path, sheet_name=0, header=None, engine="openpyxl")
@@ -471,14 +472,23 @@ class BioyondCellWorkstation(BioyondWorkstation):
- totalMass 自动计算为所有物料质量之和
- createTime 缺失或为空时自动填充为当前日期YYYY/M/D
"""
path = Path(xlsx_path)
default_path = Path("/Users/sml/work/Unilab/Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx")
path = Path(xlsx_path) if xlsx_path else default_path
print(f"[create_orders] 使用 Excel 路径: {path}")
if path != default_path:
print("[create_orders] 来源: 调用方传入自定义路径")
else:
print("[create_orders] 来源: 使用默认模板路径")
if not path.exists():
print(f"[create_orders] ⚠️ Excel 文件不存在: {path}")
raise FileNotFoundError(f"未找到 Excel 文件:{path}")
try:
df = pd.read_excel(path, sheet_name=0, engine="openpyxl")
except Exception as e:
raise RuntimeError(f"读取 Excel 失败:{e}")
print(f"[create_orders] Excel 读取成功,行数: {len(df)}, 列: {list(df.columns)}")
# 列名容错:返回可选列名,找不到则返回 None
def _pick(col_names: List[str]) -> Optional[str]:
@@ -495,9 +505,20 @@ class BioyondCellWorkstation(BioyondWorkstation):
col_pouch = _pick(["软包组装分液体积", "pouchCellInfo"])
col_cond = _pick(["电导测试分液体积", "conductivityInfo"])
col_cond_cnt = _pick(["电导测试分液瓶数", "conductivityBottleCount"])
print("[create_orders] 列匹配结果:", {
"order_name": col_order_name,
"create_time": col_create_time,
"bottle_type": col_bottle_type,
"mix_time": col_mix_time,
"load": col_load,
"pouch": col_pouch,
"conductivity": col_cond,
"conductivity_bottle_count": col_cond_cnt,
})
# 物料列:所有以 (g) 结尾
material_cols = [c for c in df.columns if isinstance(c, str) and c.endswith("(g)")]
print(f"[create_orders] 识别到的物料列: {material_cols}")
if not material_cols:
raise KeyError("未发现任何以“(g)”结尾的物料列,请检查表头。")
@@ -545,6 +566,9 @@ class BioyondCellWorkstation(BioyondWorkstation):
if mass > 0:
mats.append({"name": mcol.replace("(g)", ""), "mass": mass})
total_mass += mass
else:
if mass < 0:
print(f"[create_orders] 第 {idx+1} 行物料 {mcol} 数值为负数: {mass}")
order_data = {
"batchId": batch_id,
@@ -559,11 +583,22 @@ class BioyondCellWorkstation(BioyondWorkstation):
"materialInfos": mats,
"totalMass": round(total_mass, 4) # 自动汇总
}
print(f"[create_orders] 第 {idx+1} 行解析结果: orderName={order_data['orderName']}, "
f"loadShedding={order_data['loadSheddingInfo']}, pouchCell={order_data['pouchCellInfo']}, "
f"conductivity={order_data['conductivityInfo']}, totalMass={order_data['totalMass']}, "
f"material_count={len(mats)}")
if order_data["totalMass"] <= 0:
print(f"[create_orders] ⚠️ 第 {idx+1} 行总质量 <= 0可能导致 LIMS 校验失败")
if not mats:
print(f"[create_orders] ⚠️ 第 {idx+1} 行未找到有效物料")
orders.append(order_data)
print(f"[create_orders] 即将提交订单数量: {len(orders)}")
response = self._post_lims("/api/lims/order/orders", orders)
print(response)
print(f"[create_orders] 接口返回: {response}")
# 等待任务报送成功
data_list = response.get("data", [])
if data_list:
@@ -1014,7 +1049,35 @@ class BioyondCellWorkstation(BioyondWorkstation):
"create_result": create_result,
"inbound_result": inbound_result,
}
def resource_tree_transfer(self, old_parent: ResourcePLR, plr_resource: ResourcePLR, parent_resource: ResourcePLR):
# ROS2DeviceNode.run_async_func(self._ros_node.resource_tree_transfer, True, **{
# "old_parent": old_parent,
# "plr_resource": plr_resource,
# "parent_resource": parent_resource,
# })
print("resource_tree_transfer", plr_resource, parent_resource)
if hasattr(plr_resource, "unilabos_extra") and plr_resource.unilabos_extra:
if "update_resource_site" in plr_resource.unilabos_extra:
site = plr_resource.unilabos_extra["update_resource_site"]
plr_model = plr_resource.model
board_type = None
for key, (moudle_name,moudle_uuid) in MATERIAL_TYPE_MAPPINGS.items():
if plr_model == moudle_name:
board_type = key
break
if board_type is None:
pass
bottle1 = plr_resource.children[0]
bottle_moudle = bottle1.model
bottle_type = None
for key, (moudle_name, moudle_uuid) in MATERIAL_TYPE_MAPPINGS.items():
if bottle_moudle == moudle_name:
bottle_type = key
break
self.create_sample(plr_resource.name, board_type,bottle_type,site)
return
self.lab_logger().warning(f"无库位的上料,不处理,{plr_resource} 挂载到 {parent_resource}")
def create_sample(
self,

View File

@@ -8,8 +8,8 @@ import os
# BioyondCellWorkstation 默认配置(包含所有必需参数)
API_CONFIG = {
# API 连接配置
# "api_host": os.getenv("BIOYOND_API_HOST", "http://172.21.32.65:44389"),#实机
"api_host": os.getenv("BIOYOND_API_HOST", "http://172.16.10.169:44388"),# 仿真机
# "api_host": os.getenv("BIOYOND_API_HOST", "http://172.16.1.143:44389"),#实机
"api_host": os.getenv("BIOYOND_API_HOST", "http://172.16.7.149:44388"),# 仿真机
"api_key": os.getenv("BIOYOND_API_KEY", "8A819E5C"),
"timeout": int(os.getenv("BIOYOND_TIMEOUT", "30")),
@@ -17,7 +17,7 @@ API_CONFIG = {
"report_token": os.getenv("BIOYOND_REPORT_TOKEN", "CHANGE_ME_TOKEN"),
# HTTP 服务配置
"HTTP_host": os.getenv("BIOYOND_HTTP_HOST", "172.21.32.115"), # HTTP服务监听地址监听计算机飞连ip地址
"HTTP_host": os.getenv("BIOYOND_HTTP_HOST", "172.16.2.140"), # HTTP服务监听地址监听计算机飞连ip地址
"HTTP_port": int(os.getenv("BIOYOND_HTTP_PORT", "8080")),
"debug_mode": False,# 调试模式
}

View File

@@ -177,7 +177,18 @@ class BioyondWorkstation(WorkstationBase):
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
"resources": [self.deck]
})
def resource_tree_transfer(self, old_parent: ResourcePLR, plr_resource: ResourcePLR, parent_resource: ResourcePLR):
# ROS2DeviceNode.run_async_func(self._ros_node.resource_tree_transfer, True, **{
# "old_parent": old_parent,
# "plr_resource": plr_resource,
# "parent_resource": parent_resource,
# })
print("resource_tree_transfer", plr_resource, parent_resource)
if hasattr(plr_resource, "unilabos_data") and plr_resource.unilabos_data:
if "update_resource_site" in plr_resource.unilabos_data:
site = plr_resource.unilabos_data["update_resource_site"]
return
self.lab_logger().warning(f"无库位的上料,不处理,{plr_resource} 挂载到 {parent_resource}")
def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot):
ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{
"plr_resources": resource,

View File

@@ -112,7 +112,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
def __init__(
self,
deck: Deck=None,
address: str = "172.21.33.176",
address: str = "172.16.28.102",
port: str = "502",
debug_mode: bool = False,
*args,
@@ -165,7 +165,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
print("测试模式,跳过连接")
""" 工站的配置 """
self.nodes = BaseClient.load_csv(os.path.join(os.path.dirname(__file__), 'coin_cell_assembly_a.csv'))
self.nodes = BaseClient.load_csv(os.path.join(os.path.dirname(__file__), 'coin_cell_assembly_1105.csv'))
self.client = modbus_client.register_node_list(self.nodes)
self.success = False
self.allow_data_read = False #允许读取函数运行标志位
@@ -906,7 +906,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
return self.success
def func_allpack_cmd(self, elec_num, elec_use_num, elec_vol:int=50, assembly_type:int=7, assembly_pressure:int=4200, file_path: str="C:\\Users\\67484\\Desktop") -> bool:
def func_allpack_cmd(self, elec_num, elec_use_num, elec_vol:int=50, assembly_type:int=7, assembly_pressure:int=4200, file_path: str="/Users/sml/work") -> bool:
elec_num, elec_use_num, elec_vol, assembly_type, assembly_pressure = int(elec_num), int(elec_use_num), int(elec_vol), int(assembly_type), int(assembly_pressure)
summary_csv_file = os.path.join(file_path, "duandian.csv")
# 如果断点文件存在,先读取之前的进度

View File

@@ -0,0 +1,64 @@
Name,DataType,InitValue,Comment,Attribute,DeviceType,Address,
COIL_SYS_START_CMD,BOOL,,,,coil,9010,
COIL_SYS_STOP_CMD,BOOL,,,,coil,9020,
COIL_SYS_RESET_CMD,BOOL,,,,coil,9030,
COIL_SYS_HAND_CMD,BOOL,,,,coil,9040,
COIL_SYS_AUTO_CMD,BOOL,,,,coil,9050,
COIL_SYS_INIT_CMD,BOOL,,,,coil,9060,
COIL_UNILAB_SEND_MSG_SUCC_CMD,BOOL,,,,coil,9700,
COIL_UNILAB_REC_MSG_SUCC_CMD,BOOL,,,,coil,9710,unilab_rec_msg_succ_cmd
COIL_SYS_START_STATUS,BOOL,,,,coil,9210,
COIL_SYS_STOP_STATUS,BOOL,,,,coil,9220,
COIL_SYS_RESET_STATUS,BOOL,,,,coil,9230,
COIL_SYS_HAND_STATUS,BOOL,,,,coil,9240,
COIL_SYS_AUTO_STATUS,BOOL,,,,coil,9250,
COIL_SYS_INIT_STATUS,BOOL,,,,coil,9260,
COIL_REQUEST_REC_MSG_STATUS,BOOL,,,,coil,9500,
COIL_REQUEST_SEND_MSG_STATUS,BOOL,,,,coil,9510,request_send_msg_status
REG_MSG_ELECTROLYTE_USE_NUM,INT16,,,,hold_register,17000,
REG_MSG_ELECTROLYTE_NUM,INT16,,,,hold_register,17002,unilab_send_msg_electrolyte_num
REG_MSG_ELECTROLYTE_VOLUME,INT16,,,,hold_register,17004,unilab_send_msg_electrolyte_vol
REG_MSG_ASSEMBLY_TYPE,INT16,,,,hold_register,17006,unilab_send_msg_assembly_type
REG_MSG_ASSEMBLY_PRESSURE,INT16,,,,hold_register,17008,unilab_send_msg_assembly_pressure
REG_DATA_ASSEMBLY_COIN_CELL_NUM,INT16,,,,hold_register,16000,data_assembly_coin_cell_num
REG_DATA_OPEN_CIRCUIT_VOLTAGE,FLOAT32,,,,hold_register,16002,data_open_circuit_voltage
REG_DATA_AXIS_X_POS,FLOAT32,,,,hold_register,16004,
REG_DATA_AXIS_Y_POS,FLOAT32,,,,hold_register,16006,
REG_DATA_AXIS_Z_POS,FLOAT32,,,,hold_register,16008,
REG_DATA_POLE_WEIGHT,FLOAT32,,,,hold_register,16010,data_pole_weight
REG_DATA_ASSEMBLY_PER_TIME,FLOAT32,,,,hold_register,16012,data_assembly_time
REG_DATA_ASSEMBLY_PRESSURE,INT16,,,,hold_register,16014,data_assembly_pressure
REG_DATA_ELECTROLYTE_VOLUME,INT16,,,,hold_register,16016,data_electrolyte_volume
REG_DATA_COIN_NUM,INT16,,,,hold_register,16018,data_coin_num
REG_DATA_ELECTROLYTE_CODE,STRING,,,,hold_register,16020,data_electrolyte_code()
REG_DATA_COIN_CELL_CODE,STRING,,,,hold_register,16030,data_coin_cell_code()
REG_DATA_STACK_VISON_CODE,STRING,,,,hold_register,18004,data_stack_vision_code()
REG_DATA_GLOVE_BOX_PRESSURE,FLOAT32,,,,hold_register,16050,data_glove_box_pressure
REG_DATA_GLOVE_BOX_WATER_CONTENT,FLOAT32,,,,hold_register,16052,data_glove_box_water_content
REG_DATA_GLOVE_BOX_O2_CONTENT,FLOAT32,,,,hold_register,16054,data_glove_box_o2_content
UNILAB_SEND_ELECTROLYTE_BOTTLE_NUM,BOOL,,,,coil,9720,
UNILAB_RECE_ELECTROLYTE_BOTTLE_NUM,BOOL,,,,coil,9520,
REG_MSG_ELECTROLYTE_NUM_USED,INT16,,,,hold_register,17496,
REG_DATA_ELECTROLYTE_USE_NUM,INT16,,,,hold_register,16000,
UNILAB_SEND_FINISHED_CMD,BOOL,,,,coil,9730,
UNILAB_RECE_FINISHED_CMD,BOOL,,,,coil,9530,
REG_DATA_ASSEMBLY_TYPE,INT16,,,,hold_register,16018,ASSEMBLY_TYPE7or8
COIL_ALUMINUM_FOIL,BOOL,,使用铝箔垫,,coil,9340,
REG_MSG_NE_PLATE_MATRIX,INT16,,负极片矩阵点位,,hold_register,17440,
REG_MSG_SEPARATOR_PLATE_MATRIX,INT16,,隔膜矩阵点位,,hold_register,17450,
REG_MSG_TIP_BOX_MATRIX,INT16,,移液枪头矩阵点位,,hold_register,17480,
REG_MSG_NE_PLATE_NUM,INT16,,负极片盘数,,hold_register,17443,
REG_MSG_SEPARATOR_PLATE_NUM,INT16,,隔膜盘数,,hold_register,17453,
REG_MSG_PRESS_MODE,BOOL,,压制模式false:压力检测模式True:距离模式),,coil,9360,电池压制模式
,,,,,,,
,BOOL,,视觉对位false:使用true:忽略),,coil,9300,视觉对位
,BOOL,,复检false:使用true:忽略),,coil,9310,视觉复检
,BOOL,,手套箱_左仓false:使用true:忽略),,coil,9320,手套箱左仓
,BOOL,,手套箱_右仓false:使用true:忽略),,coil,9420,手套箱右仓
,BOOL,,真空检知false:使用true:忽略),,coil,9350,真空检知
,BOOL,,电解液添加模式false:单次滴液true:二次滴液),,coil,9370,滴液模式
,BOOL,,正极片称重false:使用true:忽略),,coil,9380,正极片称重
,BOOL,,正负极片组装方式false:正装true:倒装),,coil,9390,正负极反装
,BOOL,,压制清洁false:使用true:忽略),,coil,9400,压制清洁
,BOOL,,物料盘摆盘方式false:水平摆盘true:堆叠摆盘),,coil,9410,负极片摆盘方式
REG_MSG_BATTERY_CLEAN_IGNORE,BOOL,,忽略电池清洁false:使用true:忽略),,coil,9460,
1 Name DataType InitValue Comment Attribute DeviceType Address
2 COIL_SYS_START_CMD BOOL coil 9010
3 COIL_SYS_STOP_CMD BOOL coil 9020
4 COIL_SYS_RESET_CMD BOOL coil 9030
5 COIL_SYS_HAND_CMD BOOL coil 9040
6 COIL_SYS_AUTO_CMD BOOL coil 9050
7 COIL_SYS_INIT_CMD BOOL coil 9060
8 COIL_UNILAB_SEND_MSG_SUCC_CMD BOOL coil 9700
9 COIL_UNILAB_REC_MSG_SUCC_CMD BOOL coil 9710 unilab_rec_msg_succ_cmd
10 COIL_SYS_START_STATUS BOOL coil 9210
11 COIL_SYS_STOP_STATUS BOOL coil 9220
12 COIL_SYS_RESET_STATUS BOOL coil 9230
13 COIL_SYS_HAND_STATUS BOOL coil 9240
14 COIL_SYS_AUTO_STATUS BOOL coil 9250
15 COIL_SYS_INIT_STATUS BOOL coil 9260
16 COIL_REQUEST_REC_MSG_STATUS BOOL coil 9500
17 COIL_REQUEST_SEND_MSG_STATUS BOOL coil 9510 request_send_msg_status
18 REG_MSG_ELECTROLYTE_USE_NUM INT16 hold_register 17000
19 REG_MSG_ELECTROLYTE_NUM INT16 hold_register 17002 unilab_send_msg_electrolyte_num
20 REG_MSG_ELECTROLYTE_VOLUME INT16 hold_register 17004 unilab_send_msg_electrolyte_vol
21 REG_MSG_ASSEMBLY_TYPE INT16 hold_register 17006 unilab_send_msg_assembly_type
22 REG_MSG_ASSEMBLY_PRESSURE INT16 hold_register 17008 unilab_send_msg_assembly_pressure
23 REG_DATA_ASSEMBLY_COIN_CELL_NUM INT16 hold_register 16000 data_assembly_coin_cell_num
24 REG_DATA_OPEN_CIRCUIT_VOLTAGE FLOAT32 hold_register 16002 data_open_circuit_voltage
25 REG_DATA_AXIS_X_POS FLOAT32 hold_register 16004
26 REG_DATA_AXIS_Y_POS FLOAT32 hold_register 16006
27 REG_DATA_AXIS_Z_POS FLOAT32 hold_register 16008
28 REG_DATA_POLE_WEIGHT FLOAT32 hold_register 16010 data_pole_weight
29 REG_DATA_ASSEMBLY_PER_TIME FLOAT32 hold_register 16012 data_assembly_time
30 REG_DATA_ASSEMBLY_PRESSURE INT16 hold_register 16014 data_assembly_pressure
31 REG_DATA_ELECTROLYTE_VOLUME INT16 hold_register 16016 data_electrolyte_volume
32 REG_DATA_COIN_NUM INT16 hold_register 16018 data_coin_num
33 REG_DATA_ELECTROLYTE_CODE STRING hold_register 16020 data_electrolyte_code()
34 REG_DATA_COIN_CELL_CODE STRING hold_register 16030 data_coin_cell_code()
35 REG_DATA_STACK_VISON_CODE STRING hold_register 18004 data_stack_vision_code()
36 REG_DATA_GLOVE_BOX_PRESSURE FLOAT32 hold_register 16050 data_glove_box_pressure
37 REG_DATA_GLOVE_BOX_WATER_CONTENT FLOAT32 hold_register 16052 data_glove_box_water_content
38 REG_DATA_GLOVE_BOX_O2_CONTENT FLOAT32 hold_register 16054 data_glove_box_o2_content
39 UNILAB_SEND_ELECTROLYTE_BOTTLE_NUM BOOL coil 9720
40 UNILAB_RECE_ELECTROLYTE_BOTTLE_NUM BOOL coil 9520
41 REG_MSG_ELECTROLYTE_NUM_USED INT16 hold_register 17496
42 REG_DATA_ELECTROLYTE_USE_NUM INT16 hold_register 16000
43 UNILAB_SEND_FINISHED_CMD BOOL coil 9730
44 UNILAB_RECE_FINISHED_CMD BOOL coil 9530
45 REG_DATA_ASSEMBLY_TYPE INT16 hold_register 16018 ASSEMBLY_TYPE7or8
46 COIL_ALUMINUM_FOIL BOOL 使用铝箔垫 coil 9340
47 REG_MSG_NE_PLATE_MATRIX INT16 负极片矩阵点位 hold_register 17440
48 REG_MSG_SEPARATOR_PLATE_MATRIX INT16 隔膜矩阵点位 hold_register 17450
49 REG_MSG_TIP_BOX_MATRIX INT16 移液枪头矩阵点位 hold_register 17480
50 REG_MSG_NE_PLATE_NUM INT16 负极片盘数 hold_register 17443
51 REG_MSG_SEPARATOR_PLATE_NUM INT16 隔膜盘数 hold_register 17453
52 REG_MSG_PRESS_MODE BOOL 压制模式(false:压力检测模式,True:距离模式) coil 9360 电池压制模式
53
54 BOOL 视觉对位(false:使用,true:忽略) coil 9300 视觉对位
55 BOOL 复检(false:使用,true:忽略) coil 9310 视觉复检
56 BOOL 手套箱_左仓(false:使用,true:忽略) coil 9320 手套箱左仓
57 BOOL 手套箱_右仓(false:使用,true:忽略) coil 9420 手套箱右仓
58 BOOL 真空检知(false:使用,true:忽略) coil 9350 真空检知
59 BOOL 电解液添加模式(false:单次滴液,true:二次滴液) coil 9370 滴液模式
60 BOOL 正极片称重(false:使用,true:忽略) coil 9380 正极片称重
61 BOOL 正负极片组装方式(false:正装,true:倒装) coil 9390 正负极反装
62 BOOL 压制清洁(false:使用,true:忽略) coil 9400 压制清洁
63 BOOL 物料盘摆盘方式(false:水平摆盘,true:堆叠摆盘) coil 9410 负极片摆盘方式
64 REG_MSG_BATTERY_CLEAN_IGNORE BOOL 忽略电池清洁(false:使用,true:忽略) coil 9460

View File

@@ -233,7 +233,7 @@ def YB_1BottleCarrier(name: str) -> BottleCarrier:
resource_size_y=beaker_diameter,
name_prefix=name,
),
model="1BottleCarrier",
model="YB_1BottleCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
@@ -270,7 +270,7 @@ def YB_1GaoNianYeBottleCarrier(name: str) -> BottleCarrier:
resource_size_y=beaker_diameter,
name_prefix=name,
),
model="1GaoNianYeBottleCarrier",
model="YB_1GaoNianYeBottleCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
@@ -307,7 +307,7 @@ def YB_1Bottle100mlCarrier(name: str) -> BottleCarrier:
resource_size_y=beaker_diameter,
name_prefix=name,
),
model="1Bottle100mlCarrier",
model="YB_1Bottle100mlCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
@@ -355,7 +355,7 @@ def YB_6x5ml_DispensingVialCarrier(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="6x5ml_DispensingVialCarrier",
model="YB_6x5ml_DispensingVialCarrier",
)
carrier.num_items_x = 4
carrier.num_items_y = 2
@@ -405,7 +405,7 @@ def YB_6x20ml_DispensingVialCarrier(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="6x20ml_DispensingVialCarrier",
model="YB_6x20ml_DispensingVialCarrier",
)
carrier.num_items_x = 4
carrier.num_items_y = 2
@@ -455,7 +455,7 @@ def YB_6x_SmallSolutionBottleCarrier(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="6x_SmallSolutionBottleCarrier",
model="YB_6x_SmallSolutionBottleCarrier",
)
carrier.num_items_x = 4
carrier.num_items_y = 2
@@ -505,7 +505,7 @@ def YB_4x_LargeSolutionBottleCarrier(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="4x_LargeSolutionBottleCarrier",
model="YB_4x_LargeSolutionBottleCarrier",
)
carrier.num_items_x = 2
carrier.num_items_y = 2
@@ -554,7 +554,7 @@ def YB_jia_yang_tou_da_1X1_carrier(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="6x_LargeDispenseHeadCarrier",
model="YB_6x_LargeDispenseHeadCarrier",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
@@ -591,7 +591,7 @@ def YB_AdapterBlock(name: str) -> BottleCarrier:
resource_size_y=adapter_diameter,
name_prefix=name,
),
model="AdapterBlock",
model="YB_AdapterBlock",
)
carrier.num_items_x = 1
carrier.num_items_y = 1
@@ -639,7 +639,7 @@ def YB_TipBox(name: str) -> BottleCarrier:
size_y=carrier_size_y,
size_z=carrier_size_z,
sites=sites,
model="TipBox",
model="YB_TipBox",
)
carrier.num_items_x = 12
carrier.num_items_y = 8

View File

@@ -15,7 +15,7 @@ def YB_jia_yang_tou_da(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Solid_Stock",
model="YB_jia_yang_tou_da_1X1_carrier",
)
"""液1x1"""
@@ -51,7 +51,7 @@ def YB_ye_100ml_Bottle(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Liquid_Bottle_100ml",
model="YB_1Bottle100mlCarrier",
)
"""高粘液"""
@@ -87,7 +87,7 @@ def YB_fen_ye_5ml_Bottle(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Separation_Bottle_5ml",
model="YB_fen_ye_5ml_Bottle",
)
"""20ml分液瓶"""
@@ -105,7 +105,7 @@ def YB_fen_ye_20ml_Bottle(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Separation_Bottle_20ml",
model="YB_fen_ye_20ml_Bottle",
)
"""配液瓶(小)"""
@@ -123,7 +123,7 @@ def YB_pei_ye_xiao_Bottle(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Mixing_Bottle_Small",
model="YB_pei_ye_xiao_Bottle",
)
"""配液瓶(大)"""
@@ -141,7 +141,7 @@ def YB_pei_ye_da_Bottle(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Mixing_Bottle_Large",
model="YB_pei_ye_da_Bottle",
)
"""枪头"""
@@ -159,5 +159,5 @@ def YB_Pipette_Tip(
height=height,
max_volume=max_volume,
barcode=barcode,
model="Pipette_Tip",
model="YB_Pipette_Tip",
)

View File

@@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List
import rclpy
from unilabos_msgs.srv._serial_command import SerialCommand_Response
from unilabos.app.register import register_devices_and_resources
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
from unilabos_msgs.srv import SerialCommand # type: ignore
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.timer import Timer
@@ -108,66 +109,51 @@ def slave(
rclpy_init_args: List[str] = ["--log-level", "debug"],
) -> None:
"""从节点函数"""
# 1. 初始化 ROS2
if not rclpy.ok():
rclpy.init(args=rclpy_init_args)
executor = rclpy.__executor
if not executor:
executor = rclpy.__executor = MultiThreadedExecutor()
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
devices_instances[device_id] = d
# 默认初始化
# if d is not None and isinstance(d, Node):
# executor.add_node(d)
# else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_config, # type: ignore FIXME
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
# 1.5 启动 executor 线程
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
thread.start()
# 2. 创建 Slave Machine Node
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
# 3. 向 Host 报送节点信息和物料,获取 UUID 映射
uuid_mapping = {}
if not BasicConfig.slave_no_host:
# 3.1 报送节点信息
sclient = n.create_client(SerialCommand, "/node_info_update")
sclient.wait_for_service()
registry_config = {}
devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True)
registry_config.update(devices_to_register)
registry_config.update(resources_to_register)
request = SerialCommand.Request()
request.command = json.dumps(
{
"machine_name": BasicConfig.machine_name,
"type": "slave",
"devices_config": devices_config.dump(),
"registry_config": lab_registry.obtain_registry_device_info(),
"registry_config": registry_config,
},
ensure_ascii=False,
cls=TypeEncoder,
)
response = sclient.call_async(request).result()
sclient.call_async(request).result()
logger.info(f"Slave node info updated.")
# 使用新的 c2s_update_resource_tree 服务
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
# 序列化 ResourceTreeSet 为 JSON
# 3.2 报送物料树,获取 UUID 映射
if resources_config:
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps(
{
@@ -180,35 +166,61 @@ def slave(
},
ensure_ascii=False,
)
tree_response: SerialCommand_Response = rclient.call_async(request).result()
tree_response: SerialCommand_Response = rclient.call(request)
uuid_mapping = json.loads(tree_response.response)
# 创建反向映射new_uuid -> old_uuid
reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
for node in resources_config.root_nodes:
if node.res_content.type == "device":
for sub_node in node.children:
# 只有二级子设备
if sub_node.res_content.type != "device":
device_tracker = devices_instances[node.res_content.id].resource_tracker
# sub_node.res_content.uuid 已经是新UUID需要用旧UUID去查找
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
if old_uuid:
# 找到旧UUID使用UUID查找
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
else:
# 未找到旧UUID使用name查找
resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name})
device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes")
# 3.3 使用 UUID 映射更新 resources_config 的 UUID参考 client.py 逻辑)
old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes}
for old_uuid, node in old_uuids.items():
if old_uuid in uuid_mapping:
new_uuid = uuid_mapping[old_uuid]
node.res_content.uuid = new_uuid
# 更新所有子节点的 parent_uuid
for child in node.children:
child.res_content.parent_uuid = new_uuid
else:
logger.error("Slave模式不允许新增非设备节点下的物料")
continue
if tree_response:
logger.info(f"Slave resource tree added. Response: {tree_response.response}")
else:
logger.warning("Slave resource tree add response is None")
logger.warning(f"资源UUID未更新: {old_uuid}")
else:
logger.info("No resources to add.")
# 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新)
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type == "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
if d is not None:
devices_instances[device_id] = d
logger.info(f"Device {device_id} initialized.")
else:
logger.warning(f"Device {device_id} initialization failed.")
# 5. 如果启用可视化,创建可视化相关节点
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
# 将 ResourceTreeSet 转换为 list 用于 visual 组件
resources_list = (
[node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
if resources_config
else []
)
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_list,
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
lh_joint_pub = LiquidHandlerJointPublisher(
resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker()
)
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
executor.add_node(lh_joint_pub)
# 7. 保持运行
while True:
time.sleep(1)

View File

@@ -53,7 +53,7 @@ from unilabos.ros.nodes.resource_tracker import (
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func
from rclpy.task import Task, Future
from unilabos.utils.import_manager import default_manager
from unilabos.utils.log import info, debug, warning, error, critical, logger, trace
from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str
@@ -555,6 +555,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
rclpy.get_global_executor().add_node(self)
self.lab_logger().debug(f"ROS节点初始化完成")
async def sleep(self, rel_time: float, callback_group=None):
if callback_group is None:
callback_group = self.callback_group
await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group)
@classmethod
async def create_task(cls, func, trace_error=True, **kwargs) -> Task:
return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)
async def update_resource(self, resources: List["ResourcePLR"]):
r = SerialCommand.Request()
tree_set = ResourceTreeSet.from_plr_resources(resources)
@@ -629,6 +638,145 @@ class BaseROS2DeviceNode(Node, Generic[T]):
- remove: 从资源树中移除资源
"""
from pylabrobot.resources.resource import Resource as ResourcePLR
def _handle_add(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源添加操作的内部函数
Args:
plr_resources: PLR资源列表
tree_set: 资源树集合
additional_add_params: 额外的添加参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "add"}
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
"""
处理资源移除操作的内部函数
Args:
resources_uuid: 要移除的资源UUID列表
Returns:
操作结果字典,包含移除的资源列表
"""
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
# 调用driver的remove回调
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
# 从parent卸载并从tracker移除
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
return {
"success": True,
"action": "remove",
# "removed_plr": found_plr_resources,
# "removed_other": other_plr_resources,
}
def _handle_update(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源更新操作的内部函数
Args:
plr_resources: PLR资源列表包含新状态
tree_set: 资源树集合
additional_add_params: 额外的参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
# Update操作中包含改名需要先remove再add
if original_instance.name != plr_resource.name:
old_name = original_instance.name
new_name = plr_resource.name
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
# 收集所有相关的uuid包括子节点
_handle_remove([original_instance.unilabos_uuid])
original_instance.name = new_name
_handle_add([original_instance], tree_set, additional_add_params)
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
# 常规更新:不涉及改名
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
f"目标父节点{target_parent_resource_uuid} 更新"
)
# 更新extra
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
# 如果父节点变化,需要重新挂载
if (
original_parent_resource_uuid != target_parent_resource_uuid
and original_parent_resource is not None
):
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
# 加载状态
original_instance.load_all_state(states)
child_count = len(original_instance.get_all_children())
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count}"
)
# 调用driver的update回调
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "update"}
try:
data = json.loads(req.command)
results = []
@@ -647,7 +795,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
SerialCommand.Request(
command=json.dumps(
{"data": {"data": resources_uuid, "with_children": False}, "action": "get"}
{"data": {"data": resources_uuid, "with_children": True if action == "add" else False}, "action": "get"}
)
)
) # type: ignore
@@ -655,68 +803,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
tree_set = ResourceTreeSet.from_raw_list(raw_nodes)
try:
if action == "add":
# 添加资源到资源跟踪器
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "add"})
result = _handle_add(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "update":
# 更新资源
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} 目标父节点{target_parent_resource_uuid} 更新"
)
# todo: 对extra进行update
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra")
if original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None:
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
original_instance.load_all_state(states)
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())}"
)
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "update"})
result = _handle_update(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "remove":
# 移除资源
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
results.append({"success": True, "action": "remove"})
result = _handle_remove(resources_uuid)
results.append(result)
except Exception as e:
error_msg = f"Error processing {action} operation: {str(e)}"
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
@@ -725,7 +825,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 返回处理结果
result_json = {"results": results, "total": len(data)}
res.response = json.dumps(result_json, ensure_ascii=False)
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
except json.JSONDecodeError as e:
@@ -995,9 +1095,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 通过资源跟踪器获取本地实例
final_resources = queried_resources if is_sequence else queried_resources[0]
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]
final_resources = (
self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
if not is_sequence
else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False)
for res in queried_resources
]
)
action_kwargs[k] = final_resources
except Exception as e:
@@ -1218,6 +1323,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raise JsonCommandInitError(
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
)
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
"""同步转换资源数据为实例"""
# 创建资源查询请求
@@ -1385,18 +1491,27 @@ class ROS2DeviceNode:
它不继承设备类,而是通过代理模式访问设备类的属性和方法。
"""
# 类变量,用于循环管理
_loop = None
_loop_running = False
_loop_thread = None
@classmethod
def run_async_func(cls, func, trace_error=True, **kwargs) -> Task:
def _handle_future_exception(fut):
try:
fut.result()
except Exception as e:
error(f"异步任务 {func.__name__} 报错了")
error(traceback.format_exc())
future = rclpy.get_global_executor().create_task(func(**kwargs))
if trace_error:
future.add_done_callback(_handle_future_exception)
return future
@classmethod
def get_loop(cls):
return cls._loop
@classmethod
def run_async_func(cls, func, trace_error=True, **kwargs):
return run_async_func(func, loop=cls._loop, trace_error=trace_error, **kwargs)
async def async_wait_for(cls, node: Node, wait_time: float, callback_group=None):
future = Future()
timer = node.create_timer(wait_time, lambda : future.set_result(None), callback_group=callback_group, clock=node.get_clock())
await future
timer.cancel()
node.destroy_timer(timer)
@property
def driver_instance(self):
@@ -1436,11 +1551,6 @@ class ROS2DeviceNode:
print_publish: 是否打印发布信息
driver_is_ros:
"""
# 在初始化时检查循环状态
if ROS2DeviceNode._loop_running and ROS2DeviceNode._loop_thread is not None:
pass
elif ROS2DeviceNode._loop_thread is None:
self._start_loop()
# 保存设备类是否支持异步上下文
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
@@ -1529,17 +1639,6 @@ class ROS2DeviceNode:
except Exception as e:
self._ros_node.lab_logger().error(f"设备后初始化失败: {e}")
def _start_loop(self):
def run_event_loop():
loop = asyncio.new_event_loop()
ROS2DeviceNode._loop = loop
asyncio.set_event_loop(loop)
loop.run_forever()
ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNodeLoop")
ROS2DeviceNode._loop_thread.start()
logger.info(f"循环线程已启动")
class DeviceInfoType(TypedDict):
id: str

View File

@@ -18,7 +18,8 @@ from unilabos_msgs.srv import (
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand, ResourceGet,
SerialCommand,
ResourceGet,
) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
@@ -164,29 +165,16 @@ class HostNode(BaseROS2DeviceNode):
# resources_config 的 root node 是
# # 创建反向映射new_uuid -> old_uuid
# reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
# for tree in resources_config.trees:
# node = tree.root_node
# if node.res_content.type == "device":
# if node.res_content.id == "host_node":
# continue
# # slave节点走c2s更新接口拿到add自行update uuid
# device_tracker = self.devices_instances[node.res_content.id].resource_tracker
# old_uuid = reverse_uuid_mapping.get(node.res_content.uuid)
# if old_uuid:
# # 找到旧UUID使用UUID查找
# resource_instance = device_tracker.uuid_to_resources.get(old_uuid)
# else:
# # 未找到旧UUID使用name查找
# resource_instance = device_tracker.figure_resource(
# {"name": node.res_content.name}
# )
# device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
# else:
# try:
# for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
# self.resource_tracker.add_resource(plr_resource)
# except Exception as ex:
# self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!")
for tree in resources_config.trees:
node = tree.root_node
if node.res_content.type == "device":
continue
else:
try:
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表
@@ -877,11 +865,10 @@ class HostNode(BaseROS2DeviceNode):
success = False
uuid_mapping = {}
if len(self.bridges) > 0:
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
resource_start_time = time.time()
uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
success = True
resource_end_time = time.time()
self.lab_logger().info(
@@ -989,9 +976,10 @@ class HostNode(BaseROS2DeviceNode):
"""
更新节点信息回调
"""
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
try:
from unilabos.app.communication import get_communication_client
from unilabos.app.web.client import HTTPClient, http_client
info = json.loads(request.command)
if "SYNC_SLAVE_NODE_INFO" in info:
@@ -1000,10 +988,10 @@ class HostNode(BaseROS2DeviceNode):
edge_device_id = info["edge_device_id"]
self.device_machine_names[edge_device_id] = machine_name
else:
comm_client = get_communication_client()
registry_config = info["registry_config"]
for device_config in registry_config:
comm_client.publish_registry(device_config["id"], device_config)
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1029,10 +1017,9 @@ class HostNode(BaseROS2DeviceNode):
success = False
if len(self.bridges) > 0: # 边的提交待定
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
r = client.resource_add(add_schema(resources))
r = http_client.resource_add(add_schema(resources))
success = bool(r)
response.success = success

View File

@@ -2,7 +2,7 @@ import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from unilabos.utils.log import logger
@@ -894,7 +894,7 @@ class DeviceNodeResourceTracker(object):
new_uuid = name_to_uuid_map[resource_name]
self.set_resource_uuid(res, new_uuid)
self.uuid_to_resources[new_uuid] = res
logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}")
logger.trace(f"设置资源UUID: {resource_name} -> {new_uuid}")
return 1
return 0
@@ -917,7 +917,8 @@ class DeviceNodeResourceTracker(object):
if resource_name and resource_name in name_to_extra_map:
extra = name_to_extra_map[resource_name]
self.set_resource_extra(res, extra)
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
if len(extra):
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
return 1
return 0
@@ -927,7 +928,7 @@ class DeviceNodeResourceTracker(object):
"""
递归遍历资源树更新所有节点的uuid
Args:0
Args:
resource: 资源对象可以是dict或实例
uuid_map: uuid映射字典{old_uuid: new_uuid}
@@ -952,6 +953,27 @@ class DeviceNodeResourceTracker(object):
return self._traverse_and_process(resource, process)
def loop_gather_uuid(self, resource) -> List[str]:
"""
递归遍历资源树收集所有节点的uuid
Args:
resource: 资源对象可以是dict或实例
Returns:
收集到的uuid列表
"""
uuid_list = []
def process(res):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid:
uuid_list.append(current_uuid)
return 0
self._traverse_and_process(resource, process)
return uuid_list
def _collect_uuid_mapping(self, resource):
"""
递归收集资源的 uuid 映射到 uuid_to_resources
@@ -965,14 +987,15 @@ class DeviceNodeResourceTracker(object):
if current_uuid:
old = self.uuid_to_resources.get(current_uuid)
self.uuid_to_resources[current_uuid] = res
logger.debug(
logger.trace(
f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}"
)
return 1
return 0
self._traverse_and_process(resource, process)
def _remove_uuid_mapping(self, resource):
def _remove_uuid_mapping(self, resource) -> int:
"""
递归清除资源的 uuid 映射
@@ -984,10 +1007,11 @@ class DeviceNodeResourceTracker(object):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid and current_uuid in self.uuid_to_resources:
self.uuid_to_resources.pop(current_uuid)
logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}")
logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}")
return 1
return 0
self._traverse_and_process(resource, process)
return self._traverse_and_process(resource, process)
def parent_resource(self, resource):
if id(resource) in self.resource2parent_resource:
@@ -1042,13 +1066,12 @@ class DeviceNodeResourceTracker(object):
removed = True
break
if not removed:
# 递归清除uuid映射
count = self._remove_uuid_mapping(resource)
if not count:
logger.warning(f"尝试移除不存在的资源: {resource}")
return False
# 递归清除uuid映射
self._remove_uuid_mapping(resource)
# 清除 resource2parent_resource 中与该资源相关的映射
# 需要清除1) 该资源作为 key 的映射 2) 该资源作为 value 的映射
keys_to_remove = []
@@ -1071,7 +1094,9 @@ class DeviceNodeResourceTracker(object):
self.uuid_to_resources.clear()
self.resource2parent_resource.clear()
def figure_resource(self, query_resource, try_mode=False):
def figure_resource(
self, query_resource: Union[List[Union[dict, "PLRResource"]], dict, "PLRResource"], try_mode=False
) -> Union[List[Union[dict, "PLRResource", List[Union[dict, "PLRResource"]]]], dict, "PLRResource"]:
if isinstance(query_resource, list):
return [self.figure_resource(r, try_mode) for r in query_resource]
elif (

View File

@@ -1,22 +0,0 @@
import asyncio
import traceback
from asyncio import get_event_loop
from unilabos.utils.log import error
def run_async_func(func, *, loop=None, trace_error=True, **kwargs):
if loop is None:
loop = get_event_loop()
def _handle_future_exception(fut):
try:
fut.result()
except Exception as e:
error(f"异步任务 {func.__name__} 报错了")
error(traceback.format_exc())
future = asyncio.run_coroutine_threadsafe(func(**kwargs), loop)
if trace_error:
future.add_done_callback(_handle_future_exception)
return future