diff --git a/.gitignore b/.gitignore index 333df5b..e2c2063 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ .vscode *.py[cod] *$py.class +service # C extensions *.so diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 0bac96f..aaade1b 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -43,10 +43,6 @@ class MQTTClient: def _on_connect(self, client, userdata, flags, rc, properties=None): logger.info("[MQTT] Connected with result code " + str(rc)) client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0) - isok, data = devices() - if not isok: - logger.error("[MQTT] on_connect ErrorHostNotInit") - return def _on_message(self, client, userdata, msg) -> None: logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload)) diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 91789b7..f3be457 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -25,7 +25,20 @@ class Registry: self.ResourceCreateFromOuterEasy = self._replace_type_with_class( "ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource" ) - self.device_type_registry = { + self.device_type_registry = {} + self.resource_type_registry = {} + self._setup_called = False # 跟踪setup是否已调用 + # 其他状态变量 + # self.is_host_mode = False # 移至BasicConfig中 + + def setup(self): + # 检查是否已调用过setup + if self._setup_called: + logger.critical("[UniLab Registry] setup方法已被调用过,不允许多次调用") + return + + from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type + self.device_type_registry.update({ "host_node": { "description": "UniLabOS主机节点", "class": { @@ -34,7 +47,7 @@ class Registry: "status_types": {}, "action_value_mappings": { "create_resource_detailed": { - "type": msg_converter_manager.search_class("ResourceCreateFromOuter"), + "type": self.ResourceCreateFromOuter, "goal": { "resources": "resources", "device_ids": "device_ids", @@ -43,13 +56,14 @@ class Registry: "other_calling_params": "other_calling_params", }, "feedback": {}, - "result": { - "success": "success" - }, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter) + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal)) + ) }, "create_resource": { - "type": msg_converter_manager.search_class("ResourceCreateFromOuterEasy"), + "type": self.ResourceCreateFromOuterEasy, "goal": { "res_id": "res_id", "class_name": "class_name", @@ -62,35 +76,20 @@ class Registry: "slot_on_deck": "slot_on_deck", }, "feedback": {}, - "result": { - "success": "success" - }, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy) - } - } + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) + ) + }, + }, }, - "schema": { - "properties": {}, - "additionalProperties": False, - "type": "object" - }, - "file_path": "/" + "icon": "icon_device.webp", + "registry_type": "device", + "schema": {"properties": {}, "additionalProperties": False, "type": "object"}, + "file_path": "/", } - } - self.resource_type_registry = {} - self._setup_called = False # 跟踪setup是否已调用 - # 其他状态变量 - # self.is_host_mode = False # 移至BasicConfig中 - - def setup(self): - # 检查是否已调用过setup - if self._setup_called: - logger.critical("[UniLab Registry] setup方法已被调用过,不允许多次调用") - return - - # 标记setup已被调用 - self._setup_called = True - + }) logger.debug(f"[UniLab Registry] ----------Setup----------") self.registry_paths = [Path(path).absolute() for path in self.registry_paths] for i, path in enumerate(self.registry_paths): @@ -100,6 +99,8 @@ class Registry: self.load_device_types(path) self.load_resource_types(path) logger.info("[UniLab Registry] 注册表设置完成") + # 标记setup已被调用 + self._setup_called = True def load_resource_types(self, path: os.PathLike): abs_path = Path(path).absolute() @@ -115,6 +116,9 @@ class Registry: resource_info["file_path"] = str(file.absolute()).replace("\\", "/") if "description" not in resource_info: resource_info["description"] = "" + if "icon" not in resource_info: + resource_info["icon"] = "" + resource_info["registry_type"] = "resource" self.resource_type_registry.update(data) logger.debug( f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(files)} " @@ -164,6 +168,7 @@ class Registry: ) current_device_number = len(self.device_type_registry) + 1 from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type + for i, file in enumerate(files): data = yaml.safe_load(open(file, encoding="utf-8")) if data: @@ -173,6 +178,9 @@ class Registry: device_config["file_path"] = str(file.absolute()).replace("\\", "/") if "description" not in device_config: device_config["description"] = "" + if "icon" not in device_config: + device_config["icon"] = "" + device_config["registry_type"] = "device" if "class" in device_config: # 处理状态类型 if "status_types" in device_config["class"]: @@ -189,7 +197,9 @@ class Registry: action_config["type"], device_id, f"动作 {action_name}" ) if action_config["type"] is not None: - action_config["goal_default"] = yaml.safe_load(io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal))) + action_config["goal_default"] = yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal)) + ) action_config["schema"] = ros_action_to_json_schema(action_config["type"]) else: logger.warning( @@ -212,13 +222,17 @@ class Registry: def obtain_registry_device_info(self): devices = [] for device_id, device_info in self.device_type_registry.items(): - msg = { - "id": device_id, - **device_info - } + msg = {"id": device_id, **device_info} devices.append(msg) return devices + def obtain_registry_resource_info(self): + resources = [] + for resource_id, resource_info in self.resource_type_registry.items(): + msg = {"id": resource_id, **resource_info} + resources.append(msg) + return resources + # 全局单例实例 lab_registry = Registry() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index a8a3299..5346a47 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -12,8 +12,14 @@ from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \ - SerialCommand # type: ignore +from unilabos_msgs.srv import ( + ResourceAdd, + ResourceGet, + ResourceDelete, + ResourceUpdate, + ResourceList, + SerialCommand, +) # type: ignore from unique_identifier_msgs.msg import UUID from unilabos.registry.registry import lab_registry @@ -100,16 +106,26 @@ class HostNode(BaseROS2DeviceNode): # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 - self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射 + self.device_machine_names: Dict[str, str] = { + device_id: "本地", + } # 存储设备ID到机器名称的映射 self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型,host的默认写好 "/devices/host_node/create_resource": ActionClient( - self, lab_registry.ResourceCreateFromOuterEasy, "/devices/host_node/create_resource", callback_group=self.callback_group + self, + lab_registry.ResourceCreateFromOuterEasy, + "/devices/host_node/create_resource", + callback_group=self.callback_group, ), "/devices/host_node/create_resource_detailed": ActionClient( - self, lab_registry.ResourceCreateFromOuter, "/devices/host_node/create_resource_detailed", callback_group=self.callback_group - ) + self, + lab_registry.ResourceCreateFromOuter, + "/devices/host_node/create_resource_detailed", + callback_group=self.callback_group, + ), } # 用来存储多个ActionClient实例 - self._action_value_mappings: Dict[str, Dict] = {} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 + self._action_value_mappings: Dict[str, Dict] = ( + {} + ) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._last_discovery_time = 0.0 # 上次设备发现的时间 @@ -123,8 +139,11 @@ class HostNode(BaseROS2DeviceNode): self.device_status_timestamps = {} # 用来存储设备状态最后更新时间 from unilabos.app.mq import mqtt_client - for device_config in lab_registry.obtain_registry_device_info(): - mqtt_client.publish_registry(device_config["id"], device_config) + + for device_info in lab_registry.obtain_registry_device_info(): + mqtt_client.publish_registry(device_info["id"], device_info) + for resource_info in lab_registry.obtain_registry_resource_info(): + mqtt_client.publish_registry(resource_info["id"], resource_info) # 首次发现网络中的设备 self._discover_devices() @@ -149,21 +168,20 @@ class HostNode(BaseROS2DeviceNode): ].items(): controller_config["update_rate"] = update_rate self.initialize_controller(controller_id, controller_config) - resources_config.insert(0, { - "id": "host_node", - "name": "host_node", - "parent": None, - "type": "device", - "class": "host_node", - "position": { - "x": 0, - "y": 0, - "z": 0 + resources_config.insert( + 0, + { + "id": "host_node", + "name": "host_node", + "parent": None, + "type": "device", + "class": "host_node", + "position": {"x": 0, "y": 0, "z": 0}, + "config": {}, + "data": {}, + "children": [], }, - "config": {}, - "data": {}, - "children": [] - }) + ) resource_with_parent_name = [] resource_ids_to_instance = {i["id"]: i for i in resources_config} for res in resources_config: @@ -233,7 +251,7 @@ class HostNode(BaseROS2DeviceNode): target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", ).start() elif device_key not in self._online_devices: # 设备重新上线 @@ -244,7 +262,7 @@ class HostNode(BaseROS2DeviceNode): target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", ).start() # 检测离线设备 @@ -288,7 +306,7 @@ class HostNode(BaseROS2DeviceNode): self, action_type, action_id, callback_group=self.callback_group ) self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}") - action_name = action_id[len(namespace) + 1:] + action_name = action_id[len(namespace) + 1 :] edge_device_id = namespace[9:] # from unilabos.app.mq import mqtt_client # info_with_schema = ros_action_to_json_schema(action_type) @@ -301,52 +319,81 @@ class HostNode(BaseROS2DeviceNode): except Exception as e: self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") - def create_resource_detailed(self, resources: list["Resource"], device_ids: list[str], bind_parent_ids: list[str], bind_locations: list[Point], other_calling_params: list[str]): - for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(resources, device_ids, bind_parent_ids, bind_locations, other_calling_params): + def create_resource_detailed( + self, + resources: list["Resource"], + device_ids: list[str], + bind_parent_ids: list[str], + bind_locations: list[Point], + other_calling_params: list[str], + ): + for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip( + resources, device_ids, bind_parent_ids, bind_locations, other_calling_params + ): # 这里要求device_id传入必须是edge_device_id namespace = "/devices/" + device_id srv_address = f"/srv{namespace}/append_resource" sclient = self.create_client(SerialCommand, srv_address) sclient.wait_for_service() request = SerialCommand.Request() - request.command = json.dumps({ - "resource": resource, # 单个/单组 可为 list[list[Resource]] - "namespace": namespace, - "edge_device_id": device_id, - "bind_parent_id": bind_parent_id, - "bind_location": { - "x": bind_location.x, - "y": bind_location.y, - "z": bind_location.z, + request.command = json.dumps( + { + "resource": resource, # 单个/单组 可为 list[list[Resource]] + "namespace": namespace, + "edge_device_id": device_id, + "bind_parent_id": bind_parent_id, + "bind_location": { + "x": bind_location.x, + "y": bind_location.y, + "z": bind_location.z, + }, + "other_calling_param": json.loads(other_calling_param) if other_calling_param else {}, }, - "other_calling_param": json.loads(other_calling_param) if other_calling_param else {}, - }, ensure_ascii=False) + ensure_ascii=False, + ) response = sclient.call(request) pass pass - def create_resource(self, device_id: str, res_id: str, class_name: str, parent: str, bind_locations: Point, liquid_input_slot: list[int], liquid_type: list[str], liquid_volume: list[int], slot_on_deck: int): - init_new_res = initialize_resource({ - "name": res_id, - "class": class_name, - "parent": parent, - "position": { - "x": bind_locations.x, - "y": bind_locations.y, - "z": bind_locations.z, + def create_resource( + self, + device_id: str, + res_id: str, + class_name: str, + parent: str, + bind_locations: Point, + liquid_input_slot: list[int], + liquid_type: list[str], + liquid_volume: list[int], + slot_on_deck: int, + ): + init_new_res = initialize_resource( + { + "name": res_id, + "class": class_name, + "parent": parent, + "position": { + "x": bind_locations.x, + "y": bind_locations.y, + "z": bind_locations.z, + }, } - }) # flatten的格式 + ) # flatten的格式 resources = [init_new_res] device_id = [device_id] bind_parent_id = [parent] bind_location = [bind_locations] - other_calling_param = [json.dumps({ - "ADD_LIQUID_TYPE": liquid_type, - "LIQUID_VOLUME": liquid_volume, - "LIQUID_INPUT_SLOT": liquid_input_slot, - "initialize_full": False, - "slot": slot_on_deck - })] + other_calling_param = [ + json.dumps( + { + "ADD_LIQUID_TYPE": liquid_type, + "LIQUID_VOLUME": liquid_volume, + "LIQUID_INPUT_SLOT": liquid_input_slot, + "initialize_full": False, + "slot": slot_on_deck, + } + ) + ] return self.create_resource_detailed(resources, device_id, bind_parent_id, bind_location, other_calling_param) @@ -377,7 +424,9 @@ class HostNode(BaseROS2DeviceNode): if action_id not in self._action_clients: action_type = action_value_mapping["type"] self._action_clients[action_id] = ActionClient(self, action_type, action_id) - self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的 + self.lab_logger().debug( + f"[Host Node] Created ActionClient (Local): {action_id}" + ) # 子设备再创建用的是Discover发现的 # from unilabos.app.mq import mqtt_client # info_with_schema = ros_action_to_json_schema(action_type) # mqtt_client.publish_actions(action_name, {