From 9a7d5c7c82ae9bfbd9f600a791ce6a8c3f7e8979 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sat, 7 Feb 2026 02:11:43 +0800 Subject: [PATCH] add registry name & add always free --- unilabos/app/ws_client.py | 81 ++++++++++++++++--- .../ros_dev/liquid_handler_joint_publisher.py | 3 +- unilabos/devices/virtual/workbench.py | 31 +++++-- unilabos/registry/devices/virtual_device.yaml | 1 + unilabos/registry/registry.py | 1 + unilabos/ros/device_node_wrapper.py | 3 +- unilabos/ros/nodes/base_device_node.py | 17 ++-- unilabos/ros/nodes/presets/camera.py | 3 +- unilabos/ros/nodes/presets/controller_node.py | 2 + unilabos/ros/nodes/presets/host_node.py | 46 ++++++++++- .../ros/nodes/presets/joint_republisher.py | 3 +- .../nodes/presets/resource_mesh_manager.py | 3 +- unilabos/ros/nodes/presets/serial_node.py | 3 +- unilabos/ros/nodes/presets/workstation.py | 2 + unilabos/utils/decorator.py | 45 +++++++++++ unilabos/utils/import_manager.py | 15 +++- 16 files changed, 228 insertions(+), 31 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 8dd3ec0..111a6e7 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -76,6 +76,7 @@ class JobInfo: start_time: float last_update_time: float = field(default_factory=time.time) ready_timeout: Optional[float] = None # READY状态的超时时间 + always_free: bool = False # 是否为永久闲置动作(不受排队限制) def update_timestamp(self): """更新最后更新时间""" @@ -127,6 +128,15 @@ class DeviceActionManager: # 总是将job添加到all_jobs中 self.all_jobs[job_info.job_id] = job_info + # always_free的动作不受排队限制,直接设为READY + if job_info.always_free: + job_info.status = JobStatus.READY + job_info.update_timestamp() + job_info.set_ready_timeout(10) + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.trace(f"[DeviceActionManager] Job {job_log} always_free, start immediately") + return True + # 检查是否有正在执行或准备执行的任务 if device_key in self.active_jobs: # 有正在执行或准备执行的任务,加入队列 @@ -176,11 +186,15 @@ class DeviceActionManager: logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}") return False - # 检查设备上是否是这个job - if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: - job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) - logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}") - return False + # always_free的job不需要检查active_jobs + if not job_info.always_free: + # 检查设备上是否是这个job + if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: + job_log = format_job_log( + job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name + ) + logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}") + return False # 开始执行任务,将状态从READY转换为STARTED job_info.status = JobStatus.STARTED @@ -203,6 +217,13 @@ class DeviceActionManager: job_info = self.all_jobs[job_id] device_key = job_info.device_action_key + # always_free的job直接清理,不影响队列 + if job_info.always_free: + job_info.status = JobStatus.ENDED + job_info.update_timestamp() + del self.all_jobs[job_id] + return None + # 移除活跃任务 if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: del self.active_jobs[device_key] @@ -234,9 +255,14 @@ class DeviceActionManager: return None def get_active_jobs(self) -> List[JobInfo]: - """获取所有正在执行的任务""" + """获取所有正在执行的任务(含active_jobs和always_free的STARTED job)""" with self.lock: - return list(self.active_jobs.values()) + jobs = list(self.active_jobs.values()) + # 补充 always_free 的 STARTED job(它们不在 active_jobs 中) + for job in self.all_jobs.values(): + if job.always_free and job.status == JobStatus.STARTED and job not in jobs: + jobs.append(job) + return jobs def get_queued_jobs(self) -> List[JobInfo]: """获取所有排队中的任务""" @@ -261,6 +287,14 @@ class DeviceActionManager: job_info = self.all_jobs[job_id] device_key = job_info.device_action_key + # always_free的job直接清理 + if job_info.always_free: + job_info.status = JobStatus.ENDED + del self.all_jobs[job_id] + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.trace(f"[DeviceActionManager] Always-free job {job_log} cancelled") + return True + # 如果是正在执行的任务 if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: # 清理active job状态 @@ -334,13 +368,18 @@ class DeviceActionManager: timeout_jobs = [] with self.lock: - # 统计READY状态的任务数量 - ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY) + # 收集所有需要检查的 READY 任务(active_jobs + always_free READY jobs) + ready_candidates = list(self.active_jobs.values()) + for job in self.all_jobs.values(): + if job.always_free and job.status == JobStatus.READY and job not in ready_candidates: + ready_candidates.append(job) + + ready_jobs_count = sum(1 for job in ready_candidates if job.status == JobStatus.READY) if ready_jobs_count > 0: logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501 # 找到所有超时的READY任务(只检测,不处理) - for job_info in self.active_jobs.values(): + for job_info in ready_candidates: if job_info.is_ready_timeout(): timeout_jobs.append(job_info) job_log = format_job_log( @@ -608,6 +647,24 @@ class MessageProcessor: if host_node: host_node.handle_pong_response(pong_data) + def _check_action_always_free(self, device_id: str, action_name: str) -> bool: + """检查该action是否标记为always_free,通过HostNode统一的_action_value_mappings查找""" + try: + host_node = HostNode.get_instance(0) + if not host_node: + return False + # noinspection PyProtectedMember + action_mappings = host_node._action_value_mappings.get(device_id) + if not action_mappings: + return False + # 尝试直接匹配或 auto- 前缀匹配 + for key in [action_name, f"auto-{action_name}"]: + if key in action_mappings: + return action_mappings[key].get("always_free", False) + return False + except Exception: + return False + async def _handle_query_action_state(self, data: Dict[str, Any]): """处理query_action_state消息""" device_id = data.get("device_id", "") @@ -622,6 +679,9 @@ class MessageProcessor: device_action_key = f"/devices/{device_id}/{action_name}" + # 检查action是否为always_free + action_always_free = self._check_action_always_free(device_id, action_name) + # 创建任务信息 job_info = JobInfo( job_id=job_id, @@ -631,6 +691,7 @@ class MessageProcessor: device_action_key=device_action_key, status=JobStatus.QUEUE, start_time=time.time(), + always_free=action_always_free, ) # 添加到设备管理器 diff --git a/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py b/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py index 2ec7afe..16ff5b6 100644 --- a/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py +++ b/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py @@ -19,10 +19,11 @@ from rclpy.node import Node import re class LiquidHandlerJointPublisher(BaseROS2DeviceNode): - def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", **kwargs): + def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", registry_name: str = "lh_joint_publisher", **kwargs): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/devices/virtual/workbench.py b/unilabos/devices/virtual/workbench.py index d20885f..f5fae47 100644 --- a/unilabos/devices/virtual/workbench.py +++ b/unilabos/devices/virtual/workbench.py @@ -22,7 +22,7 @@ from threading import Lock, RLock from typing_extensions import TypedDict from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode -from unilabos.utils.decorator import not_action +from unilabos.utils.decorator import not_action, always_free from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES @@ -123,8 +123,8 @@ class VirtualWorkbench: _ros_node: BaseROS2DeviceNode # 配置常量 - ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒) - HEATING_TIME: float = 10.0 # 加热时间(秒) + ARM_OPERATION_TIME: float = 2 # 机械臂操作时间(秒) + HEATING_TIME: float = 60.0 # 加热时间(秒) NUM_HEATING_STATIONS: int = 3 # 加热台数量 def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): @@ -141,9 +141,9 @@ class VirtualWorkbench: self.data: Dict[str, Any] = {} # 从config中获取可配置参数 - self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0)) - self.HEATING_TIME = float(self.config.get("heating_time", 10.0)) - self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3)) + self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)) + self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME)) + self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)) # 机械臂状态和锁 (使用threading.Lock) self._arm_lock = Lock() @@ -431,6 +431,7 @@ class VirtualWorkbench: sample_uuid, content in sample_uuids.items()] } + @always_free def start_heating( self, sample_uuids: SampleUUIDsType, @@ -501,10 +502,21 @@ class VirtualWorkbench: self._update_data_status(f"加热台{station_id}开始加热{material_id}") - # 模拟加热过程 (10秒) + # 打印当前所有正在加热的台位 + with self._stations_lock: + heating_list = [ + f"加热台{sid}:{s.current_material}" + for sid, s in self._heating_stations.items() + if s.state == HeatingStationState.HEATING and s.current_material + ] + self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}") + + # 模拟加热过程 start_time = time.time() + last_countdown_log = start_time while True: elapsed = time.time() - start_time + remaining = max(0.0, self.HEATING_TIME - elapsed) progress = min(100.0, (elapsed / self.HEATING_TIME) * 100) with self._stations_lock: @@ -512,6 +524,11 @@ class VirtualWorkbench: self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%") + # 每5秒打印一次倒计时 + if time.time() - last_countdown_log >= 5.0: + self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s") + last_countdown_log = time.time() + if elapsed >= self.HEATING_TIME: break diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index e44b745..f063575 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -6090,6 +6090,7 @@ virtual_workbench: type: object type: UniLabJsonCommand auto-start_heating: + always_free: true feedback: {} goal: {} goal_default: diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index ef111e6..df4758d 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -838,6 +838,7 @@ class Registry: ("list", "unilabos.registry.placeholder_type:DeviceSlot"), ] }, + **({"always_free": True} if v.get("always_free") else {}), } for k, v in enhanced_info["action_methods"].items() if k not in device_config["class"]["action_value_mappings"] diff --git a/unilabos/ros/device_node_wrapper.py b/unilabos/ros/device_node_wrapper.py index db9caa4..889441a 100644 --- a/unilabos/ros/device_node_wrapper.py +++ b/unilabos/ros/device_node_wrapper.py @@ -44,8 +44,7 @@ def ros2_device_node( # 从属性中自动发现可发布状态 if status_types is None: status_types = {} - if device_config is None: - raise ValueError("device_config cannot be None") + assert device_config is not None, "device_config cannot be None" if action_value_mappings is None: action_value_mappings = {} if hardware_interface is None: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 89df756..2c9db0c 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -146,7 +146,7 @@ def init_wrapper( device_id: str, device_uuid: str, driver_class: type[T], - device_config: ResourceTreeInstance, + device_config: ResourceDictInstance, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], @@ -279,6 +279,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self, driver_instance: T, device_id: str, + registry_name: str, device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], @@ -300,6 +301,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): """ self.driver_instance = driver_instance self.device_id = device_id + self.registry_name = registry_name self.uuid = device_uuid self.publish_high_frequency = False self.callback_group = ReentrantCallbackGroup() @@ -416,7 +418,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer): # noinspection PyTypeChecker container_instance: RegularContainer = rts_plr_instances[0] - found_resources = self.resource_tracker.figure_resource({"name": container_instance.name}, try_mode=True) + found_resources = self.resource_tracker.figure_resource( + {"name": container_instance.name}, try_mode=True + ) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_instance.name}到资源跟踪器") @@ -1152,6 +1156,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): "machine_name": BasicConfig.machine_name, "type": "slave", "edge_device_id": self.device_id, + "registry_name": self.registry_name, } }, ensure_ascii=False, @@ -1626,9 +1631,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: resolved_sample_uuids[sample_uuid] = material_uuid function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids - self.lab_logger().debug( - f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" - ) + self.lab_logger().debug(f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}") continue # 处理单个 ResourceSlot @@ -2005,6 +2008,7 @@ class ROS2DeviceNode: if driver_is_ros: driver_params["device_id"] = device_id + driver_params["registry_name"] = device_config.res_content.klass driver_params["resource_tracker"] = self.resource_tracker self._driver_instance = self._driver_creator.create_instance(driver_params) if self._driver_instance is None: @@ -2022,6 +2026,7 @@ class ROS2DeviceNode: children=children, driver_instance=self._driver_instance, # type: ignore device_id=device_id, + registry_name=device_config.res_content.klass, device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, @@ -2033,6 +2038,7 @@ class ROS2DeviceNode: self._ros_node = BaseROS2DeviceNode( driver_instance=self._driver_instance, device_id=device_id, + registry_name=device_config.res_content.klass, device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, @@ -2041,6 +2047,7 @@ class ROS2DeviceNode: resource_tracker=self.resource_tracker, ) self._ros_node: BaseROS2DeviceNode + # 将注册表类型名传递给BaseROS2DeviceNode,用于slave上报 self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}") self.driver_instance._ros_node = self._ros_node # type: ignore self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore diff --git a/unilabos/ros/nodes/presets/camera.py b/unilabos/ros/nodes/presets/camera.py index 25ae921..2267f67 100644 --- a/unilabos/ros/nodes/presets/camera.py +++ b/unilabos/ros/nodes/presets/camera.py @@ -6,12 +6,13 @@ from cv_bridge import CvBridge from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker class VideoPublisher(BaseROS2DeviceNode): - def __init__(self, device_id='video_publisher', device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None): + def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None): # 初始化BaseROS2DeviceNode,使用自身作为driver_instance BaseROS2DeviceNode.__init__( self, driver_instance=self, device_id=device_id, + registry_name=registry_name, device_uuid=device_uuid, status_types={}, action_value_mappings={}, diff --git a/unilabos/ros/nodes/presets/controller_node.py b/unilabos/ros/nodes/presets/controller_node.py index 8451073..78d0757 100644 --- a/unilabos/ros/nodes/presets/controller_node.py +++ b/unilabos/ros/nodes/presets/controller_node.py @@ -10,6 +10,7 @@ class ControllerNode(BaseROS2DeviceNode): def __init__( self, device_id: str, + registry_name: str, controller_func: Callable, update_rate: float, inputs: Dict[str, Dict[str, type | str]], @@ -51,6 +52,7 @@ class ControllerNode(BaseROS2DeviceNode): self, driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types=status_types, action_value_mappings=action_value_mappings, hardware_interface=hardware_interface, diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index dde756a..f05bf0c 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -248,6 +248,7 @@ class HostNode(BaseROS2DeviceNode): self, driver_instance=self, device_id=device_id, + registry_name="host_node", device_uuid=host_node_dict["uuid"], status_types={}, action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], @@ -302,7 +303,8 @@ class HostNode(BaseROS2DeviceNode): } # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} - ) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 + ) # device_id -> action_value_mappings(本地+远程设备统一存储) + self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings) self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._last_discovery_time = 0.0 # 上次设备发现的时间 @@ -636,6 +638,8 @@ class HostNode(BaseROS2DeviceNode): self.device_machine_names[device_id] = "本地" self.devices_instances[device_id] = d # noinspection PyProtectedMember + self._action_value_mappings[device_id] = d._ros_node._action_value_mappings + # noinspection PyProtectedMember for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith( "UniLabJsonCommand" @@ -1168,6 +1172,10 @@ class HostNode(BaseROS2DeviceNode): def _node_info_update_callback(self, request, response): """ 更新节点信息回调 + + 处理两种消息: + 1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings + 2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings """ self.lab_logger().trace(f"[Host Node] Node info update request received: {request}") try: @@ -1179,12 +1187,48 @@ class HostNode(BaseROS2DeviceNode): info = info["SYNC_SLAVE_NODE_INFO"] machine_name = info["machine_name"] edge_device_id = info["edge_device_id"] + registry_name = info.get("registry_name", "") self.device_machine_names[edge_device_id] = machine_name + + # 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings + if registry_name and registry_name in self._slave_registry_configs: + action_mappings = self._slave_registry_configs[registry_name].get( + "class", {} + ).get("action_value_mappings", {}) + if action_mappings: + self._action_value_mappings[edge_device_id] = action_mappings + self.lab_logger().info( + f"[Host Node] Loaded {len(action_mappings)} action mappings " + f"for remote device {edge_device_id} (registry: {registry_name})" + ) else: devices_config = info.pop("devices_config") registry_config = info.pop("registry_config") if registry_config: http_client.resource_registry({"resources": registry_config}) + + # 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引 + for reg_name, reg_data in registry_config.items(): + if isinstance(reg_data, dict) and "class" in reg_data: + self._slave_registry_configs[reg_name] = reg_data + + # 解析 devices_config,建立 device_id -> action_value_mappings 映射 + if devices_config: + for device_tree in devices_config: + for device_dict in device_tree: + device_id = device_dict.get("id", "") + class_name = device_dict.get("class", "") + if device_id and class_name and class_name in self._slave_registry_configs: + action_mappings = self._slave_registry_configs[class_name].get( + "class", {} + ).get("action_value_mappings", {}) + if action_mappings: + self._action_value_mappings[device_id] = action_mappings + self.lab_logger().info( + f"[Host Node] Stored {len(action_mappings)} action mappings " + f"for remote device {device_id} (class: {class_name})" + ) + self.lab_logger().debug(f"[Host Node] Node info update: {info}") response.response = "OK" except Exception as e: diff --git a/unilabos/ros/nodes/presets/joint_republisher.py b/unilabos/ros/nodes/presets/joint_republisher.py index 6521830..b829037 100644 --- a/unilabos/ros/nodes/presets/joint_republisher.py +++ b/unilabos/ros/nodes/presets/joint_republisher.py @@ -7,10 +7,11 @@ from rclpy.callback_groups import ReentrantCallbackGroup from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class JointRepublisher(BaseROS2DeviceNode): - def __init__(self,device_id,resource_tracker, **kwargs): + def __init__(self,device_id, registry_name, resource_tracker, **kwargs): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/ros/nodes/presets/resource_mesh_manager.py b/unilabos/ros/nodes/presets/resource_mesh_manager.py index 1ff504c..45e330d 100644 --- a/unilabos/ros/nodes/presets/resource_mesh_manager.py +++ b/unilabos/ros/nodes/presets/resource_mesh_manager.py @@ -26,7 +26,7 @@ from unilabos.resources.graphio import initialize_resources from unilabos.registry.registry import lab_registry class ResourceMeshManager(BaseROS2DeviceNode): - def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", rate=50, **kwargs): + def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs): """初始化资源网格管理器节点 Args: @@ -37,6 +37,7 @@ class ResourceMeshManager(BaseROS2DeviceNode): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/ros/nodes/presets/serial_node.py b/unilabos/ros/nodes/presets/serial_node.py index 545682b..11a04bd 100644 --- a/unilabos/ros/nodes/presets/serial_node.py +++ b/unilabos/ros/nodes/presets/serial_node.py @@ -7,7 +7,7 @@ from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeRe class ROS2SerialNode(BaseROS2DeviceNode): - def __init__(self, device_id, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None): + def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None): # 保存属性,以便在调用父类初始化前使用 self.port = port self.baudrate = baudrate @@ -28,6 +28,7 @@ class ROS2SerialNode(BaseROS2DeviceNode): BaseROS2DeviceNode.__init__( self, driver_instance=self, + registry_name=registry_name, device_id=device_id, status_types={}, action_value_mappings={}, diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 0d62579..902e296 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -47,6 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): *, driver_instance: "WorkstationBase", device_id: str, + registry_name: str, device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], @@ -62,6 +63,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): super().__init__( driver_instance=driver_instance, device_id=device_id, + registry_name=registry_name, device_uuid=device_uuid, status_types=status_types, action_value_mappings={**action_value_mappings, **self.protocol_action_mappings}, diff --git a/unilabos/utils/decorator.py b/unilabos/utils/decorator.py index 57e968a..22a9073 100644 --- a/unilabos/utils/decorator.py +++ b/unilabos/utils/decorator.py @@ -184,6 +184,51 @@ def get_all_subscriptions(instance) -> list: return subscriptions +def always_free(func: F) -> F: + """ + 标记动作为永久闲置(不受busy队列限制)的装饰器 + + 被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制, + 任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。 + + Example: + class MyDriver: + @always_free + def query_status(self, param: str): + # 这个动作可以随时执行,不需要排队 + return self._status + + def transfer(self, volume: float): + # 这个动作会按正常排队逻辑执行 + pass + + Note: + - 可以与其他装饰器组合使用,@always_free 应放在最外层 + - 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层 + """ + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + wrapper._is_always_free = True # type: ignore[attr-defined] + + return wrapper # type: ignore[return-value] + + +def is_always_free(func) -> bool: + """ + 检查函数是否被标记为永久闲置 + + Args: + func: 被检查的函数 + + Returns: + 如果函数被 @always_free 装饰则返回 True,否则返回 False + """ + return getattr(func, "_is_always_free", False) + + def not_action(func: F) -> F: """ 标记方法为非动作的装饰器 diff --git a/unilabos/utils/import_manager.py b/unilabos/utils/import_manager.py index 18a3920..dabbe1a 100644 --- a/unilabos/utils/import_manager.py +++ b/unilabos/utils/import_manager.py @@ -29,7 +29,7 @@ from ast import Constant from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS from unilabos.utils import logger -from unilabos.utils.decorator import is_not_action +from unilabos.utils.decorator import is_not_action, is_always_free class ImportManager: @@ -282,6 +282,9 @@ class ImportManager: continue # 其他非_开头的方法归类为action method_info = self._analyze_method_signature(method) + # 检查是否被 @always_free 装饰器标记 + if is_always_free(method): + method_info["always_free"] = True result["action_methods"][name] = method_info return result @@ -339,6 +342,9 @@ class ImportManager: if self._is_not_action_method(node): continue # 其他非_开头的方法归类为action + # 检查是否被 @always_free 装饰器标记 + if self._is_always_free_method(node): + method_info["always_free"] = True result["action_methods"][method_name] = method_info return result @@ -474,6 +480,13 @@ class ImportManager: return True return False + def _is_always_free_method(self, node: ast.FunctionDef) -> bool: + """检查是否是@always_free装饰的方法""" + for decorator in node.decorator_list: + if isinstance(decorator, ast.Name) and decorator.id == "always_free": + return True + return False + def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str: """从setter装饰器中获取属性名""" for decorator in node.decorator_list: