mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-07 23:45:10 +00:00
add registry name & add always free
This commit is contained in:
@@ -184,6 +184,51 @@ def get_all_subscriptions(instance) -> list:
|
||||
return subscriptions
|
||||
|
||||
|
||||
def always_free(func: F) -> F:
|
||||
"""
|
||||
标记动作为永久闲置(不受busy队列限制)的装饰器
|
||||
|
||||
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
|
||||
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
|
||||
|
||||
Example:
|
||||
class MyDriver:
|
||||
@always_free
|
||||
def query_status(self, param: str):
|
||||
# 这个动作可以随时执行,不需要排队
|
||||
return self._status
|
||||
|
||||
def transfer(self, volume: float):
|
||||
# 这个动作会按正常排队逻辑执行
|
||||
pass
|
||||
|
||||
Note:
|
||||
- 可以与其他装饰器组合使用,@always_free 应放在最外层
|
||||
- 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
wrapper._is_always_free = True # type: ignore[attr-defined]
|
||||
|
||||
return wrapper # type: ignore[return-value]
|
||||
|
||||
|
||||
def is_always_free(func) -> bool:
|
||||
"""
|
||||
检查函数是否被标记为永久闲置
|
||||
|
||||
Args:
|
||||
func: 被检查的函数
|
||||
|
||||
Returns:
|
||||
如果函数被 @always_free 装饰则返回 True,否则返回 False
|
||||
"""
|
||||
return getattr(func, "_is_always_free", False)
|
||||
|
||||
|
||||
def not_action(func: F) -> F:
|
||||
"""
|
||||
标记方法为非动作的装饰器
|
||||
|
||||
Reference in New Issue
Block a user