From 4b5a83efa4d487bf3146632e1dd834df72b2b346 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Fri, 19 Sep 2025 21:09:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dprotocolnode=E7=9A=84?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../devices/workstation/workstation_base.py | 134 +++------------- unilabos/registry/devices/work_station.yaml | 144 +----------------- unilabos/registry/registry.py | 19 ++- unilabos/ros/nodes/base_device_node.py | 1 - unilabos/ros/utils/driver_creator.py | 13 +- 5 files changed, 45 insertions(+), 266 deletions(-) diff --git a/unilabos/devices/workstation/workstation_base.py b/unilabos/devices/workstation/workstation_base.py index 5337f6da..f8cea028 100644 --- a/unilabos/devices/workstation/workstation_base.py +++ b/unilabos/devices/workstation/workstation_base.py @@ -158,10 +158,6 @@ class WorkstationBase(ABC): # PLR 物料系统 self.deck: Optional[Deck] = None self.plr_resources: Dict[str, PLRResource] = {} - - # 资源同步器(可选) - self.resource_synchronizer = ResourceSynchronizer(self) # 要在driver中自行初始化,只有workstation用 - # 硬件接口 self.hardware_interface: Union[Any, str] = None @@ -174,67 +170,19 @@ class WorkstationBase(ABC): # 支持的工作流(静态预定义) self.supported_workflows: Dict[str, WorkflowInfo] = {} + def post_init(self, ros_node: ROS2WorkstationNode) -> None: # 初始化物料系统 + self._ros_node = ros_node self._initialize_material_system() - # 注册支持的工作流 - self._register_supported_workflows() - - logger.info(f"工作站 {device_id} 初始化完成(简化版)") - def _initialize_material_system(self): """初始化物料系统 - 使用 graphio 转换""" - try: - from unilabos.resources.graphio import resource_ulab_to_plr - - # 1. 合并 deck_config 和 children 创建完整的资源树 - complete_resource_config = self._create_complete_resource_config() - - # 2. 使用 graphio 转换为 PLR 资源 - self.deck = resource_ulab_to_plr(complete_resource_config, plr_model=True) - - # 3. 建立资源映射 - self._build_resource_mappings(self.deck) - - # 4. 如果有资源同步器,执行初始同步 - if self.resource_synchronizer: - # 这里可以异步执行,暂时跳过 - pass - - logger.info(f"工作站 {self.device_id} 物料系统初始化成功,创建了 {len(self.plr_resources)} 个资源") - - except Exception as e: - logger.error(f"工作站 {self.device_id} 物料系统初始化失败: {e}") - raise + pass def _create_complete_resource_config(self) -> Dict[str, Any]: """创建完整的资源配置 - 合并 deck_config 和 children""" # 创建主 deck 配置 - deck_resource = { - "id": f"{self.device_id}_deck", - "name": f"{self.device_id}_deck", - "type": "deck", - "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "size_x": self.deck_config.get("size_x", 1000.0), - "size_y": self.deck_config.get("size_y", 1000.0), - "size_z": self.deck_config.get("size_z", 100.0), - **{k: v for k, v in self.deck_config.items() if k not in ["size_x", "size_y", "size_z"]}, - }, - "data": {}, - "children": [], - "parent": None, - } - - # 添加子资源 - if self._children: - children_list = [] - for child_id, child_config in self._children.items(): - child_resource = self._normalize_child_resource(child_id, child_config, deck_resource["id"]) - children_list.append(child_resource) - deck_resource["children"] = children_list - - return deck_resource + return {} def _normalize_child_resource(self, resource_id: str, config: Dict[str, Any], parent_id: str) -> Dict[str, Any]: """标准化子资源配置""" @@ -284,12 +232,12 @@ class WorkstationBase(ABC): def set_hardware_interface(self, hardware_interface: Union[Any, str]): """设置硬件接口""" self.hardware_interface = hardware_interface - logger.info(f"工作站 {self.device_id} 硬件接口设置: {type(hardware_interface).__name__}") + logger.info(f"工作站 {self._ros_node.device_id} 硬件接口设置: {type(hardware_interface).__name__}") def set_workstation_node(self, workstation_node: "ROS2WorkstationNode"): """设置协议节点引用(用于代理模式)""" self._ros_node = workstation_node - logger.info(f"工作站 {self.device_id} 关联协议节点") + logger.info(f"工作站 {self._ros_node.device_id} 关联协议节点") # ============ 设备操作接口 ============ @@ -351,18 +299,18 @@ class WorkstationBase(ABC): async def sync_with_external_system(self) -> bool: """与外部物料系统同步""" if not self.resource_synchronizer: - logger.info(f"工作站 {self.device_id} 没有配置资源同步器") + logger.info(f"工作站 {self._ros_node.device_id} 没有配置资源同步器") return True try: success = await self.resource_synchronizer.sync_from_external() if success: - logger.info(f"工作站 {self.device_id} 外部同步成功") + logger.info(f"工作站 {self._ros_node.device_id} 外部同步成功") else: - logger.warning(f"工作站 {self.device_id} 外部同步失败") + logger.warning(f"工作站 {self._ros_node.device_id} 外部同步失败") return success except Exception as e: - logger.error(f"工作站 {self.device_id} 外部同步异常: {e}") + logger.error(f"工作站 {self._ros_node.device_id} 外部同步异常: {e}") return False # ============ 简化的工作流控制 ============ @@ -380,23 +328,23 @@ class WorkstationBase(ABC): if success: self.current_workflow_status = WorkflowStatus.RUNNING - logger.info(f"工作站 {self.device_id} 工作流 {workflow_name} 启动成功") + logger.info(f"工作站 {self._ros_node.device_id} 工作流 {workflow_name} 启动成功") else: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作站 {self.device_id} 工作流 {workflow_name} 启动失败") + logger.error(f"工作站 {self._ros_node.device_id} 工作流 {workflow_name} 启动失败") return success except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作站 {self.device_id} 执行工作流失败: {e}") + logger.error(f"工作站 {self._ros_node.device_id} 执行工作流失败: {e}") return False def stop_workflow(self, emergency: bool = False) -> bool: """停止工作流""" try: if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: - logger.warning(f"工作站 {self.device_id} 没有正在运行的工作流") + logger.warning(f"工作站 {self._ros_node.device_id} 没有正在运行的工作流") return True self.current_workflow_status = WorkflowStatus.STOPPING @@ -406,16 +354,16 @@ class WorkstationBase(ABC): if success: self.current_workflow_status = WorkflowStatus.STOPPED - logger.info(f"工作站 {self.device_id} 工作流停止成功 (紧急: {emergency})") + logger.info(f"工作站 {self._ros_node.device_id} 工作流停止成功 (紧急: {emergency})") else: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作站 {self.device_id} 工作流停止失败") + logger.error(f"工作站 {self._ros_node.device_id} 工作流停止失败") return success except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作站 {self.device_id} 停止工作流失败: {e}") + logger.error(f"工作站 {self._ros_node.device_id} 停止工作流失败: {e}") return False # ============ 状态属性 ============ @@ -441,49 +389,7 @@ class WorkstationBase(ABC): return 0.0 return time.time() - self.workflow_start_time - # ============ 抽象方法 - 子类必须实现 ============ - @abstractmethod - def _register_supported_workflows(self): - """注册支持的工作流 - 子类必须实现""" - pass - - @abstractmethod - def _execute_workflow_impl(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: - """执行工作流的具体实现 - 子类必须实现""" - pass - - @abstractmethod - def _stop_workflow_impl(self, emergency: bool = False) -> bool: - """停止工作流的具体实现 - 子类必须实现""" - pass - -class WorkstationExample(WorkstationBase): - """工作站示例实现""" - - def _register_supported_workflows(self): - """注册支持的工作流""" - self.supported_workflows["example_workflow"] = WorkflowInfo( - name="example_workflow", - description="这是一个示例工作流", - estimated_duration=300.0, - required_materials=["sample_plate"], - output_product="processed_plate", - parameters_schema={"param1": "string", "param2": "integer"}, - ) - - def _execute_workflow_impl(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: - """执行工作流的具体实现""" - if workflow_name not in self.supported_workflows: - logger.error(f"工作站 {self.device_id} 不支持工作流: {workflow_name}") - return False - - # 这里添加实际的工作流逻辑 - logger.info(f"工作站 {self.device_id} 正在执行工作流: {workflow_name} with parameters {parameters}") - return True - - def _stop_workflow_impl(self, emergency: bool = False) -> bool: - """停止工作流的具体实现""" - # 这里添加实际的停止逻辑 - logger.info(f"工作站 {self.device_id} 正在停止工作流 (紧急: {emergency})") - return True \ No newline at end of file +class ProtocolNode(WorkstationBase): + def __init__(self, station_resource: Optional[PLRResource], *args, **kwargs): + super().__init__(station_resource, *args, **kwargs) diff --git a/unilabos/registry/devices/work_station.yaml b/unilabos/registry/devices/work_station.yaml index 3bc4dfa4..c5e7fced 100644 --- a/unilabos/registry/devices/work_station.yaml +++ b/unilabos/registry/devices/work_station.yaml @@ -6024,156 +6024,18 @@ workstation: title: WashSolid type: object type: WashSolid - auto-create_ros_action_server: - feedback: {} - goal: {} - goal_default: - action_name: null - action_value_mapping: null - handles: {} - result: {} - schema: - description: create_ros_action_server的参数schema - properties: - feedback: {} - goal: - properties: - action_name: - type: string - action_value_mapping: - type: string - required: - - action_name - - action_value_mapping - type: object - result: {} - required: - - goal - title: create_ros_action_server参数 - type: object - type: UniLabJsonCommand - auto-execute_single_action: - feedback: {} - goal: {} - goal_default: - action_kwargs: null - action_name: null - device_id: null - handles: {} - result: {} - schema: - description: execute_single_action的参数schema - properties: - feedback: {} - goal: - properties: - action_kwargs: - type: string - action_name: - type: string - device_id: - type: string - required: - - device_id - - action_name - - action_kwargs - type: object - result: {} - required: - - goal - title: execute_single_action参数 - type: object - type: UniLabJsonCommandAsync - auto-initialize_device: - feedback: {} - goal: {} - goal_default: - device_config: null - device_id: null - handles: {} - result: {} - schema: - description: initialize_device的参数schema - properties: - feedback: {} - goal: - properties: - device_config: - type: string - device_id: - type: string - required: - - device_id - - device_config - type: object - result: {} - required: - - goal - title: initialize_device参数 - type: object - type: UniLabJsonCommand - module: unilabos.ros.nodes.presets.workstation:ROS2WorkstationNode + module: unilabos.devices.workstation.workstation_base:ProtocolNode status_types: {} - type: ros2 + type: python config_info: [] description: Workstation handles: [] icon: '' - init_param_schema: - config: - properties: - action_value_mappings: - type: object - children: - type: object - device_id: - type: string - driver_instance: - type: string - hardware_interface: - type: object - print_publish: - default: true - type: string - protocol_type: - items: - type: string - type: array - resource_tracker: - type: string - status_types: - type: object - required: - - protocol_type - - children - - driver_instance - - device_id - - status_types - - action_value_mappings - - hardware_interface - type: object - data: - properties: {} - required: [] - type: object - version: 1.0.0 -workstation.example: - category: - - work_station - class: - action_value_mappings: {} - module: unilabos.devices.workstation.workstation_base:WorkstationExample - status_types: {} - type: ros2 - config_info: [] - description: Example Workstation - handles: [] - icon: '' init_param_schema: config: properties: station_resource: - type: object + type: string required: - station_resource type: object diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 48e2df77..adc9c5c4 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -19,6 +19,9 @@ from unilabos.utils.type_check import NoAliasDumper DEFAULT_PATHS = [Path(__file__).absolute().parent] +class ROSMsgNotFound(Exception): + pass + @singleton class Registry: @@ -308,7 +311,7 @@ class Registry: return type_class else: logger.error(f"[UniLab Registry] 无法找到类型 '{type_name}' 用于设备 {device_id} 的 {field_name}") - sys.exit(1) + raise ROSMsgNotFound(f"类型 '{type_name}' 未找到,用于设备 {device_id} 的 {field_name}") def _get_json_schema_type(self, type_str: str) -> str: """ @@ -495,7 +498,10 @@ class Registry: if isinstance(status_type, tuple) or status_type in ["Any", "None", "Unknown"]: status_type = "String" # 替换成ROS的String,便于显示 device_config["class"]["status_types"][status_name] = status_type - target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}") + try: + target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}") + except ROSMsgNotFound: + continue if target_type in [ dict, list, @@ -568,9 +574,12 @@ class Registry: action_type_str: str = action_config["type"] # 通过Json发放指令,而不是通过特殊的ros action进行处理 if not action_type_str.startswith("UniLabJsonCommand"): - target_type = self._replace_type_with_class( - action_type_str, device_id, f"动作 {action_name}" - ) + try: + target_type = self._replace_type_with_class( + action_type_str, device_id, f"动作 {action_name}" + ) + except ROSMsgNotFound: + continue action_str_type_mapping[action_type_str] = target_type if target_type is not None: action_config["goal_default"] = yaml.safe_load( diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 6213cd0b..08d5321e 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -980,7 +980,6 @@ class ROS2DeviceNode: ) else: from unilabos.devices.workstation.workstation_base import WorkstationBase - if issubclass(self._driver_class, WorkstationBase): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator self.driver_is_workstation = True self._driver_creator = WorkstationNodeCreator(driver_class, children=children, resource_tracker=self.resource_tracker) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 0203d699..76c6f776 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -296,11 +296,14 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - station_resource_dict = data["station_resource"] - from pylabrobot.resources import Deck, Resource - plrc = PyLabRobotCreator(Deck, self.children, self.resource_tracker) - station_resource = plrc.create_instance(station_resource_dict) - data["station_resource"] = station_resource + station_resource_dict = data.get("station_resource") + if station_resource_dict: + from pylabrobot.resources import Deck, Resource + plrc = PyLabRobotCreator(Deck, self.children, self.resource_tracker) + station_resource = plrc.create_instance(station_resource_dict) + data["station_resource"] = station_resource + else: + data["station_resource"] = None self.device_instance = super(WorkstationNodeCreator, self).create_instance(data) self.post_create() return self.device_instance