修复必须两次启动edge后端才有节点生成的bug

新增resources报送
This commit is contained in:
Xuwznln
2025-05-28 22:06:49 +08:00
parent 344e91bce3
commit 8a50081c1e
4 changed files with 160 additions and 100 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ __pycache__/
.vscode
*.py[cod]
*$py.class
service
# C extensions
*.so

View File

@@ -43,10 +43,6 @@ class MQTTClient:
def _on_connect(self, client, userdata, flags, rc, properties=None):
logger.info("[MQTT] Connected with result code " + str(rc))
client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0)
isok, data = devices()
if not isok:
logger.error("[MQTT] on_connect ErrorHostNotInit")
return
def _on_message(self, client, userdata, msg) -> None:
logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload))

View File

@@ -25,7 +25,20 @@ class Registry:
self.ResourceCreateFromOuterEasy = self._replace_type_with_class(
"ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource"
)
self.device_type_registry = {
self.device_type_registry = {}
self.resource_type_registry = {}
self._setup_called = False # 跟踪setup是否已调用
# 其他状态变量
# self.is_host_mode = False # 移至BasicConfig中
def setup(self):
# 检查是否已调用过setup
if self._setup_called:
logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用")
return
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type
self.device_type_registry.update({
"host_node": {
"description": "UniLabOS主机节点",
"class": {
@@ -34,7 +47,7 @@ class Registry:
"status_types": {},
"action_value_mappings": {
"create_resource_detailed": {
"type": msg_converter_manager.search_class("ResourceCreateFromOuter"),
"type": self.ResourceCreateFromOuter,
"goal": {
"resources": "resources",
"device_ids": "device_ids",
@@ -43,13 +56,14 @@ class Registry:
"other_calling_params": "other_calling_params",
},
"feedback": {},
"result": {
"success": "success"
},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter)
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuter),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal))
)
},
"create_resource": {
"type": msg_converter_manager.search_class("ResourceCreateFromOuterEasy"),
"type": self.ResourceCreateFromOuterEasy,
"goal": {
"res_id": "res_id",
"class_name": "class_name",
@@ -62,35 +76,20 @@ class Registry:
"slot_on_deck": "slot_on_deck",
},
"feedback": {},
"result": {
"success": "success"
},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy)
}
}
"result": {"success": "success"},
"schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy),
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
)
},
},
},
"schema": {
"properties": {},
"additionalProperties": False,
"type": "object"
},
"file_path": "/"
"icon": "icon_device.webp",
"registry_type": "device",
"schema": {"properties": {}, "additionalProperties": False, "type": "object"},
"file_path": "/",
}
}
self.resource_type_registry = {}
self._setup_called = False # 跟踪setup是否已调用
# 其他状态变量
# self.is_host_mode = False # 移至BasicConfig中
def setup(self):
# 检查是否已调用过setup
if self._setup_called:
logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用")
return
# 标记setup已被调用
self._setup_called = True
})
logger.debug(f"[UniLab Registry] ----------Setup----------")
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
@@ -100,6 +99,8 @@ class Registry:
self.load_device_types(path)
self.load_resource_types(path)
logger.info("[UniLab Registry] 注册表设置完成")
# 标记setup已被调用
self._setup_called = True
def load_resource_types(self, path: os.PathLike):
abs_path = Path(path).absolute()
@@ -115,6 +116,9 @@ class Registry:
resource_info["file_path"] = str(file.absolute()).replace("\\", "/")
if "description" not in resource_info:
resource_info["description"] = ""
if "icon" not in resource_info:
resource_info["icon"] = ""
resource_info["registry_type"] = "resource"
self.resource_type_registry.update(data)
logger.debug(
f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(files)} "
@@ -164,6 +168,7 @@ class Registry:
)
current_device_number = len(self.device_type_registry) + 1
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type
for i, file in enumerate(files):
data = yaml.safe_load(open(file, encoding="utf-8"))
if data:
@@ -173,6 +178,9 @@ class Registry:
device_config["file_path"] = str(file.absolute()).replace("\\", "/")
if "description" not in device_config:
device_config["description"] = ""
if "icon" not in device_config:
device_config["icon"] = ""
device_config["registry_type"] = "device"
if "class" in device_config:
# 处理状态类型
if "status_types" in device_config["class"]:
@@ -189,7 +197,9 @@ class Registry:
action_config["type"], device_id, f"动作 {action_name}"
)
if action_config["type"] is not None:
action_config["goal_default"] = yaml.safe_load(io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal)))
action_config["goal_default"] = yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal))
)
action_config["schema"] = ros_action_to_json_schema(action_config["type"])
else:
logger.warning(
@@ -212,13 +222,17 @@ class Registry:
def obtain_registry_device_info(self):
devices = []
for device_id, device_info in self.device_type_registry.items():
msg = {
"id": device_id,
**device_info
}
msg = {"id": device_id, **device_info}
devices.append(msg)
return devices
def obtain_registry_resource_info(self):
resources = []
for resource_id, resource_info in self.resource_type_registry.items():
msg = {"id": resource_id, **resource_info}
resources.append(msg)
return resources
# 全局单例实例
lab_registry = Registry()

View File

@@ -12,8 +12,14 @@ from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
SerialCommand # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
ResourceGet,
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand,
) # type: ignore
from unique_identifier_msgs.msg import UUID
from unilabos.registry.registry import lab_registry
@@ -100,16 +106,26 @@ class HostNode(BaseROS2DeviceNode):
# 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
self.device_machine_names: Dict[str, str] = {
device_id: "本地",
} # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型host的默认写好
"/devices/host_node/create_resource": ActionClient(
self, lab_registry.ResourceCreateFromOuterEasy, "/devices/host_node/create_resource", callback_group=self.callback_group
self,
lab_registry.ResourceCreateFromOuterEasy,
"/devices/host_node/create_resource",
callback_group=self.callback_group,
),
"/devices/host_node/create_resource_detailed": ActionClient(
self, lab_registry.ResourceCreateFromOuter, "/devices/host_node/create_resource_detailed", callback_group=self.callback_group
)
self,
lab_registry.ResourceCreateFromOuter,
"/devices/host_node/create_resource_detailed",
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = {} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -123,8 +139,11 @@ class HostNode(BaseROS2DeviceNode):
self.device_status_timestamps = {} # 用来存储设备状态最后更新时间
from unilabos.app.mq import mqtt_client
for device_config in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_config["id"], device_config)
for device_info in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_info["id"], device_info)
for resource_info in lab_registry.obtain_registry_resource_info():
mqtt_client.publish_registry(resource_info["id"], resource_info)
# 首次发现网络中的设备
self._discover_devices()
@@ -149,21 +168,20 @@ class HostNode(BaseROS2DeviceNode):
].items():
controller_config["update_rate"] = update_rate
self.initialize_controller(controller_id, controller_config)
resources_config.insert(0, {
"id": "host_node",
"name": "host_node",
"parent": None,
"type": "device",
"class": "host_node",
"position": {
"x": 0,
"y": 0,
"z": 0
resources_config.insert(
0,
{
"id": "host_node",
"name": "host_node",
"parent": None,
"type": "device",
"class": "host_node",
"position": {"x": 0, "y": 0, "z": 0},
"config": {},
"data": {},
"children": [],
},
"config": {},
"data": {},
"children": []
})
)
resource_with_parent_name = []
resource_ids_to_instance = {i["id"]: i for i in resources_config}
for res in resources_config:
@@ -233,7 +251,7 @@ class HostNode(BaseROS2DeviceNode):
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}",
).start()
elif device_key not in self._online_devices:
# 设备重新上线
@@ -244,7 +262,7 @@ class HostNode(BaseROS2DeviceNode):
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}",
).start()
# 检测离线设备
@@ -288,7 +306,7 @@ class HostNode(BaseROS2DeviceNode):
self, action_type, action_id, callback_group=self.callback_group
)
self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
action_name = action_id[len(namespace) + 1:]
action_name = action_id[len(namespace) + 1 :]
edge_device_id = namespace[9:]
# from unilabos.app.mq import mqtt_client
# info_with_schema = ros_action_to_json_schema(action_type)
@@ -301,52 +319,81 @@ class HostNode(BaseROS2DeviceNode):
except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
def create_resource_detailed(self, resources: list["Resource"], device_ids: list[str], bind_parent_ids: list[str], bind_locations: list[Point], other_calling_params: list[str]):
for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(resources, device_ids, bind_parent_ids, bind_locations, other_calling_params):
def create_resource_detailed(
self,
resources: list["Resource"],
device_ids: list[str],
bind_parent_ids: list[str],
bind_locations: list[Point],
other_calling_params: list[str],
):
for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(
resources, device_ids, bind_parent_ids, bind_locations, other_calling_params
):
# 这里要求device_id传入必须是edge_device_id
namespace = "/devices/" + device_id
srv_address = f"/srv{namespace}/append_resource"
sclient = self.create_client(SerialCommand, srv_address)
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps({
"resource": resource, # 单个/单组 可为 list[list[Resource]]
"namespace": namespace,
"edge_device_id": device_id,
"bind_parent_id": bind_parent_id,
"bind_location": {
"x": bind_location.x,
"y": bind_location.y,
"z": bind_location.z,
request.command = json.dumps(
{
"resource": resource, # 单个/单组 可为 list[list[Resource]]
"namespace": namespace,
"edge_device_id": device_id,
"bind_parent_id": bind_parent_id,
"bind_location": {
"x": bind_location.x,
"y": bind_location.y,
"z": bind_location.z,
},
"other_calling_param": json.loads(other_calling_param) if other_calling_param else {},
},
"other_calling_param": json.loads(other_calling_param) if other_calling_param else {},
}, ensure_ascii=False)
ensure_ascii=False,
)
response = sclient.call(request)
pass
pass
def create_resource(self, device_id: str, res_id: str, class_name: str, parent: str, bind_locations: Point, liquid_input_slot: list[int], liquid_type: list[str], liquid_volume: list[int], slot_on_deck: int):
init_new_res = initialize_resource({
"name": res_id,
"class": class_name,
"parent": parent,
"position": {
"x": bind_locations.x,
"y": bind_locations.y,
"z": bind_locations.z,
def create_resource(
self,
device_id: str,
res_id: str,
class_name: str,
parent: str,
bind_locations: Point,
liquid_input_slot: list[int],
liquid_type: list[str],
liquid_volume: list[int],
slot_on_deck: int,
):
init_new_res = initialize_resource(
{
"name": res_id,
"class": class_name,
"parent": parent,
"position": {
"x": bind_locations.x,
"y": bind_locations.y,
"z": bind_locations.z,
},
}
}) # flatten的格式
) # flatten的格式
resources = [init_new_res]
device_id = [device_id]
bind_parent_id = [parent]
bind_location = [bind_locations]
other_calling_param = [json.dumps({
"ADD_LIQUID_TYPE": liquid_type,
"LIQUID_VOLUME": liquid_volume,
"LIQUID_INPUT_SLOT": liquid_input_slot,
"initialize_full": False,
"slot": slot_on_deck
})]
other_calling_param = [
json.dumps(
{
"ADD_LIQUID_TYPE": liquid_type,
"LIQUID_VOLUME": liquid_volume,
"LIQUID_INPUT_SLOT": liquid_input_slot,
"initialize_full": False,
"slot": slot_on_deck,
}
)
]
return self.create_resource_detailed(resources, device_id, bind_parent_id, bind_location, other_calling_param)
@@ -377,7 +424,9 @@ class HostNode(BaseROS2DeviceNode):
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
self.lab_logger().debug(
f"[Host Node] Created ActionClient (Local): {action_id}"
) # 子设备再创建用的是Discover发现的
# from unilabos.app.mq import mqtt_client
# info_with_schema = ros_action_to_json_schema(action_type)
# mqtt_client.publish_actions(action_name, {