mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-06 15:05:13 +00:00
Merge branch 'dev' into workstation_dev_new
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
import collections
|
||||
import copy
|
||||
from dataclasses import dataclass, field
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from typing import Optional, Dict, Any, List, ClassVar, Set, Union
|
||||
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
|
||||
|
||||
from action_msgs.msg import GoalStatus
|
||||
from geometry_msgs.msg import Point
|
||||
@@ -41,6 +42,14 @@ from unilabos.ros.nodes.presets.controller_node import ControllerNode
|
||||
from unilabos.utils.exception import DeviceClassInvalid
|
||||
from unilabos.utils.type_check import serialize_result_info
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from unilabos.app.ws_client import QueueItem
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeviceActionStatus:
|
||||
job_ids: Dict[str, float] = field(default_factory=dict)
|
||||
|
||||
|
||||
class HostNode(BaseROS2DeviceNode):
|
||||
"""
|
||||
@@ -51,6 +60,9 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
_instance: ClassVar[Optional["HostNode"]] = None
|
||||
_ready_event: ClassVar[threading.Event] = threading.Event()
|
||||
_device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict(
|
||||
DeviceActionStatus
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls, timeout=None) -> Optional["HostNode"]:
|
||||
@@ -152,11 +164,15 @@ class HostNode(BaseROS2DeviceNode):
|
||||
self.device_status = {} # 用来存储设备状态
|
||||
self.device_status_timestamps = {} # 用来存储设备状态最后更新时间
|
||||
if BasicConfig.upload_registry:
|
||||
from unilabos.app.mq import mqtt_client
|
||||
register_devices_and_resources(mqtt_client, lab_registry)
|
||||
from unilabos.app.communication import get_communication_client
|
||||
|
||||
comm_client = get_communication_client()
|
||||
register_devices_and_resources(comm_client, lab_registry)
|
||||
else:
|
||||
self.lab_logger().warning("本次启动注册表不报送云端,如果您需要联网调试,请使用unilab-register命令进行单独报送,或者在启动命令增加--upload_registry")
|
||||
time.sleep(1) # 等待MQTT连接稳定
|
||||
self.lab_logger().warning(
|
||||
"本次启动注册表不报送云端,如果您需要联网调试,请使用unilab-register命令进行单独报送,或者在启动命令增加--upload_registry"
|
||||
)
|
||||
time.sleep(1) # 等待通信连接稳定
|
||||
# 首次发现网络中的设备
|
||||
self._discover_devices()
|
||||
|
||||
@@ -214,6 +230,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "resource_add"):
|
||||
from unilabos.app.web.client import HTTPClient
|
||||
|
||||
client: HTTPClient = bridge
|
||||
resource_start_time = time.time()
|
||||
resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False)
|
||||
@@ -340,9 +357,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
self.lab_logger().trace(f"[Host Node] Created ActionClient (Discovery): {action_id}")
|
||||
action_name = action_id[len(namespace) + 1 :]
|
||||
edge_device_id = namespace[9:]
|
||||
# from unilabos.app.mq import mqtt_client
|
||||
# from unilabos.app.comm_factory import get_communication_client
|
||||
# comm_client = get_communication_client()
|
||||
# info_with_schema = ros_action_to_json_schema(action_type)
|
||||
# mqtt_client.publish_actions(action_name, {
|
||||
# comm_client.publish_actions(action_name, {
|
||||
# "device_id": edge_device_id,
|
||||
# "device_type": "",
|
||||
# "action_name": action_name,
|
||||
@@ -365,7 +383,9 @@ class HostNode(BaseROS2DeviceNode):
|
||||
):
|
||||
# 这里要求device_id传入必须是edge_device_id
|
||||
if device_id not in self.devices_names:
|
||||
self.lab_logger().error(f"[Host Node] Device {device_id} not found in devices_names. Create resource failed.")
|
||||
self.lab_logger().error(
|
||||
f"[Host Node] Device {device_id} not found in devices_names. Create resource failed."
|
||||
)
|
||||
raise ValueError(f"[Host Node] Device {device_id} not found in devices_names. Create resource failed.")
|
||||
|
||||
device_key = f"{self.devices_names[device_id]}/{device_id}"
|
||||
@@ -425,10 +445,12 @@ class HostNode(BaseROS2DeviceNode):
|
||||
res_creation_input.update(
|
||||
{
|
||||
"data": {
|
||||
"liquids": [{
|
||||
"liquid_type": liquid_type[0] if liquid_type else None,
|
||||
"liquid_volume": liquid_volume[0] if liquid_volume else None,
|
||||
}]
|
||||
"liquids": [
|
||||
{
|
||||
"liquid_type": liquid_type[0] if liquid_type else None,
|
||||
"liquid_volume": liquid_volume[0] if liquid_volume else None,
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -451,7 +473,9 @@ class HostNode(BaseROS2DeviceNode):
|
||||
)
|
||||
]
|
||||
|
||||
response = await self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param)
|
||||
response = await self.create_resource_detailed(
|
||||
resources, device_ids, bind_parent_id, bind_location, other_calling_param
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@@ -482,7 +506,9 @@ class HostNode(BaseROS2DeviceNode):
|
||||
self.devices_instances[device_id] = d
|
||||
# noinspection PyProtectedMember
|
||||
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
|
||||
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"):
|
||||
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
|
||||
"UniLabJsonCommand"
|
||||
):
|
||||
continue
|
||||
action_id = f"/devices/{device_id}/{action_name}"
|
||||
if action_id not in self._action_clients:
|
||||
@@ -491,9 +517,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
self.lab_logger().trace(
|
||||
f"[Host Node] Created ActionClient (Local): {action_id}"
|
||||
) # 子设备再创建用的是Discover发现的
|
||||
# from unilabos.app.mq import mqtt_client
|
||||
# from unilabos.app.comm_factory import get_communication_client
|
||||
# comm_client = get_communication_client()
|
||||
# info_with_schema = ros_action_to_json_schema(action_type)
|
||||
# mqtt_client.publish_actions(action_name, {
|
||||
# comm_client.publish_actions(action_name, {
|
||||
# "device_id": device_id,
|
||||
# "device_type": device_config["class"],
|
||||
# "action_name": action_name,
|
||||
@@ -591,21 +618,15 @@ class HostNode(BaseROS2DeviceNode):
|
||||
if hasattr(bridge, "publish_device_status"):
|
||||
bridge.publish_device_status(self.device_status, device_id, property_name)
|
||||
if bCreate:
|
||||
self.lab_logger().trace(
|
||||
f"Status created: {device_id}.{property_name} = {msg.data}"
|
||||
)
|
||||
self.lab_logger().trace(f"Status created: {device_id}.{property_name} = {msg.data}")
|
||||
else:
|
||||
self.lab_logger().debug(
|
||||
f"Status updated: {device_id}.{property_name} = {msg.data}"
|
||||
)
|
||||
self.lab_logger().debug(f"Status updated: {device_id}.{property_name} = {msg.data}")
|
||||
|
||||
def send_goal(
|
||||
self,
|
||||
device_id: str,
|
||||
item: "QueueItem",
|
||||
action_type: str,
|
||||
action_name: str,
|
||||
action_kwargs: Dict[str, Any],
|
||||
goal_uuid: Optional[str] = None,
|
||||
server_info: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
@@ -619,15 +640,20 @@ class HostNode(BaseROS2DeviceNode):
|
||||
goal_uuid: 目标UUID,如果为None则自动生成
|
||||
server_info: 服务器发送信息,包含发送时间戳等
|
||||
"""
|
||||
u = uuid.UUID(item.job_id)
|
||||
device_id = item.device_id
|
||||
action_name = item.action_name
|
||||
if action_type.startswith("UniLabJsonCommand"):
|
||||
if action_name.startswith("auto-"):
|
||||
action_name = action_name[5:]
|
||||
action_id = f"/devices/{device_id}/_execute_driver_command"
|
||||
action_kwargs = {
|
||||
"string": json.dumps({
|
||||
"function_name": action_name,
|
||||
"function_args": action_kwargs,
|
||||
})
|
||||
"string": json.dumps(
|
||||
{
|
||||
"function_name": action_name,
|
||||
"function_args": action_kwargs,
|
||||
}
|
||||
)
|
||||
}
|
||||
if action_type.startswith("UniLabJsonCommandAsync"):
|
||||
action_id = f"/devices/{device_id}/_execute_driver_command_async"
|
||||
@@ -644,53 +670,47 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
|
||||
action_client.wait_for_server()
|
||||
|
||||
uuid_str = goal_uuid
|
||||
if goal_uuid is not None:
|
||||
u = uuid.UUID(goal_uuid)
|
||||
goal_uuid_obj = UUID(uuid=list(u.bytes))
|
||||
else:
|
||||
goal_uuid_obj = None
|
||||
goal_uuid_obj = UUID(uuid=list(u.bytes))
|
||||
|
||||
future = action_client.send_goal_async(
|
||||
goal_msg,
|
||||
feedback_callback=lambda feedback_msg: self.feedback_callback(action_id, uuid_str, feedback_msg),
|
||||
feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg),
|
||||
goal_uuid=goal_uuid_obj,
|
||||
)
|
||||
future.add_done_callback(lambda future: self.goal_response_callback(action_id, uuid_str, future))
|
||||
future.add_done_callback(
|
||||
lambda future: self.goal_response_callback(item, action_id, future)
|
||||
)
|
||||
|
||||
def goal_response_callback(self, action_id: str, uuid_str: Optional[str], future) -> None:
|
||||
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
|
||||
"""目标响应回调"""
|
||||
goal_handle = future.result()
|
||||
if not goal_handle.accepted:
|
||||
self.lab_logger().warning(f"[Host Node] Goal {action_id} ({uuid_str}) rejected")
|
||||
self.lab_logger().warning(f"[Host Node] Goal {item.action_name} ({item.job_id}) rejected")
|
||||
return
|
||||
|
||||
self.lab_logger().info(f"[Host Node] Goal {action_id} ({uuid_str}) accepted")
|
||||
if uuid_str:
|
||||
self._goals[uuid_str] = goal_handle
|
||||
goal_handle.get_result_async().add_done_callback(
|
||||
lambda future: self.get_result_callback(action_id, uuid_str, future)
|
||||
)
|
||||
self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted")
|
||||
self._goals[item.job_id] = goal_handle
|
||||
goal_handle.get_result_async().add_done_callback(
|
||||
lambda future: self.get_result_callback(item, action_id, future)
|
||||
)
|
||||
|
||||
def feedback_callback(self, action_id: str, uuid_str: Optional[str], feedback_msg) -> None:
|
||||
def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None:
|
||||
"""反馈回调"""
|
||||
feedback_data = convert_from_ros_msg(feedback_msg)
|
||||
feedback_data.pop("goal_id")
|
||||
self.lab_logger().debug(f"[Host Node] Feedback for {action_id} ({uuid_str}): {feedback_data}")
|
||||
self.lab_logger().trace(f"[Host Node] Feedback for {action_id} ({item.job_id}): {feedback_data}")
|
||||
|
||||
if uuid_str:
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_job_status"):
|
||||
bridge.publish_job_status(feedback_data, uuid_str, "running")
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_job_status"):
|
||||
bridge.publish_job_status(feedback_data, item, "running")
|
||||
|
||||
def get_result_callback(self, action_id: str, uuid_str: Optional[str], future) -> None:
|
||||
def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
|
||||
"""获取结果回调"""
|
||||
job_id = item.job_id
|
||||
result_msg = future.result().result
|
||||
result_data = convert_from_ros_msg(result_msg)
|
||||
status = "success"
|
||||
return_info_str = result_data.get("return_info")
|
||||
|
||||
if return_info_str is not None:
|
||||
try:
|
||||
ret = json.loads(return_info_str)
|
||||
@@ -710,13 +730,13 @@ class HostNode(BaseROS2DeviceNode):
|
||||
status = "failed"
|
||||
return_info_str = serialize_result_info("缺少return_info", False, result_data)
|
||||
|
||||
self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): {status}")
|
||||
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}")
|
||||
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
|
||||
|
||||
if uuid_str:
|
||||
if job_id:
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_job_status"):
|
||||
bridge.publish_job_status(result_data, uuid_str, status, return_info_str)
|
||||
bridge.publish_job_status(result_data, item, status, return_info_str)
|
||||
|
||||
def cancel_goal(self, goal_uuid: str) -> None:
|
||||
"""取消目标"""
|
||||
@@ -726,14 +746,14 @@ class HostNode(BaseROS2DeviceNode):
|
||||
else:
|
||||
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid} not found, cannot cancel")
|
||||
|
||||
def get_goal_status(self, uuid_str: str) -> int:
|
||||
def get_goal_status(self, job_id: str) -> int:
|
||||
"""获取目标状态"""
|
||||
if uuid_str in self._goals:
|
||||
g = self._goals[uuid_str]
|
||||
if job_id in self._goals:
|
||||
g = self._goals[job_id]
|
||||
status = g.status
|
||||
self.lab_logger().debug(f"[Host Node] Goal status for {uuid_str}: {status}")
|
||||
self.lab_logger().debug(f"[Host Node] Goal status for {job_id}: {status}")
|
||||
return status
|
||||
self.lab_logger().warning(f"[Host Node] Goal {uuid_str} not found, status unknown")
|
||||
self.lab_logger().warning(f"[Host Node] Goal {job_id} not found, status unknown")
|
||||
return GoalStatus.STATUS_UNKNOWN
|
||||
|
||||
"""Controller Node"""
|
||||
@@ -802,7 +822,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
"""
|
||||
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
|
||||
try:
|
||||
from unilabos.app.mq import mqtt_client
|
||||
from unilabos.app.communication import get_communication_client
|
||||
|
||||
info = json.loads(request.command)
|
||||
if "SYNC_SLAVE_NODE_INFO" in info:
|
||||
@@ -811,9 +831,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
edge_device_id = info["edge_device_id"]
|
||||
self.device_machine_names[edge_device_id] = machine_name
|
||||
else:
|
||||
comm_client = get_communication_client()
|
||||
registry_config = info["registry_config"]
|
||||
for device_config in registry_config:
|
||||
mqtt_client.publish_registry(device_config["id"], device_config)
|
||||
comm_client.publish_registry(device_config["id"], device_config)
|
||||
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
|
||||
response.response = "OK"
|
||||
except Exception as e:
|
||||
@@ -840,6 +861,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
success = False
|
||||
if len(self.bridges) > 0: # 边的提交待定
|
||||
from unilabos.app.web.client import HTTPClient
|
||||
|
||||
client: HTTPClient = self.bridges[-1]
|
||||
r = client.resource_add(add_schema(resources), False)
|
||||
success = bool(r)
|
||||
@@ -848,6 +870,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
if success:
|
||||
from unilabos.resources.graphio import physical_setup_graph
|
||||
|
||||
for resource in resources:
|
||||
if resource.get("id") not in physical_setup_graph.nodes:
|
||||
physical_setup_graph.add_node(resource["id"], **resource)
|
||||
@@ -988,9 +1011,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
send_timestamp = time.time()
|
||||
|
||||
# 发送ping
|
||||
from unilabos.app.mq import mqtt_client
|
||||
from unilabos.app.communication import get_communication_client
|
||||
|
||||
mqtt_client.send_ping(ping_id, send_timestamp)
|
||||
comm_client = get_communication_client()
|
||||
comm_client.send_ping(ping_id, send_timestamp)
|
||||
|
||||
# 等待pong响应
|
||||
timeout = 10.0
|
||||
|
||||
Reference in New Issue
Block a user