Workbench example, adjust log level, and ci check (#220)

* TestLatency Return Value Example & gitignore update

* Adjust log level & Add workbench virtual example & Add not action decorator & Add check_mode &

* Add CI Check
This commit is contained in:
Xuwznln
2026-01-26 02:15:13 +08:00
committed by GitHub
parent ec7ca6a1fe
commit 216f19fb62
18 changed files with 1329 additions and 5821 deletions

View File

@@ -0,0 +1,687 @@
"""
Virtual Workbench Device - 模拟工作台设备
包含:
- 1个机械臂 (每次操作3s, 独占锁)
- 3个加热台 (每次加热10s, 可并行)
工作流程:
1. A1-A5 物料同时启动,竞争机械臂
2. 机械臂将物料移动到空闲加热台
3. 加热完成后机械臂将物料移动到C1-C5
注意:调用来自线程池,使用 threading.Lock 进行同步
"""
import logging
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from threading import Lock, RLock
from typing_extensions import TypedDict
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action
# ============ TypedDict 返回类型定义 ============
class MoveToHeatingStationResult(TypedDict):
"""move_to_heating_station 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
class StartHeatingResult(TypedDict):
"""start_heating 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
class MoveToOutputResult(TypedDict):
"""move_to_output 返回类型"""
success: bool
station_id: int
material_id: str
class PrepareMaterialsResult(TypedDict):
"""prepare_materials 返回类型 - 批量准备物料"""
success: bool
count: int
material_1: int # 物料编号1
material_2: int # 物料编号2
material_3: int # 物料编号3
material_4: int # 物料编号4
material_5: int # 物料编号5
message: str
# ============ 状态枚举 ============
class HeatingStationState(Enum):
"""加热台状态枚举"""
IDLE = "idle" # 空闲
OCCUPIED = "occupied" # 已放置物料,等待加热
HEATING = "heating" # 加热中
COMPLETED = "completed" # 加热完成,等待取走
class ArmState(Enum):
"""机械臂状态枚举"""
IDLE = "idle" # 空闲
BUSY = "busy" # 工作中
@dataclass
class HeatingStation:
"""加热台数据结构"""
station_id: int
state: HeatingStationState = HeatingStationState.IDLE
current_material: Optional[str] = None # 当前物料 (如 "A1", "A2")
material_number: Optional[int] = None # 物料编号 (1-5)
heating_start_time: Optional[float] = None
heating_progress: float = 0.0
class VirtualWorkbench:
"""
Virtual Workbench Device - 虚拟工作台设备
模拟一个包含1个机械臂和3个加热台的工作站
- 机械臂操作耗时3秒同一时间只能执行一个操作
- 加热台加热耗时10秒3个加热台可并行工作
工作流:
1. 物料A1-A5并发启动线程池竞争机械臂使用权
2. 获取机械臂后,查找空闲加热台
3. 机械臂将物料放入加热台,开始加热
4. 加热完成后机械臂将物料移动到目标位置Cn
"""
_ros_node: BaseROS2DeviceNode
# 配置常量
ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒)
HEATING_TIME: float = 10.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
if config is None and "config" in kwargs:
config = kwargs.pop("config")
self.device_id = device_id or "virtual_workbench"
self.config = config or {}
self.logger = logging.getLogger(f"VirtualWorkbench.{self.device_id}")
self.data: Dict[str, Any] = {}
# 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0))
self.HEATING_TIME = float(self.config.get("heating_time", 10.0))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3))
# 机械臂状态和锁 (使用threading.Lock)
self._arm_lock = Lock()
self._arm_state = ArmState.IDLE
self._arm_current_task: Optional[str] = None
# 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize()
self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
}
self._stations_lock = RLock() # 可重入锁,保护加热台状态
# 任务追踪
self._active_tasks: Dict[str, Dict[str, Any]] = {} # material_id -> task_info
self._tasks_lock = Lock()
# 处理其他kwargs参数
skip_keys = {"arm_operation_time", "heating_time", "num_heating_stations"}
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
self.logger.info(f"=== 虚拟工作台 {self.device_id} 已创建 ===")
self.logger.info(
f"机械臂操作时间: {self.ARM_OPERATION_TIME}s | "
f"加热时间: {self.HEATING_TIME}s | "
f"加热台数量: {self.NUM_HEATING_STATIONS}"
)
@not_action
def post_init(self, ros_node: BaseROS2DeviceNode):
"""ROS节点初始化后回调"""
self._ros_node = ros_node
@not_action
def initialize(self) -> bool:
"""初始化虚拟工作台"""
self.logger.info(f"初始化虚拟工作台 {self.device_id}")
# 重置加热台状态 (已在__init__中创建这里重置为初始状态)
with self._stations_lock:
for station in self._heating_stations.values():
station.state = HeatingStationState.IDLE
station.current_material = None
station.material_number = None
station.heating_progress = 0.0
# 初始化状态
self.data.update({
"status": "Ready",
"arm_state": ArmState.IDLE.value,
"arm_current_task": None,
"heating_stations": self._get_stations_status(),
"active_tasks_count": 0,
"message": "工作台就绪",
})
self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪")
return True
@not_action
def cleanup(self) -> bool:
"""清理虚拟工作台"""
self.logger.info(f"清理虚拟工作台 {self.device_id}")
self._arm_state = ArmState.IDLE
self._arm_current_task = None
with self._stations_lock:
self._heating_stations.clear()
with self._tasks_lock:
self._active_tasks.clear()
self.data.update({
"status": "Offline",
"arm_state": ArmState.IDLE.value,
"heating_stations": {},
"message": "工作台已关闭",
})
return True
def _get_stations_status(self) -> Dict[int, Dict[str, Any]]:
"""获取所有加热台状态"""
with self._stations_lock:
return {
station_id: {
"state": station.state.value,
"current_material": station.current_material,
"material_number": station.material_number,
"heating_progress": station.heating_progress,
}
for station_id, station in self._heating_stations.items()
}
def _update_data_status(self, message: Optional[str] = None):
"""更新状态数据"""
self.data.update({
"arm_state": self._arm_state.value,
"arm_current_task": self._arm_current_task,
"heating_stations": self._get_stations_status(),
"active_tasks_count": len(self._active_tasks),
})
if message:
self.data["message"] = message
def _find_available_heating_station(self) -> Optional[int]:
"""查找空闲的加热台
Returns:
空闲加热台ID如果没有则返回None
"""
with self._stations_lock:
for station_id, station in self._heating_stations.items():
if station.state == HeatingStationState.IDLE:
return station_id
return None
def _acquire_arm(self, task_description: str) -> bool:
"""获取机械臂使用权(阻塞直到获取)
Args:
task_description: 任务描述,用于日志
Returns:
是否成功获取
"""
self.logger.info(f"[{task_description}] 等待获取机械臂...")
# 阻塞等待获取锁
self._arm_lock.acquire()
self._arm_state = ArmState.BUSY
self._arm_current_task = task_description
self._update_data_status(f"机械臂执行: {task_description}")
self.logger.info(f"[{task_description}] 成功获取机械臂使用权")
return True
def _release_arm(self):
"""释放机械臂"""
task = self._arm_current_task
self._arm_state = ArmState.IDLE
self._arm_current_task = None
self._arm_lock.release()
self._update_data_status(f"机械臂已释放 (完成: {task})")
self.logger.info(f"机械臂已释放 (完成: {task})")
def prepare_materials(
self,
count: int = 5,
) -> PrepareMaterialsResult:
"""
批量准备物料 - 虚拟起始节点
作为工作流的起始节点,生成指定数量的物料编号供后续节点使用。
输出5个handle (material_1 ~ material_5)分别对应实验1~5。
Args:
count: 待生成的物料数量默认5 (生成 A1-A5)
Returns:
PrepareMaterialsResult: 包含 material_1 ~ material_5 用于传递给 move_to_heating_station
"""
# 生成物料列表 A1 - A{count}
materials = [i for i in range(1, count + 1)]
self.logger.info(
f"[准备物料] 生成 {count} 个物料: "
f"A1-A{count} -> material_1~material_{count}"
)
return {
"success": True,
"count": count,
"material_1": materials[0] if len(materials) > 0 else 0,
"material_2": materials[1] if len(materials) > 1 else 0,
"material_3": materials[2] if len(materials) > 2 else 0,
"material_4": materials[3] if len(materials) > 3 else 0,
"material_5": materials[4] if len(materials) > 4 else 0,
"message": f"已准备 {count} 个物料: A1-A{count}",
}
def move_to_heating_station(
self,
material_number: int,
) -> MoveToHeatingStationResult:
"""
将物料从An位置移动到加热台
多线程并发调用时,会竞争机械臂使用权,并自动查找空闲加热台
Args:
material_number: 物料编号 (1-5)
Returns:
MoveToHeatingStationResult: 包含 station_id, material_number 等用于传递给下一个节点
"""
# 根据物料编号生成物料ID
material_id = f"A{material_number}"
task_desc = f"移动{material_id}到加热台"
self.logger.info(f"[任务] {task_desc} - 开始执行")
# 记录任务
with self._tasks_lock:
self._active_tasks[material_id] = {
"status": "waiting_for_arm",
"start_time": time.time(),
}
try:
# 步骤1: 等待获取机械臂使用权(竞争)
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "waiting_for_arm"
self._acquire_arm(task_desc)
# 步骤2: 查找空闲加热台
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "finding_station"
station_id = None
# 循环等待直到找到空闲加热台
while station_id is None:
station_id = self._find_available_heating_station()
if station_id is None:
self.logger.info(f"[{material_id}] 没有空闲加热台,等待中...")
# 释放机械臂,等待后重试
self._release_arm()
time.sleep(0.5)
self._acquire_arm(task_desc)
# 步骤3: 占用加热台 - 立即标记为OCCUPIED防止其他任务选择同一加热台
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.OCCUPIED
self._heating_stations[station_id].current_material = material_id
self._heating_stations[station_id].material_number = material_number
# 步骤4: 模拟机械臂移动操作 (3秒)
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "arm_moving"
self._active_tasks[material_id]["assigned_station"] = station_id
self.logger.info(f"[{material_id}] 机械臂正在移动到加热台{station_id}...")
time.sleep(self.ARM_OPERATION_TIME)
# 步骤5: 放入加热台完成
self._update_data_status(f"{material_id}已放入加热台{station_id}")
self.logger.info(f"[{material_id}] 已放入加热台{station_id} (用时{self.ARM_OPERATION_TIME}s)")
# 释放机械臂
self._release_arm()
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "placed_on_station"
return {
"success": True,
"station_id": station_id,
"material_id": material_id,
"material_number": material_number,
"message": f"{material_id}已成功移动到加热台{station_id}",
}
except Exception as e:
self.logger.error(f"[{material_id}] 移动失败: {str(e)}")
if self._arm_lock.locked():
self._release_arm()
return {
"success": False,
"station_id": -1,
"material_id": material_id,
"material_number": material_number,
"message": f"移动失败: {str(e)}",
}
def start_heating(
self,
station_id: int,
material_number: int,
) -> StartHeatingResult:
"""
启动指定加热台的加热程序
Args:
station_id: 加热台ID (1-3),从 move_to_heating_station 的 handle 传入
material_number: 物料编号,从 move_to_heating_station 的 handle 传入
Returns:
StartHeatingResult: 包含 station_id, material_number 等用于传递给下一个节点
"""
self.logger.info(f"[加热台{station_id}] 开始加热")
if station_id not in self._heating_stations:
return {
"success": False,
"station_id": station_id,
"material_id": "",
"material_number": material_number,
"message": f"无效的加热台ID: {station_id}",
}
with self._stations_lock:
station = self._heating_stations[station_id]
if station.current_material is None:
return {
"success": False,
"station_id": station_id,
"material_id": "",
"material_number": material_number,
"message": f"加热台{station_id}上没有物料",
}
if station.state == HeatingStationState.HEATING:
return {
"success": False,
"station_id": station_id,
"material_id": station.current_material,
"material_number": material_number,
"message": f"加热台{station_id}已经在加热中",
}
material_id = station.current_material
# 开始加热
station.state = HeatingStationState.HEATING
station.heating_start_time = time.time()
station.heating_progress = 0.0
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "heating"
self._update_data_status(f"加热台{station_id}开始加热{material_id}")
# 模拟加热过程 (10秒)
start_time = time.time()
while True:
elapsed = time.time() - start_time
progress = min(100.0, (elapsed / self.HEATING_TIME) * 100)
with self._stations_lock:
self._heating_stations[station_id].heating_progress = progress
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
if elapsed >= self.HEATING_TIME:
break
time.sleep(1.0)
# 加热完成
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.COMPLETED
self._heating_stations[station_id].heating_progress = 100.0
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "heating_completed"
self._update_data_status(f"加热台{station_id}加热完成")
self.logger.info(f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)")
return {
"success": True,
"station_id": station_id,
"material_id": material_id,
"material_number": material_number,
"message": f"加热台{station_id}加热完成",
}
def move_to_output(
self,
station_id: int,
material_number: int,
) -> MoveToOutputResult:
"""
将物料从加热台移动到输出位置Cn
Args:
station_id: 加热台ID (1-3),从 start_heating 的 handle 传入
material_number: 物料编号,从 start_heating 的 handle 传入,用于确定输出位置 Cn
Returns:
MoveToOutputResult: 包含执行结果
"""
output_number = material_number # 物料编号决定输出位置
if station_id not in self._heating_stations:
return {
"success": False,
"station_id": station_id,
"material_id": "",
"output_position": f"C{output_number}",
"message": f"无效的加热台ID: {station_id}",
}
with self._stations_lock:
station = self._heating_stations[station_id]
material_id = station.current_material
if material_id is None:
return {
"success": False,
"station_id": station_id,
"material_id": "",
"output_position": f"C{output_number}",
"message": f"加热台{station_id}上没有物料",
}
if station.state != HeatingStationState.COMPLETED:
return {
"success": False,
"station_id": station_id,
"material_id": material_id,
"output_position": f"C{output_number}",
"message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})",
}
output_position = f"C{output_number}"
task_desc = f"从加热台{station_id}移动{material_id}{output_position}"
self.logger.info(f"[任务] {task_desc}")
try:
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "waiting_for_arm_output"
# 获取机械臂
self._acquire_arm(task_desc)
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "arm_moving_to_output"
# 模拟机械臂操作 (3秒)
self.logger.info(f"[{material_id}] 机械臂正在从加热台{station_id}取出并移动到{output_position}...")
time.sleep(self.ARM_OPERATION_TIME)
# 清空加热台
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.IDLE
self._heating_stations[station_id].current_material = None
self._heating_stations[station_id].material_number = None
self._heating_stations[station_id].heating_progress = 0.0
self._heating_stations[station_id].heating_start_time = None
# 释放机械臂
self._release_arm()
# 任务完成
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "completed"
self._active_tasks[material_id]["end_time"] = time.time()
self._update_data_status(f"{material_id}已移动到{output_position}")
self.logger.info(f"[{material_id}] 已成功移动到{output_position} (用时{self.ARM_OPERATION_TIME}s)")
return {
"success": True,
"station_id": station_id,
"material_id": material_id,
"output_position": output_position,
"message": f"{material_id}已成功移动到{output_position}",
}
except Exception as e:
self.logger.error(f"移动到输出位置失败: {str(e)}")
if self._arm_lock.locked():
self._release_arm()
return {
"success": False,
"station_id": station_id,
"material_id": "",
"output_position": output_position,
"message": f"移动失败: {str(e)}",
}
# ============ 状态属性 ============
@property
def status(self) -> str:
return self.data.get("status", "Unknown")
@property
def arm_state(self) -> str:
return self._arm_state.value
@property
def arm_current_task(self) -> str:
return self._arm_current_task or ""
@property
def heating_station_1_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.state.value if station else "unknown"
@property
def heating_station_1_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.current_material or "" if station else ""
@property
def heating_station_1_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.heating_progress if station else 0.0
@property
def heating_station_2_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.state.value if station else "unknown"
@property
def heating_station_2_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.current_material or "" if station else ""
@property
def heating_station_2_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.heating_progress if station else 0.0
@property
def heating_station_3_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.state.value if station else "unknown"
@property
def heating_station_3_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.current_material or "" if station else ""
@property
def heating_station_3_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.heating_progress if station else 0.0
@property
def active_tasks_count(self) -> int:
with self._tasks_lock:
return len(self._active_tasks)
@property
def message(self) -> str:
return self.data.get("message", "")