mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-07 15:35:10 +00:00
Merge branch 'dev' into feat/add_custom_widget
This commit is contained in:
@@ -24,7 +24,7 @@ extensions = [
|
|||||||
"sphinx.ext.autodoc",
|
"sphinx.ext.autodoc",
|
||||||
"sphinx.ext.napoleon", # 如果您使用 Google 或 NumPy 风格的 docstrings
|
"sphinx.ext.napoleon", # 如果您使用 Google 或 NumPy 风格的 docstrings
|
||||||
"sphinx_rtd_theme",
|
"sphinx_rtd_theme",
|
||||||
"sphinxcontrib.mermaid"
|
"sphinxcontrib.mermaid",
|
||||||
]
|
]
|
||||||
|
|
||||||
source_suffix = {
|
source_suffix = {
|
||||||
@@ -58,7 +58,7 @@ html_theme = "sphinx_rtd_theme"
|
|||||||
|
|
||||||
# sphinx-book-theme 主题选项
|
# sphinx-book-theme 主题选项
|
||||||
html_theme_options = {
|
html_theme_options = {
|
||||||
"repository_url": "https://github.com/用户名/Uni-Lab",
|
"repository_url": "https://github.com/deepmodeling/Uni-Lab-OS",
|
||||||
"use_repository_button": True,
|
"use_repository_button": True,
|
||||||
"use_issues_button": True,
|
"use_issues_button": True,
|
||||||
"use_edit_page_button": True,
|
"use_edit_page_button": True,
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -12,3 +12,7 @@ sphinx-copybutton>=0.5.0
|
|||||||
|
|
||||||
# 用于自动摘要生成
|
# 用于自动摘要生成
|
||||||
sphinx-autobuild>=2024.2.4
|
sphinx-autobuild>=2024.2.4
|
||||||
|
|
||||||
|
# 用于PDF导出 (rinohtype方案,纯Python无需LaTeX)
|
||||||
|
rinohtype>=0.5.4
|
||||||
|
sphinx-simplepdf>=1.6.0
|
||||||
@@ -19,6 +19,11 @@ if unilabos_dir not in sys.path:
|
|||||||
|
|
||||||
from unilabos.utils.banner_print import print_status, print_unilab_banner
|
from unilabos.utils.banner_print import print_status, print_unilab_banner
|
||||||
from unilabos.config.config import load_config, BasicConfig, HTTPConfig
|
from unilabos.config.config import load_config, BasicConfig, HTTPConfig
|
||||||
|
from unilabos.app.utils import cleanup_for_restart
|
||||||
|
|
||||||
|
# Global restart flags (used by ws_client and web/server)
|
||||||
|
_restart_requested: bool = False
|
||||||
|
_restart_reason: str = ""
|
||||||
|
|
||||||
|
|
||||||
def load_config_from_file(config_path):
|
def load_config_from_file(config_path):
|
||||||
@@ -156,6 +161,11 @@ def parse_args():
|
|||||||
default=False,
|
default=False,
|
||||||
help="Complete registry information",
|
help="Complete registry information",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--no_update_feedback",
|
||||||
|
action="store_true",
|
||||||
|
help="Disable sending update feedback to server",
|
||||||
|
)
|
||||||
# workflow upload subcommand
|
# workflow upload subcommand
|
||||||
workflow_parser = subparsers.add_parser(
|
workflow_parser = subparsers.add_parser(
|
||||||
"workflow_upload",
|
"workflow_upload",
|
||||||
@@ -297,6 +307,7 @@ def main():
|
|||||||
BasicConfig.is_host_mode = not args_dict.get("is_slave", False)
|
BasicConfig.is_host_mode = not args_dict.get("is_slave", False)
|
||||||
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
|
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
|
||||||
BasicConfig.upload_registry = args_dict.get("upload_registry", False)
|
BasicConfig.upload_registry = args_dict.get("upload_registry", False)
|
||||||
|
BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False)
|
||||||
BasicConfig.communication_protocol = "websocket"
|
BasicConfig.communication_protocol = "websocket"
|
||||||
machine_name = os.popen("hostname").read().strip()
|
machine_name = os.popen("hostname").read().strip()
|
||||||
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
|
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
|
||||||
@@ -497,13 +508,19 @@ def main():
|
|||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
else:
|
else:
|
||||||
start_backend(**args_dict)
|
start_backend(**args_dict)
|
||||||
start_server(
|
restart_requested = start_server(
|
||||||
open_browser=not args_dict["disable_browser"],
|
open_browser=not args_dict["disable_browser"],
|
||||||
port=BasicConfig.port,
|
port=BasicConfig.port,
|
||||||
)
|
)
|
||||||
|
if restart_requested:
|
||||||
|
print_status("[Main] Restart requested, cleaning up...", "info")
|
||||||
|
cleanup_for_restart()
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
start_backend(**args_dict)
|
start_backend(**args_dict)
|
||||||
start_server(
|
|
||||||
|
# 启动服务器(默认支持WebSocket触发重启)
|
||||||
|
restart_requested = start_server(
|
||||||
open_browser=not args_dict["disable_browser"],
|
open_browser=not args_dict["disable_browser"],
|
||||||
port=BasicConfig.port,
|
port=BasicConfig.port,
|
||||||
)
|
)
|
||||||
|
|||||||
144
unilabos/app/utils.py
Normal file
144
unilabos/app/utils.py
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
"""
|
||||||
|
UniLabOS 应用工具函数
|
||||||
|
|
||||||
|
提供清理、重启等工具函数
|
||||||
|
"""
|
||||||
|
|
||||||
|
import gc
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from unilabos.utils.banner_print import print_status
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_for_restart() -> bool:
|
||||||
|
"""
|
||||||
|
Clean up all resources for restart without exiting the process.
|
||||||
|
|
||||||
|
This function prepares the system for re-initialization by:
|
||||||
|
1. Stopping all communication clients
|
||||||
|
2. Destroying ROS nodes
|
||||||
|
3. Resetting singletons
|
||||||
|
4. Waiting for threads to finish
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if cleanup was successful, False otherwise
|
||||||
|
"""
|
||||||
|
print_status("[Restart] Starting cleanup for restart...", "info")
|
||||||
|
|
||||||
|
# Step 1: Stop WebSocket communication client
|
||||||
|
print_status("[Restart] Step 1: Stopping WebSocket client...", "info")
|
||||||
|
try:
|
||||||
|
from unilabos.app.communication import get_communication_client
|
||||||
|
|
||||||
|
comm_client = get_communication_client()
|
||||||
|
if comm_client is not None:
|
||||||
|
comm_client.stop()
|
||||||
|
print_status("[Restart] WebSocket client stopped", "info")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error stopping WebSocket: {e}", "warning")
|
||||||
|
|
||||||
|
# Step 2: Get HostNode and cleanup ROS
|
||||||
|
print_status("[Restart] Step 2: Cleaning up ROS nodes...", "info")
|
||||||
|
try:
|
||||||
|
from unilabos.ros.nodes.presets.host_node import HostNode
|
||||||
|
import rclpy
|
||||||
|
from rclpy.timer import Timer
|
||||||
|
|
||||||
|
host_instance = HostNode.get_instance(timeout=5)
|
||||||
|
if host_instance is not None:
|
||||||
|
print_status(f"[Restart] Found HostNode: {host_instance.device_id}", "info")
|
||||||
|
|
||||||
|
# Gracefully shutdown background threads
|
||||||
|
print_status("[Restart] Shutting down background threads...", "info")
|
||||||
|
HostNode.shutdown_background_threads(timeout=5.0)
|
||||||
|
print_status("[Restart] Background threads shutdown complete", "info")
|
||||||
|
|
||||||
|
# Stop discovery timer
|
||||||
|
if hasattr(host_instance, "_discovery_timer") and isinstance(host_instance._discovery_timer, Timer):
|
||||||
|
host_instance._discovery_timer.cancel()
|
||||||
|
print_status("[Restart] Discovery timer cancelled", "info")
|
||||||
|
|
||||||
|
# Destroy device nodes
|
||||||
|
device_count = len(host_instance.devices_instances)
|
||||||
|
print_status(f"[Restart] Destroying {device_count} device instances...", "info")
|
||||||
|
for device_id, device_node in list(host_instance.devices_instances.items()):
|
||||||
|
try:
|
||||||
|
if hasattr(device_node, "ros_node_instance") and device_node.ros_node_instance is not None:
|
||||||
|
device_node.ros_node_instance.destroy_node()
|
||||||
|
print_status(f"[Restart] Device {device_id} destroyed", "info")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error destroying device {device_id}: {e}", "warning")
|
||||||
|
|
||||||
|
# Clear devices instances
|
||||||
|
host_instance.devices_instances.clear()
|
||||||
|
host_instance.devices_names.clear()
|
||||||
|
|
||||||
|
# Destroy host node
|
||||||
|
try:
|
||||||
|
host_instance.destroy_node()
|
||||||
|
print_status("[Restart] HostNode destroyed", "info")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error destroying HostNode: {e}", "warning")
|
||||||
|
|
||||||
|
# Reset HostNode state
|
||||||
|
HostNode.reset_state()
|
||||||
|
print_status("[Restart] HostNode state reset", "info")
|
||||||
|
|
||||||
|
# Shutdown executor first (to stop executor.spin() gracefully)
|
||||||
|
if hasattr(rclpy, "__executor") and rclpy.__executor is not None:
|
||||||
|
try:
|
||||||
|
rclpy.__executor.shutdown()
|
||||||
|
rclpy.__executor = None # Clear for restart
|
||||||
|
print_status("[Restart] ROS executor shutdown complete", "info")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error shutting down executor: {e}", "warning")
|
||||||
|
|
||||||
|
# Shutdown rclpy
|
||||||
|
if rclpy.ok():
|
||||||
|
rclpy.shutdown()
|
||||||
|
print_status("[Restart] rclpy shutdown complete", "info")
|
||||||
|
|
||||||
|
except ImportError as e:
|
||||||
|
print_status(f"[Restart] ROS modules not available: {e}", "warning")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error in ROS cleanup: {e}", "warning")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Step 3: Reset communication client singleton
|
||||||
|
print_status("[Restart] Step 3: Resetting singletons...", "info")
|
||||||
|
try:
|
||||||
|
from unilabos.app import communication
|
||||||
|
|
||||||
|
if hasattr(communication, "_communication_client"):
|
||||||
|
communication._communication_client = None
|
||||||
|
print_status("[Restart] Communication client singleton reset", "info")
|
||||||
|
except Exception as e:
|
||||||
|
print_status(f"[Restart] Error resetting communication singleton: {e}", "warning")
|
||||||
|
|
||||||
|
# Step 4: Wait for threads to finish
|
||||||
|
print_status("[Restart] Step 4: Waiting for threads to finish...", "info")
|
||||||
|
time.sleep(3) # Give threads time to finish
|
||||||
|
|
||||||
|
# Check remaining threads
|
||||||
|
remaining_threads = []
|
||||||
|
for t in threading.enumerate():
|
||||||
|
if t.name != "MainThread" and t.is_alive():
|
||||||
|
remaining_threads.append(t.name)
|
||||||
|
|
||||||
|
if remaining_threads:
|
||||||
|
print_status(
|
||||||
|
f"[Restart] Warning: {len(remaining_threads)} threads still running: {remaining_threads}", "warning"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print_status("[Restart] All threads stopped", "info")
|
||||||
|
|
||||||
|
# Step 5: Force garbage collection
|
||||||
|
print_status("[Restart] Step 5: Running garbage collection...", "info")
|
||||||
|
gc.collect()
|
||||||
|
gc.collect() # Run twice for weak references
|
||||||
|
print_status("[Restart] Garbage collection complete", "info")
|
||||||
|
|
||||||
|
print_status("[Restart] Cleanup complete. Ready for re-initialization.", "info")
|
||||||
|
return True
|
||||||
@@ -6,7 +6,6 @@ Web服务器模块
|
|||||||
|
|
||||||
import webbrowser
|
import webbrowser
|
||||||
|
|
||||||
import uvicorn
|
|
||||||
from fastapi import FastAPI, Request
|
from fastapi import FastAPI, Request
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from starlette.responses import Response
|
from starlette.responses import Response
|
||||||
@@ -96,7 +95,7 @@ def setup_server() -> FastAPI:
|
|||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = True) -> None:
|
def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = True) -> bool:
|
||||||
"""
|
"""
|
||||||
启动服务器
|
启动服务器
|
||||||
|
|
||||||
@@ -104,7 +103,14 @@ def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = T
|
|||||||
host: 服务器主机
|
host: 服务器主机
|
||||||
port: 服务器端口
|
port: 服务器端口
|
||||||
open_browser: 是否自动打开浏览器
|
open_browser: 是否自动打开浏览器
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if restart was requested, False otherwise
|
||||||
"""
|
"""
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from uvicorn import Config, Server
|
||||||
|
|
||||||
# 设置服务器
|
# 设置服务器
|
||||||
setup_server()
|
setup_server()
|
||||||
|
|
||||||
@@ -123,7 +129,37 @@ def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = T
|
|||||||
|
|
||||||
# 启动服务器
|
# 启动服务器
|
||||||
info(f"[Web] 启动FastAPI服务器: {host}:{port}")
|
info(f"[Web] 启动FastAPI服务器: {host}:{port}")
|
||||||
uvicorn.run(app, host=host, port=port, log_config=log_config)
|
|
||||||
|
# 使用支持重启的模式
|
||||||
|
config = Config(app=app, host=host, port=port, log_config=log_config)
|
||||||
|
server = Server(config)
|
||||||
|
|
||||||
|
# 启动服务器线程
|
||||||
|
server_thread = threading.Thread(target=server.run, daemon=True, name="uvicorn_server")
|
||||||
|
server_thread.start()
|
||||||
|
|
||||||
|
info("[Web] Server started, monitoring for restart requests...")
|
||||||
|
|
||||||
|
# 监控重启标志
|
||||||
|
import unilabos.app.main as main_module
|
||||||
|
|
||||||
|
while server_thread.is_alive():
|
||||||
|
if hasattr(main_module, "_restart_requested") and main_module._restart_requested:
|
||||||
|
info(
|
||||||
|
f"[Web] Restart requested via WebSocket, reason: {getattr(main_module, '_restart_reason', 'unknown')}"
|
||||||
|
)
|
||||||
|
main_module._restart_requested = False
|
||||||
|
|
||||||
|
# 停止服务器
|
||||||
|
server.should_exit = True
|
||||||
|
server_thread.join(timeout=5)
|
||||||
|
|
||||||
|
info("[Web] Server stopped, ready for restart")
|
||||||
|
return True
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
# 当脚本直接运行时启动服务器
|
# 当脚本直接运行时启动服务器
|
||||||
|
|||||||
@@ -359,7 +359,7 @@ class MessageProcessor:
|
|||||||
self.device_manager = device_manager
|
self.device_manager = device_manager
|
||||||
self.queue_processor = None # 延迟设置
|
self.queue_processor = None # 延迟设置
|
||||||
self.websocket_client = None # 延迟设置
|
self.websocket_client = None # 延迟设置
|
||||||
self.session_id = ""
|
self.session_id = str(uuid.uuid4())[:6] # 产生一个随机的session_id
|
||||||
|
|
||||||
# WebSocket连接
|
# WebSocket连接
|
||||||
self.websocket = None
|
self.websocket = None
|
||||||
@@ -488,7 +488,16 @@ class MessageProcessor:
|
|||||||
async for message in self.websocket:
|
async for message in self.websocket:
|
||||||
try:
|
try:
|
||||||
data = json.loads(message)
|
data = json.loads(message)
|
||||||
await self._process_message(data)
|
message_type = data.get("action", "")
|
||||||
|
message_data = data.get("data")
|
||||||
|
if self.session_id and self.session_id == data.get("edge_session"):
|
||||||
|
await self._process_message(message_type, message_data)
|
||||||
|
else:
|
||||||
|
if message_type.endswith("_material"):
|
||||||
|
logger.trace(f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}")
|
||||||
|
logger.debug(f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}")
|
||||||
|
else:
|
||||||
|
await self._process_message(message_type, message_data)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
|
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -554,11 +563,8 @@ class MessageProcessor:
|
|||||||
finally:
|
finally:
|
||||||
logger.debug("[MessageProcessor] Send handler stopped")
|
logger.debug("[MessageProcessor] Send handler stopped")
|
||||||
|
|
||||||
async def _process_message(self, data: Dict[str, Any]):
|
async def _process_message(self, message_type: str, message_data: Dict[str, Any]):
|
||||||
"""处理收到的消息"""
|
"""处理收到的消息"""
|
||||||
message_type = data.get("action", "")
|
|
||||||
message_data = data.get("data")
|
|
||||||
|
|
||||||
logger.debug(f"[MessageProcessor] Processing message: {message_type}")
|
logger.debug(f"[MessageProcessor] Processing message: {message_type}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -571,16 +577,19 @@ class MessageProcessor:
|
|||||||
elif message_type == "cancel_action" or message_type == "cancel_task":
|
elif message_type == "cancel_action" or message_type == "cancel_task":
|
||||||
await self._handle_cancel_action(message_data)
|
await self._handle_cancel_action(message_data)
|
||||||
elif message_type == "add_material":
|
elif message_type == "add_material":
|
||||||
|
# noinspection PyTypeChecker
|
||||||
await self._handle_resource_tree_update(message_data, "add")
|
await self._handle_resource_tree_update(message_data, "add")
|
||||||
elif message_type == "update_material":
|
elif message_type == "update_material":
|
||||||
|
# noinspection PyTypeChecker
|
||||||
await self._handle_resource_tree_update(message_data, "update")
|
await self._handle_resource_tree_update(message_data, "update")
|
||||||
elif message_type == "remove_material":
|
elif message_type == "remove_material":
|
||||||
|
# noinspection PyTypeChecker
|
||||||
await self._handle_resource_tree_update(message_data, "remove")
|
await self._handle_resource_tree_update(message_data, "remove")
|
||||||
elif message_type == "session_id":
|
# elif message_type == "session_id":
|
||||||
self.session_id = message_data.get("session_id")
|
# self.session_id = message_data.get("session_id")
|
||||||
logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
|
# logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
|
||||||
elif message_type == "request_reload":
|
elif message_type == "request_restart":
|
||||||
await self._handle_request_reload(message_data)
|
await self._handle_request_restart(message_data)
|
||||||
else:
|
else:
|
||||||
logger.debug(f"[MessageProcessor] Unknown message type: {message_type}")
|
logger.debug(f"[MessageProcessor] Unknown message type: {message_type}")
|
||||||
|
|
||||||
@@ -839,7 +848,7 @@ class MessageProcessor:
|
|||||||
device_action_groups[key_add].append(item["uuid"])
|
device_action_groups[key_add].append(item["uuid"])
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[MessageProcessor] Resource migrated: {item['uuid'][:8]} from {device_old_id} to {device_id}"
|
f"[资源同步] 跨站Transfer: {item['uuid'][:8]} from {device_old_id} to {device_id}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# 正常update
|
# 正常update
|
||||||
@@ -854,11 +863,11 @@ class MessageProcessor:
|
|||||||
device_action_groups[key] = []
|
device_action_groups[key] = []
|
||||||
device_action_groups[key].append(item["uuid"])
|
device_action_groups[key].append(item["uuid"])
|
||||||
|
|
||||||
logger.info(f"触发物料更新 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}")
|
logger.trace(f"[资源同步] 动作 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}")
|
||||||
|
|
||||||
# 为每个(device_id, action)创建独立的更新线程
|
# 为每个(device_id, action)创建独立的更新线程
|
||||||
for (device_id, actual_action), items in device_action_groups.items():
|
for (device_id, actual_action), items in device_action_groups.items():
|
||||||
logger.info(f"设备 {device_id} 物料更新 {actual_action} 数量: {len(items)}")
|
logger.trace(f"[资源同步] {device_id} 物料动作 {actual_action} 数量: {len(items)}")
|
||||||
|
|
||||||
def _notify_resource_tree(dev_id, act, item_list):
|
def _notify_resource_tree(dev_id, act, item_list):
|
||||||
try:
|
try:
|
||||||
@@ -890,19 +899,48 @@ class MessageProcessor:
|
|||||||
)
|
)
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
async def _handle_request_reload(self, data: Dict[str, Any]):
|
async def _handle_request_restart(self, data: Dict[str, Any]):
|
||||||
"""
|
"""
|
||||||
处理重载请求
|
处理重启请求
|
||||||
|
|
||||||
当LabGo发送request_reload时,重新发送设备注册信息
|
当LabGo发送request_restart时,执行清理并触发重启
|
||||||
"""
|
"""
|
||||||
reason = data.get("reason", "unknown")
|
reason = data.get("reason", "unknown")
|
||||||
logger.info(f"[MessageProcessor] Received reload request, reason: {reason}")
|
delay = data.get("delay", 2) # 默认延迟2秒
|
||||||
|
logger.info(f"[MessageProcessor] Received restart request, reason: {reason}, delay: {delay}s")
|
||||||
|
|
||||||
# 重新发送host_node_ready信息
|
# 发送确认消息
|
||||||
if self.websocket_client:
|
if self.websocket_client:
|
||||||
self.websocket_client.publish_host_ready()
|
await self.websocket_client.send_message({
|
||||||
logger.info("[MessageProcessor] Re-sent host_node_ready after reload request")
|
"action": "restart_acknowledged",
|
||||||
|
"data": {"reason": reason, "delay": delay}
|
||||||
|
})
|
||||||
|
|
||||||
|
# 设置全局重启标志
|
||||||
|
import unilabos.app.main as main_module
|
||||||
|
main_module._restart_requested = True
|
||||||
|
main_module._restart_reason = reason
|
||||||
|
|
||||||
|
# 延迟后执行清理
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
|
||||||
|
# 在新线程中执行清理,避免阻塞当前事件循环
|
||||||
|
def do_cleanup():
|
||||||
|
import time
|
||||||
|
time.sleep(0.5) # 给当前消息处理完成的时间
|
||||||
|
logger.info(f"[MessageProcessor] Starting cleanup for restart, reason: {reason}")
|
||||||
|
try:
|
||||||
|
from unilabos.app.utils import cleanup_for_restart
|
||||||
|
if cleanup_for_restart():
|
||||||
|
logger.info("[MessageProcessor] Cleanup successful, main() will restart")
|
||||||
|
else:
|
||||||
|
logger.error("[MessageProcessor] Cleanup failed")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[MessageProcessor] Error during cleanup: {e}")
|
||||||
|
|
||||||
|
cleanup_thread = threading.Thread(target=do_cleanup, name="RestartCleanupThread", daemon=True)
|
||||||
|
cleanup_thread.start()
|
||||||
|
logger.info(f"[MessageProcessor] Restart cleanup scheduled")
|
||||||
|
|
||||||
async def _send_action_state_response(
|
async def _send_action_state_response(
|
||||||
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
|
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ class BasicConfig:
|
|||||||
upload_registry = False
|
upload_registry = False
|
||||||
machine_name = "undefined"
|
machine_name = "undefined"
|
||||||
vis_2d_enable = False
|
vis_2d_enable = False
|
||||||
|
no_update_feedback = False
|
||||||
enable_resource_load = True
|
enable_resource_load = True
|
||||||
communication_protocol = "websocket"
|
communication_protocol = "websocket"
|
||||||
startup_json_path = None # 填写绝对路径
|
startup_json_path = None # 填写绝对路径
|
||||||
|
|||||||
@@ -9278,7 +9278,13 @@ liquid_handler.prcxi:
|
|||||||
z: 0.0
|
z: 0.0
|
||||||
sample_id: ''
|
sample_id: ''
|
||||||
type: ''
|
type: ''
|
||||||
handles: {}
|
handles:
|
||||||
|
input:
|
||||||
|
- data_key: wells
|
||||||
|
data_source: handle
|
||||||
|
data_type: resource
|
||||||
|
handler_key: input_wells
|
||||||
|
label: InputWells
|
||||||
placeholder_keys:
|
placeholder_keys:
|
||||||
wells: unilabos_resources
|
wells: unilabos_resources
|
||||||
result: {}
|
result: {}
|
||||||
|
|||||||
@@ -124,11 +124,25 @@ class Registry:
|
|||||||
"output": [
|
"output": [
|
||||||
{
|
{
|
||||||
"handler_key": "labware",
|
"handler_key": "labware",
|
||||||
"label": "Labware",
|
|
||||||
"data_type": "resource",
|
"data_type": "resource",
|
||||||
"data_source": "handle",
|
"label": "Labware",
|
||||||
"data_key": "liquid",
|
"data_source": "executor",
|
||||||
}
|
"data_key": "created_resource_tree.@flatten",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"handler_key": "liquid_slots",
|
||||||
|
"data_type": "resource",
|
||||||
|
"label": "LiquidSlots",
|
||||||
|
"data_source": "executor",
|
||||||
|
"data_key": "liquid_input_resource_tree.@flatten",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"handler_key": "materials",
|
||||||
|
"data_type": "resource",
|
||||||
|
"label": "AllMaterials",
|
||||||
|
"data_source": "executor",
|
||||||
|
"data_key": "[created_resource_tree,liquid_input_resource_tree].@flatten.@flatten",
|
||||||
|
},
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"placeholder_keys": {
|
"placeholder_keys": {
|
||||||
@@ -187,7 +201,17 @@ class Registry:
|
|||||||
"resources": "unilabos_resources",
|
"resources": "unilabos_resources",
|
||||||
},
|
},
|
||||||
"goal_default": {},
|
"goal_default": {},
|
||||||
"handles": {},
|
"handles": {
|
||||||
|
"input": [
|
||||||
|
{
|
||||||
|
"handler_key": "input_resources",
|
||||||
|
"data_type": "resource",
|
||||||
|
"label": "InputResources",
|
||||||
|
"data_source": "handle",
|
||||||
|
"data_key": "resources", # 不为空
|
||||||
|
},
|
||||||
|
]
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ class RegularContainer(Container):
|
|||||||
def get_regular_container(name="container"):
|
def get_regular_container(name="container"):
|
||||||
r = RegularContainer(name=name)
|
r = RegularContainer(name=name)
|
||||||
r.category = "container"
|
r.category = "container"
|
||||||
return RegularContainer(name=name)
|
return r
|
||||||
|
|
||||||
#
|
#
|
||||||
# class RegularContainer(object):
|
# class RegularContainer(object):
|
||||||
|
|||||||
@@ -1151,11 +1151,7 @@ def initialize_resource(resource_config: dict, resource_type: Any = None) -> Uni
|
|||||||
if resource_class_config["type"] == "pylabrobot":
|
if resource_class_config["type"] == "pylabrobot":
|
||||||
resource_plr = RESOURCE(name=resource_config["name"])
|
resource_plr = RESOURCE(name=resource_config["name"])
|
||||||
if resource_type != ResourcePLR:
|
if resource_type != ResourcePLR:
|
||||||
tree_sets = ResourceTreeSet.from_plr_resources([resource_plr])
|
tree_sets = ResourceTreeSet.from_plr_resources([resource_plr], known_newly_created=True)
|
||||||
# r = resource_plr_to_ulab(resource_plr=resource_plr, parent_name=resource_config.get("parent", None))
|
|
||||||
# # r = resource_plr_to_ulab(resource_plr=resource_plr)
|
|
||||||
# if resource_config.get("position") is not None:
|
|
||||||
# r["position"] = resource_config["position"]
|
|
||||||
r = tree_sets.dump()
|
r = tree_sets.dump()
|
||||||
else:
|
else:
|
||||||
r = resource_plr
|
r = resource_plr
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import inspect
|
import inspect
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
from pydantic import BaseModel, field_serializer, field_validator
|
from pydantic import BaseModel, field_serializer, field_validator, ValidationError
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
|
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
|
||||||
|
|
||||||
@@ -147,20 +147,24 @@ class ResourceDictInstance(object):
|
|||||||
if not content.get("extra"): # MagicCode
|
if not content.get("extra"): # MagicCode
|
||||||
content["extra"] = {}
|
content["extra"] = {}
|
||||||
if "position" in content:
|
if "position" in content:
|
||||||
pose = content.get("pose",{})
|
pose = content.get("pose", {})
|
||||||
if "position" not in pose :
|
if "position" not in pose:
|
||||||
if "position" in content["position"]:
|
if "position" in content["position"]:
|
||||||
pose["position"] = content["position"]["position"]
|
pose["position"] = content["position"]["position"]
|
||||||
else:
|
else:
|
||||||
pose["position"] = {"x": 0, "y": 0, "z": 0}
|
pose["position"] = {"x": 0, "y": 0, "z": 0}
|
||||||
if "size" not in pose:
|
if "size" not in pose:
|
||||||
pose["size"] = {
|
pose["size"] = {
|
||||||
"width": content["config"].get("size_x", 0),
|
"width": content["config"].get("size_x", 0),
|
||||||
"height": content["config"].get("size_y", 0),
|
"height": content["config"].get("size_y", 0),
|
||||||
"depth": content["config"].get("size_z", 0)
|
"depth": content["config"].get("size_z", 0),
|
||||||
}
|
}
|
||||||
content["pose"] = pose
|
content["pose"] = pose
|
||||||
return ResourceDictInstance(ResourceDict.model_validate(content))
|
try:
|
||||||
|
res_dict = ResourceDict.model_validate(content)
|
||||||
|
return ResourceDictInstance(res_dict)
|
||||||
|
except ValidationError as err:
|
||||||
|
raise err
|
||||||
|
|
||||||
def get_plr_nested_dict(self) -> Dict[str, Any]:
|
def get_plr_nested_dict(self) -> Dict[str, Any]:
|
||||||
"""获取资源实例的嵌套字典表示"""
|
"""获取资源实例的嵌套字典表示"""
|
||||||
@@ -322,7 +326,7 @@ class ResourceTreeSet(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_plr_resources(cls, resources: List["PLRResource"]) -> "ResourceTreeSet":
|
def from_plr_resources(cls, resources: List["PLRResource"], known_newly_created=False) -> "ResourceTreeSet":
|
||||||
"""
|
"""
|
||||||
从plr资源创建ResourceTreeSet
|
从plr资源创建ResourceTreeSet
|
||||||
"""
|
"""
|
||||||
@@ -339,6 +343,8 @@ class ResourceTreeSet(object):
|
|||||||
}
|
}
|
||||||
if source in replace_info:
|
if source in replace_info:
|
||||||
return replace_info[source]
|
return replace_info[source]
|
||||||
|
elif source is None:
|
||||||
|
return ""
|
||||||
else:
|
else:
|
||||||
print("转换pylabrobot的时候,出现未知类型", source)
|
print("转换pylabrobot的时候,出现未知类型", source)
|
||||||
return source
|
return source
|
||||||
@@ -349,7 +355,8 @@ class ResourceTreeSet(object):
|
|||||||
if not uid:
|
if not uid:
|
||||||
uid = str(uuid.uuid4())
|
uid = str(uuid.uuid4())
|
||||||
res.unilabos_uuid = uid
|
res.unilabos_uuid = uid
|
||||||
logger.warning(f"{res}没有uuid,请设置后再传入,默认填充{uid}!\n{traceback.format_exc()}")
|
if not known_newly_created:
|
||||||
|
logger.warning(f"{res}没有uuid,请设置后再传入,默认填充{uid}!\n{traceback.format_exc()}")
|
||||||
|
|
||||||
# 获取unilabos_extra,默认为空字典
|
# 获取unilabos_extra,默认为空字典
|
||||||
extra = getattr(res, "unilabos_extra", {})
|
extra = getattr(res, "unilabos_extra", {})
|
||||||
@@ -448,7 +455,13 @@ class ResourceTreeSet(object):
|
|||||||
from pylabrobot.utils.object_parsing import find_subclass
|
from pylabrobot.utils.object_parsing import find_subclass
|
||||||
|
|
||||||
# 类型映射
|
# 类型映射
|
||||||
TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer", "tip_spot": "TipSpot"}
|
TYPE_MAP = {
|
||||||
|
"plate": "Plate",
|
||||||
|
"well": "Well",
|
||||||
|
"deck": "Deck",
|
||||||
|
"container": "RegularContainer",
|
||||||
|
"tip_spot": "TipSpot",
|
||||||
|
}
|
||||||
|
|
||||||
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict):
|
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict):
|
||||||
"""一次遍历收集 name_to_uuid, all_states 和 name_to_extra"""
|
"""一次遍历收集 name_to_uuid, all_states 和 name_to_extra"""
|
||||||
@@ -918,6 +931,33 @@ class DeviceNodeResourceTracker(object):
|
|||||||
|
|
||||||
return self._traverse_and_process(resource, process)
|
return self._traverse_and_process(resource, process)
|
||||||
|
|
||||||
|
def loop_find_with_uuid(self, resource, target_uuid: str):
|
||||||
|
"""
|
||||||
|
递归遍历资源树,根据 uuid 查找并返回对应的资源
|
||||||
|
|
||||||
|
Args:
|
||||||
|
resource: 资源对象(可以是list、dict或实例)
|
||||||
|
target_uuid: 要查找的uuid
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
找到的资源对象,未找到则返回None
|
||||||
|
"""
|
||||||
|
found_resource = None
|
||||||
|
|
||||||
|
def process(res):
|
||||||
|
nonlocal found_resource
|
||||||
|
if found_resource is not None:
|
||||||
|
return 0 # 已找到,跳过后续处理
|
||||||
|
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
|
||||||
|
if current_uuid and current_uuid == target_uuid:
|
||||||
|
found_resource = res
|
||||||
|
logger.trace(f"找到资源UUID: {target_uuid}")
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
self._traverse_and_process(resource, process)
|
||||||
|
return found_resource
|
||||||
|
|
||||||
def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int:
|
def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int:
|
||||||
"""
|
"""
|
||||||
递归遍历资源树,根据 name 设置所有节点的 extra
|
递归遍历资源树,根据 name 设置所有节点的 extra
|
||||||
@@ -1103,7 +1143,7 @@ class DeviceNodeResourceTracker(object):
|
|||||||
for key in keys_to_remove:
|
for key in keys_to_remove:
|
||||||
self.resource2parent_resource.pop(key, None)
|
self.resource2parent_resource.pop(key, None)
|
||||||
|
|
||||||
logger.debug(f"成功移除资源: {resource}")
|
logger.trace(f"[ResourceTracker] 成功移除资源: {resource}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def clear_resource(self):
|
def clear_resource(self):
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
# from nt import device_encoding
|
# from nt import device_encoding
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@@ -55,7 +56,11 @@ def main(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""主函数"""
|
"""主函数"""
|
||||||
|
|
||||||
rclpy.init(args=rclpy_init_args)
|
# Support restart - check if rclpy is already initialized
|
||||||
|
if not rclpy.ok():
|
||||||
|
rclpy.init(args=rclpy_init_args)
|
||||||
|
else:
|
||||||
|
logger.info("[ROS] rclpy already initialized, reusing context")
|
||||||
executor = rclpy.__executor = MultiThreadedExecutor()
|
executor = rclpy.__executor = MultiThreadedExecutor()
|
||||||
# 创建主机节点
|
# 创建主机节点
|
||||||
host_node = HostNode(
|
host_node = HostNode(
|
||||||
@@ -88,7 +93,7 @@ def main(
|
|||||||
joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker)
|
joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker)
|
||||||
# lh_joint_pub = LiquidHandlerJointPublisher(
|
# lh_joint_pub = LiquidHandlerJointPublisher(
|
||||||
# resources_config=resources_list, resource_tracker=host_node.resource_tracker
|
# resources_config=resources_list, resource_tracker=host_node.resource_tracker
|
||||||
# )
|
# )
|
||||||
executor.add_node(resource_mesh_manager)
|
executor.add_node(resource_mesh_manager)
|
||||||
executor.add_node(joint_republisher)
|
executor.add_node(joint_republisher)
|
||||||
# executor.add_node(lh_joint_pub)
|
# executor.add_node(lh_joint_pub)
|
||||||
|
|||||||
@@ -159,10 +159,14 @@ _msg_converter: Dict[Type, Any] = {
|
|||||||
else Pose()
|
else Pose()
|
||||||
),
|
),
|
||||||
config=json.dumps(x.get("config", {})),
|
config=json.dumps(x.get("config", {})),
|
||||||
data=json.dumps(x.get("data", {})),
|
data=json.dumps(obtain_data_with_uuid(x)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def obtain_data_with_uuid(x: dict):
|
||||||
|
data = x.get("data", {})
|
||||||
|
data["unilabos_uuid"] = x.get("uuid", None)
|
||||||
|
return data
|
||||||
|
|
||||||
def json_or_yaml_loads(data: str) -> Any:
|
def json_or_yaml_loads(data: str) -> Any:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ from rclpy.callback_groups import ReentrantCallbackGroup
|
|||||||
from rclpy.service import Service
|
from rclpy.service import Service
|
||||||
from unilabos_msgs.action import SendCmd
|
from unilabos_msgs.action import SendCmd
|
||||||
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
|
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
|
||||||
|
|
||||||
|
from unilabos.config.config import BasicConfig
|
||||||
from unilabos.utils.decorator import get_topic_config, get_all_subscriptions
|
from unilabos.utils.decorator import get_topic_config, get_all_subscriptions
|
||||||
|
|
||||||
from unilabos.resources.container import RegularContainer
|
from unilabos.resources.container import RegularContainer
|
||||||
@@ -390,9 +392,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
parent_resource = self.resource_tracker.figure_resource(
|
parent_resource = self.resource_tracker.figure_resource(
|
||||||
{"name": bind_parent_id}
|
{"name": bind_parent_id}
|
||||||
)
|
)
|
||||||
for r in rts.root_nodes:
|
for r in rts.root_nodes:
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
r.res_content.parent_uuid = parent_resource.unilabos_uuid
|
r.res_content.parent_uuid = parent_resource.unilabos_uuid
|
||||||
|
else:
|
||||||
|
for r in rts.root_nodes:
|
||||||
|
r.res_content.parent_uuid = self.uuid
|
||||||
|
|
||||||
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
|
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
@@ -428,11 +433,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
})
|
})
|
||||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||||
uuid_maps = json.loads(tree_response.response)
|
uuid_maps = json.loads(tree_response.response)
|
||||||
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
|
plr_instances = rts.to_plr_resources()
|
||||||
|
for plr_instance in plr_instances:
|
||||||
|
self.resource_tracker.loop_update_uuid(plr_instance, uuid_maps)
|
||||||
|
rts: ResourceTreeSet = ResourceTreeSet.from_plr_resources(plr_instances)
|
||||||
self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
|
self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
|
||||||
final_response = {
|
final_response = {
|
||||||
"created_resources": rts.dump(),
|
"created_resource_tree": rts.dump(),
|
||||||
"liquid_input_resources": [],
|
"liquid_input_resource_tree": [],
|
||||||
}
|
}
|
||||||
res.response = json.dumps(final_response)
|
res.response = json.dumps(final_response)
|
||||||
# 如果driver自己就有assign的方法,那就使用driver自己的assign方法
|
# 如果driver自己就有assign的方法,那就使用driver自己的assign方法
|
||||||
@@ -458,7 +466,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
return res
|
return res
|
||||||
try:
|
try:
|
||||||
if len(rts.root_nodes) == 1 and parent_resource is not None:
|
if len(rts.root_nodes) == 1 and parent_resource is not None:
|
||||||
plr_instance = rts.to_plr_resources()[0]
|
plr_instance = plr_instances[0]
|
||||||
if isinstance(plr_instance, Plate):
|
if isinstance(plr_instance, Plate):
|
||||||
empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items
|
empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items
|
||||||
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
|
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
|
||||||
@@ -483,7 +491,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
input_wells = []
|
input_wells = []
|
||||||
for r in LIQUID_INPUT_SLOT:
|
for r in LIQUID_INPUT_SLOT:
|
||||||
input_wells.append(plr_instance.children[r])
|
input_wells.append(plr_instance.children[r])
|
||||||
final_response["liquid_input_resources"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
|
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
|
||||||
res.response = json.dumps(final_response)
|
res.response = json.dumps(final_response)
|
||||||
if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
|
if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
|
||||||
other_calling_param["slot"] = int(other_calling_param["slot"])
|
other_calling_param["slot"] = int(other_calling_param["slot"])
|
||||||
@@ -651,61 +659,71 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
|
|
||||||
def transfer_to_new_resource(
|
def transfer_to_new_resource(
|
||||||
self, plr_resource: "ResourcePLR", tree: ResourceTreeInstance, additional_add_params: Dict[str, Any]
|
self, plr_resource: "ResourcePLR", tree: ResourceTreeInstance, additional_add_params: Dict[str, Any]
|
||||||
):
|
) -> Optional["ResourcePLR"]:
|
||||||
parent_uuid = tree.root_node.res_content.parent_uuid
|
parent_uuid = tree.root_node.res_content.parent_uuid
|
||||||
if parent_uuid:
|
if not parent_uuid:
|
||||||
parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid)
|
self.lab_logger().warning(
|
||||||
if parent_resource is None:
|
f"物料{plr_resource} parent未知,挂载到当前节点下,额外参数:{additional_add_params}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
if parent_uuid == self.uuid:
|
||||||
|
self.lab_logger().warning(
|
||||||
|
f"物料{plr_resource}请求挂载到{self.identifier},额外参数:{additional_add_params}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid)
|
||||||
|
if parent_resource is None:
|
||||||
|
self.lab_logger().warning(
|
||||||
|
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
# 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步
|
||||||
|
additional_params = {}
|
||||||
|
extra = getattr(plr_resource, "unilabos_extra", {})
|
||||||
|
if len(extra):
|
||||||
|
self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra))
|
||||||
|
if "update_resource_site" in extra:
|
||||||
|
additional_add_params["site"] = extra["update_resource_site"]
|
||||||
|
site = additional_add_params.get("site", None)
|
||||||
|
spec = inspect.signature(parent_resource.assign_child_resource)
|
||||||
|
if "spot" in spec.parameters:
|
||||||
|
ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering")
|
||||||
|
if ordering_dict:
|
||||||
|
site = list(ordering_dict.keys()).index(site)
|
||||||
|
additional_params["spot"] = site
|
||||||
|
old_parent = plr_resource.parent
|
||||||
|
if old_parent is not None:
|
||||||
|
# plr并不支持同一个deck的加载和卸载
|
||||||
|
self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载")
|
||||||
|
old_parent.unassign_child_resource(plr_resource)
|
||||||
self.lab_logger().warning(
|
self.lab_logger().warning(
|
||||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在"
|
f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}"
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
try:
|
|
||||||
# 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步
|
|
||||||
additional_params = {}
|
|
||||||
extra = getattr(plr_resource, "unilabos_extra", {})
|
|
||||||
if len(extra):
|
|
||||||
self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra))
|
|
||||||
if "update_resource_site" in extra:
|
|
||||||
additional_add_params["site"] = extra["update_resource_site"]
|
|
||||||
site = additional_add_params.get("site", None)
|
|
||||||
spec = inspect.signature(parent_resource.assign_child_resource)
|
|
||||||
if "spot" in spec.parameters:
|
|
||||||
ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering")
|
|
||||||
if ordering_dict:
|
|
||||||
site = list(ordering_dict.keys()).index(site)
|
|
||||||
additional_params["spot"] = site
|
|
||||||
old_parent = plr_resource.parent
|
|
||||||
if old_parent is not None:
|
|
||||||
# plr并不支持同一个deck的加载和卸载
|
|
||||||
self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载")
|
|
||||||
old_parent.unassign_child_resource(plr_resource)
|
|
||||||
self.lab_logger().warning(
|
|
||||||
f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ⭐ assign 之前,需要从 resources 列表中移除
|
# ⭐ assign 之前,需要从 resources 列表中移除
|
||||||
# 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源
|
# 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源
|
||||||
# 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children
|
# 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children
|
||||||
resource_id = id(plr_resource)
|
resource_id = id(plr_resource)
|
||||||
for i, r in enumerate(self.resource_tracker.resources):
|
for i, r in enumerate(self.resource_tracker.resources):
|
||||||
if id(r) == resource_id:
|
if id(r) == resource_id:
|
||||||
self.resource_tracker.resources.pop(i)
|
self.resource_tracker.resources.pop(i)
|
||||||
self.lab_logger().debug(
|
self.lab_logger().debug(
|
||||||
f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)"
|
f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)"
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
|
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
|
||||||
|
|
||||||
func = getattr(self.driver_instance, "resource_tree_transfer", None)
|
func = getattr(self.driver_instance, "resource_tree_transfer", None)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
# 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了)
|
# 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了)
|
||||||
func(old_parent, plr_resource, parent_resource)
|
func(old_parent, plr_resource, parent_resource)
|
||||||
except Exception as e:
|
return parent_resource
|
||||||
self.lab_logger().warning(
|
except Exception as e:
|
||||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}"
|
self.lab_logger().warning(
|
||||||
)
|
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}"
|
||||||
|
)
|
||||||
|
|
||||||
async def s2c_resource_tree(self, req: SerialCommand_Request, res: SerialCommand_Response):
|
async def s2c_resource_tree(self, req: SerialCommand_Request, res: SerialCommand_Response):
|
||||||
"""
|
"""
|
||||||
@@ -720,7 +738,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
|
|
||||||
def _handle_add(
|
def _handle_add(
|
||||||
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
||||||
) -> Dict[str, Any]:
|
) -> Tuple[Dict[str, Any], List[ResourcePLR]]:
|
||||||
"""
|
"""
|
||||||
处理资源添加操作的内部函数
|
处理资源添加操作的内部函数
|
||||||
|
|
||||||
@@ -732,15 +750,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
Returns:
|
Returns:
|
||||||
操作结果字典
|
操作结果字典
|
||||||
"""
|
"""
|
||||||
|
parents = [] # 放的是被变更的物料 / 被变更的物料父级
|
||||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||||
self.resource_tracker.add_resource(plr_resource)
|
self.resource_tracker.add_resource(plr_resource)
|
||||||
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
parent = self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
||||||
|
if parent is not None:
|
||||||
|
parents.append(parent)
|
||||||
|
else:
|
||||||
|
parents.append(plr_resource)
|
||||||
|
|
||||||
func = getattr(self.driver_instance, "resource_tree_add", None)
|
func = getattr(self.driver_instance, "resource_tree_add", None)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
func(plr_resources)
|
func(plr_resources)
|
||||||
|
|
||||||
return {"success": True, "action": "add"}
|
return {"success": True, "action": "add"}, parents
|
||||||
|
|
||||||
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
|
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -775,11 +798,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
if plr_resource.parent is not None:
|
if plr_resource.parent is not None:
|
||||||
plr_resource.parent.unassign_child_resource(plr_resource)
|
plr_resource.parent.unassign_child_resource(plr_resource)
|
||||||
self.resource_tracker.remove_resource(plr_resource)
|
self.resource_tracker.remove_resource(plr_resource)
|
||||||
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
|
self.lab_logger().info(f"[资源同步] 移除物料 {plr_resource} 及其子节点")
|
||||||
|
|
||||||
for other_plr_resource in other_plr_resources:
|
for other_plr_resource in other_plr_resources:
|
||||||
self.resource_tracker.remove_resource(other_plr_resource)
|
self.resource_tracker.remove_resource(other_plr_resource)
|
||||||
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
|
self.lab_logger().info(f"[资源同步] 移除物料 {other_plr_resource} 及其子节点")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
@@ -811,11 +834,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
|
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
|
||||||
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
|
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
|
||||||
)
|
)
|
||||||
|
original_parent_resource = original_instance.parent
|
||||||
|
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||||
|
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||||
|
not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None
|
||||||
|
old_name = original_instance.name
|
||||||
|
new_name = plr_resource.name
|
||||||
|
parent_appended = False
|
||||||
|
|
||||||
# Update操作中包含改名:需要先remove再add
|
# Update操作中包含改名:需要先remove再add,这里更新父节点即可
|
||||||
if original_instance.name != plr_resource.name:
|
if not not_same_parent and old_name != new_name:
|
||||||
old_name = original_instance.name
|
|
||||||
new_name = plr_resource.name
|
|
||||||
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
|
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
|
||||||
|
|
||||||
# 收集所有相关的uuid(包括子节点)
|
# 收集所有相关的uuid(包括子节点)
|
||||||
@@ -824,12 +852,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
_handle_add([original_instance], tree_set, additional_add_params)
|
_handle_add([original_instance], tree_set, additional_add_params)
|
||||||
|
|
||||||
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
|
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
|
||||||
|
original_instances.append(original_parent_resource)
|
||||||
|
parent_appended = True
|
||||||
|
|
||||||
# 常规更新:不涉及改名
|
# 常规更新:不涉及改名
|
||||||
original_parent_resource = original_instance.parent
|
|
||||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
|
||||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
|
||||||
|
|
||||||
self.lab_logger().info(
|
self.lab_logger().info(
|
||||||
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
|
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
|
||||||
f"目标父节点{target_parent_resource_uuid} 更新"
|
f"目标父节点{target_parent_resource_uuid} 更新"
|
||||||
@@ -840,13 +866,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
|
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
|
||||||
|
|
||||||
# 如果父节点变化,需要重新挂载
|
# 如果父节点变化,需要重新挂载
|
||||||
if (
|
if not_same_parent:
|
||||||
original_parent_resource_uuid != target_parent_resource_uuid
|
parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||||
and original_parent_resource is not None
|
original_instances.append(parent)
|
||||||
):
|
parent_appended = True
|
||||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
|
||||||
else:
|
else:
|
||||||
# 判断是否变更了resource_site
|
# 判断是否变更了resource_site,重新登记
|
||||||
target_site = original_instance.unilabos_extra.get("update_resource_site")
|
target_site = original_instance.unilabos_extra.get("update_resource_site")
|
||||||
sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None
|
sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None
|
||||||
site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else []
|
site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else []
|
||||||
@@ -854,7 +879,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
site_index = sites.index(original_instance)
|
site_index = sites.index(original_instance)
|
||||||
site_name = site_names[site_index]
|
site_name = site_names[site_index]
|
||||||
if site_name != target_site:
|
if site_name != target_site:
|
||||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||||
|
if parent is not None:
|
||||||
|
original_instances.append(parent)
|
||||||
|
parent_appended = True
|
||||||
|
|
||||||
# 加载状态
|
# 加载状态
|
||||||
original_instance.load_all_state(states)
|
original_instance.load_all_state(states)
|
||||||
@@ -862,7 +890,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
self.lab_logger().info(
|
self.lab_logger().info(
|
||||||
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count} 个"
|
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count} 个"
|
||||||
)
|
)
|
||||||
original_instances.append(original_instance)
|
if not parent_appended:
|
||||||
|
original_instances.append(original_instance)
|
||||||
|
|
||||||
# 调用driver的update回调
|
# 调用driver的update回调
|
||||||
func = getattr(self.driver_instance, "resource_tree_update", None)
|
func = getattr(self.driver_instance, "resource_tree_update", None)
|
||||||
@@ -879,8 +908,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
action = i.get("action") # remove, add, update
|
action = i.get("action") # remove, add, update
|
||||||
resources_uuid: List[str] = i.get("data") # 资源数据
|
resources_uuid: List[str] = i.get("data") # 资源数据
|
||||||
additional_add_params = i.get("additional_add_params", {}) # 额外参数
|
additional_add_params = i.get("additional_add_params", {}) # 额外参数
|
||||||
self.lab_logger().info(
|
self.lab_logger().trace(
|
||||||
f"[Resource Tree Update] Processing {action} operation, " f"resources count: {len(resources_uuid)}"
|
f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}"
|
||||||
)
|
)
|
||||||
tree_set = None
|
tree_set = None
|
||||||
if action in ["add", "update"]:
|
if action in ["add", "update"]:
|
||||||
@@ -892,8 +921,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
if tree_set is None:
|
if tree_set is None:
|
||||||
raise ValueError("tree_set不能为None")
|
raise ValueError("tree_set不能为None")
|
||||||
plr_resources = tree_set.to_plr_resources()
|
plr_resources = tree_set.to_plr_resources()
|
||||||
result = _handle_add(plr_resources, tree_set, additional_add_params)
|
result, parents = _handle_add(plr_resources, tree_set, additional_add_params)
|
||||||
new_tree_set = ResourceTreeSet.from_plr_resources(plr_resources)
|
parents: List[Optional["ResourcePLR"]] = [i for i in parents if i is not None]
|
||||||
|
de_dupe_parents = list(set(parents))
|
||||||
|
new_tree_set = ResourceTreeSet.from_plr_resources(de_dupe_parents) # 去重
|
||||||
|
for tree in new_tree_set.trees:
|
||||||
|
if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node":
|
||||||
|
tree.root_node.res_content.parent_uuid = self.uuid
|
||||||
r = SerialCommand.Request()
|
r = SerialCommand.Request()
|
||||||
r.command = json.dumps(
|
r.command = json.dumps(
|
||||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||||
@@ -911,13 +945,17 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
else:
|
else:
|
||||||
plr_resources.append(ResourceTreeSet([tree]).to_plr_resources()[0])
|
plr_resources.append(ResourceTreeSet([tree]).to_plr_resources()[0])
|
||||||
result, original_instances = _handle_update(plr_resources, tree_set, additional_add_params)
|
result, original_instances = _handle_update(plr_resources, tree_set, additional_add_params)
|
||||||
# new_tree_set = ResourceTreeSet.from_plr_resources(original_instances)
|
if not BasicConfig.no_update_feedback:
|
||||||
# r = SerialCommand.Request()
|
new_tree_set = ResourceTreeSet.from_plr_resources(original_instances) # 去重
|
||||||
# r.command = json.dumps(
|
for tree in new_tree_set.trees:
|
||||||
# {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node":
|
||||||
# response: SerialCommand_Response = await self._resource_clients[
|
tree.root_node.res_content.parent_uuid = self.uuid
|
||||||
# "c2s_update_resource_tree"].call_async(r) # type: ignore
|
r = SerialCommand.Request()
|
||||||
# self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}")
|
r.command = json.dumps(
|
||||||
|
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||||
|
response: SerialCommand_Response = await self._resource_clients[
|
||||||
|
"c2s_update_resource_tree"].call_async(r) # type: ignore
|
||||||
|
self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}")
|
||||||
results.append(result)
|
results.append(result)
|
||||||
elif action == "remove":
|
elif action == "remove":
|
||||||
result = _handle_remove(resources_uuid)
|
result = _handle_remove(resources_uuid)
|
||||||
@@ -931,15 +969,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
# 返回处理结果
|
# 返回处理结果
|
||||||
result_json = {"results": results, "total": len(data)}
|
result_json = {"results": results, "total": len(data)}
|
||||||
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
|
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
|
||||||
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
|
# self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
error_msg = f"Invalid JSON format: {str(e)}"
|
error_msg = f"Invalid JSON format: {str(e)}"
|
||||||
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
|
self.lab_logger().error(f"[资源同步] {error_msg}")
|
||||||
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_msg = f"Unexpected error: {str(e)}"
|
error_msg = f"Unexpected error: {str(e)}"
|
||||||
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
|
self.lab_logger().error(f"[资源同步] {error_msg}")
|
||||||
self.lab_logger().error(traceback.format_exc())
|
self.lab_logger().error(traceback.format_exc())
|
||||||
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
||||||
|
|
||||||
@@ -1260,7 +1298,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
ACTION, action_paramtypes = self.get_real_function(self.driver_instance, action_name)
|
ACTION, action_paramtypes = self.get_real_function(self.driver_instance, action_name)
|
||||||
|
|
||||||
action_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])
|
action_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])
|
||||||
self.lab_logger().debug(f"任务 {ACTION.__name__} 接收到原始目标: {action_kwargs}")
|
self.lab_logger().debug(f"任务 {ACTION.__name__} 接收到原始目标: {str(action_kwargs)[:1000]}")
|
||||||
|
self.lab_logger().trace(f"任务 {ACTION.__name__} 接收到原始目标: {action_kwargs}")
|
||||||
error_skip = False
|
error_skip = False
|
||||||
# 向Host查询物料当前状态,如果是host本身的增加物料的请求,则直接跳过
|
# 向Host查询物料当前状态,如果是host本身的增加物料的请求,则直接跳过
|
||||||
if action_name not in ["create_resource_detailed", "create_resource"]:
|
if action_name not in ["create_resource_detailed", "create_resource"]:
|
||||||
@@ -1276,9 +1315,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
# 批量查询资源
|
# 批量查询资源
|
||||||
queried_resources = []
|
queried_resources = []
|
||||||
for resource_data in resource_inputs:
|
for resource_data in resource_inputs:
|
||||||
plr_resource = await self.get_resource_with_dir(
|
unilabos_uuid = resource_data.get("data", {}).get("unilabos_uuid")
|
||||||
resource_id=resource_data["id"], with_children=True
|
if unilabos_uuid is None:
|
||||||
)
|
plr_resource = await self.get_resource_with_dir(
|
||||||
|
resource_id=resource_data["id"], with_children=True
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
resource_tree = await self.get_resource([unilabos_uuid])
|
||||||
|
plr_resource = resource_tree.to_plr_resources()[0]
|
||||||
if "sample_id" in resource_data:
|
if "sample_id" in resource_data:
|
||||||
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
|
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
|
||||||
queried_resources.append(plr_resource)
|
queried_resources.append(plr_resource)
|
||||||
@@ -1327,9 +1371,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
execution_success = True
|
execution_success = True
|
||||||
except Exception as _:
|
except Exception as _:
|
||||||
execution_error = traceback.format_exc()
|
execution_error = traceback.format_exc()
|
||||||
error(
|
error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
|
||||||
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
|
trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
|
||||||
)
|
|
||||||
|
|
||||||
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
|
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
|
||||||
future.add_done_callback(_handle_future_exception)
|
future.add_done_callback(_handle_future_exception)
|
||||||
@@ -1349,8 +1392,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
except Exception as _:
|
except Exception as _:
|
||||||
execution_error = traceback.format_exc()
|
execution_error = traceback.format_exc()
|
||||||
error(
|
error(
|
||||||
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
|
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
|
||||||
)
|
trace(
|
||||||
|
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
|
||||||
|
|
||||||
future.add_done_callback(_handle_future_exception)
|
future.add_done_callback(_handle_future_exception)
|
||||||
|
|
||||||
@@ -1418,7 +1462,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
for r in rs:
|
for r in rs:
|
||||||
res = self.resource_tracker.parent_resource(r) # 获取 resource 对象
|
res = self.resource_tracker.parent_resource(r) # 获取 resource 对象
|
||||||
else:
|
else:
|
||||||
res = self.resource_tracker.parent_resource(r)
|
res = self.resource_tracker.parent_resource(rs)
|
||||||
if id(res) not in seen:
|
if id(res) not in seen:
|
||||||
seen.add(id(res))
|
seen.add(id(res))
|
||||||
unique_resources.append(res)
|
unique_resources.append(res)
|
||||||
@@ -1494,8 +1538,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
resource_data = function_args[arg_name]
|
resource_data = function_args[arg_name]
|
||||||
if isinstance(resource_data, dict) and "id" in resource_data:
|
if isinstance(resource_data, dict) and "id" in resource_data:
|
||||||
try:
|
try:
|
||||||
converted_resource = self._convert_resource_sync(resource_data)
|
function_args[arg_name] = self._convert_resources_sync(resource_data["uuid"])[0]
|
||||||
function_args[arg_name] = converted_resource
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.lab_logger().error(
|
self.lab_logger().error(
|
||||||
f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
|
f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
|
||||||
@@ -1509,12 +1552,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
resource_list = function_args[arg_name]
|
resource_list = function_args[arg_name]
|
||||||
if isinstance(resource_list, list):
|
if isinstance(resource_list, list):
|
||||||
try:
|
try:
|
||||||
converted_resources = []
|
uuids = [r["uuid"] for r in resource_list if isinstance(r, dict) and "id" in r]
|
||||||
for resource_data in resource_list:
|
function_args[arg_name] = self._convert_resources_sync(*uuids) if uuids else []
|
||||||
if isinstance(resource_data, dict) and "id" in resource_data:
|
|
||||||
converted_resource = self._convert_resource_sync(resource_data)
|
|
||||||
converted_resources.append(converted_resource)
|
|
||||||
function_args[arg_name] = converted_resources
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.lab_logger().error(
|
self.lab_logger().error(
|
||||||
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
|
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
|
||||||
@@ -1527,20 +1566,27 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
|
def _convert_resources_sync(self, *uuids: str) -> List["ResourcePLR"]:
|
||||||
"""同步转换资源数据为实例"""
|
"""同步转换资源 UUID 为实例
|
||||||
# 创建资源查询请求
|
|
||||||
r = SerialCommand.Request()
|
Args:
|
||||||
r.command = json.dumps(
|
*uuids: 一个或多个资源 UUID
|
||||||
{
|
|
||||||
"id": resource_data.get("id", None),
|
Returns:
|
||||||
"uuid": resource_data.get("uuid", None),
|
单个 UUID 时返回单个资源实例,多个 UUID 时返回资源实例列表
|
||||||
"with_children": True,
|
"""
|
||||||
}
|
if not uuids:
|
||||||
)
|
raise ValueError("至少需要提供一个 UUID")
|
||||||
|
|
||||||
# 同步调用资源查询服务
|
uuids_list = list(uuids)
|
||||||
future = self._resource_clients["resource_get"].call_async(r)
|
future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request(
|
||||||
|
command=json.dumps(
|
||||||
|
{
|
||||||
|
"data": {"data": uuids_list, "with_children": True},
|
||||||
|
"action": "get",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
))
|
||||||
|
|
||||||
# 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒)
|
# 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒)
|
||||||
timeout = 30.0
|
timeout = 30.0
|
||||||
@@ -1550,27 +1596,40 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
elapsed += 0.05
|
elapsed += 0.05
|
||||||
|
|
||||||
if not future.done():
|
if not future.done():
|
||||||
raise Exception(f"资源查询超时: {resource_data}")
|
raise Exception(f"资源查询超时: {uuids_list}")
|
||||||
|
|
||||||
response = future.result()
|
response = future.result()
|
||||||
if response is None:
|
if response is None:
|
||||||
raise Exception(f"资源查询返回空结果: {resource_data}")
|
raise Exception(f"资源查询返回空结果: {uuids_list}")
|
||||||
|
|
||||||
raw_data = json.loads(response.response)
|
raw_data = json.loads(response.response)
|
||||||
|
|
||||||
# 转换为 PLR 资源
|
# 转换为 PLR 资源
|
||||||
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
|
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
|
||||||
plr_resource = tree_set.to_plr_resources()[0]
|
if not len(tree_set.trees):
|
||||||
|
raise Exception(f"资源查询返回空树: {raw_data}")
|
||||||
|
plr_resources = tree_set.to_plr_resources()
|
||||||
|
|
||||||
# 通过资源跟踪器获取本地实例
|
# 通过资源跟踪器获取本地实例
|
||||||
res = self.resource_tracker.figure_resource(plr_resource, try_mode=True)
|
figured_resources: List[ResourcePLR] = []
|
||||||
if len(res) == 0:
|
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||||
self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例")
|
res = self.resource_tracker.figure_resource(plr_resource, try_mode=True)
|
||||||
return plr_resource
|
if len(res) == 0:
|
||||||
elif len(res) == 1:
|
self.lab_logger().warning(f"资源转换未能索引到实例: {tree.root_node.res_content},返回新建实例")
|
||||||
return res[0]
|
figured_resources.append(plr_resource)
|
||||||
else:
|
elif len(res) == 1:
|
||||||
raise ValueError(f"资源转换得到多个实例: {res}")
|
figured_resources.append(res[0])
|
||||||
|
else:
|
||||||
|
raise ValueError(f"资源转换得到多个实例: {res}")
|
||||||
|
|
||||||
|
mapped_plr_resources = []
|
||||||
|
for uuid in uuids_list:
|
||||||
|
for plr_resource in figured_resources:
|
||||||
|
r = self.resource_tracker.loop_find_with_uuid(plr_resource, uuid)
|
||||||
|
mapped_plr_resources.append(r)
|
||||||
|
break
|
||||||
|
|
||||||
|
return mapped_plr_resources
|
||||||
|
|
||||||
async def _execute_driver_command_async(self, string: str):
|
async def _execute_driver_command_async(self, string: str):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialComma
|
|||||||
from unique_identifier_msgs.msg import UUID
|
from unique_identifier_msgs.msg import UUID
|
||||||
|
|
||||||
from unilabos.registry.registry import lab_registry
|
from unilabos.registry.registry import lab_registry
|
||||||
|
from unilabos.resources.container import RegularContainer
|
||||||
from unilabos.resources.graphio import initialize_resource
|
from unilabos.resources.graphio import initialize_resource
|
||||||
from unilabos.resources.registry import add_schema
|
from unilabos.resources.registry import add_schema
|
||||||
from unilabos.ros.initialize_device import initialize_device_from_dict
|
from unilabos.ros.initialize_device import initialize_device_from_dict
|
||||||
@@ -70,6 +71,8 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
_instance: ClassVar[Optional["HostNode"]] = None
|
_instance: ClassVar[Optional["HostNode"]] = None
|
||||||
_ready_event: ClassVar[threading.Event] = threading.Event()
|
_ready_event: ClassVar[threading.Event] = threading.Event()
|
||||||
|
_shutting_down: ClassVar[bool] = False # Flag to signal shutdown to background threads
|
||||||
|
_background_threads: ClassVar[List[threading.Thread]] = [] # Track all background threads for cleanup
|
||||||
_device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict(
|
_device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict(
|
||||||
DeviceActionStatus
|
DeviceActionStatus
|
||||||
)
|
)
|
||||||
@@ -81,6 +84,48 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
return cls._instance
|
return cls._instance
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def shutdown_background_threads(cls, timeout: float = 5.0) -> None:
|
||||||
|
"""
|
||||||
|
Gracefully shutdown all background threads for clean exit or restart.
|
||||||
|
|
||||||
|
This method:
|
||||||
|
1. Sets shutdown flag to stop background operations
|
||||||
|
2. Waits for background threads to finish with timeout
|
||||||
|
3. Cleans up finished threads from tracking list
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Maximum time to wait for each thread (seconds)
|
||||||
|
"""
|
||||||
|
cls._shutting_down = True
|
||||||
|
|
||||||
|
# Wait for background threads to finish
|
||||||
|
active_threads = []
|
||||||
|
for t in cls._background_threads:
|
||||||
|
if t.is_alive():
|
||||||
|
t.join(timeout=timeout)
|
||||||
|
if t.is_alive():
|
||||||
|
active_threads.append(t.name)
|
||||||
|
|
||||||
|
if active_threads:
|
||||||
|
logger.warning(f"[Host Node] Some background threads still running: {active_threads}")
|
||||||
|
|
||||||
|
# Clear the thread list
|
||||||
|
cls._background_threads.clear()
|
||||||
|
logger.info(f"[Host Node] Background threads shutdown complete")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def reset_state(cls) -> None:
|
||||||
|
"""
|
||||||
|
Reset the HostNode singleton state for restart or clean exit.
|
||||||
|
Call this after destroying the instance.
|
||||||
|
"""
|
||||||
|
cls._instance = None
|
||||||
|
cls._ready_event.clear()
|
||||||
|
cls._shutting_down = False
|
||||||
|
cls._background_threads.clear()
|
||||||
|
logger.info("[Host Node] State reset complete")
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
device_id: str,
|
device_id: str,
|
||||||
@@ -294,12 +339,36 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
bridge.publish_host_ready()
|
bridge.publish_host_ready()
|
||||||
self.lab_logger().debug(f"Host ready signal sent via {bridge.__class__.__name__}")
|
self.lab_logger().debug(f"Host ready signal sent via {bridge.__class__.__name__}")
|
||||||
|
|
||||||
def _send_re_register(self, sclient):
|
def _send_re_register(self, sclient, device_namespace: str):
|
||||||
sclient.wait_for_service()
|
"""
|
||||||
request = SerialCommand.Request()
|
Send re-register command to a device. This is a one-time operation.
|
||||||
request.command = ""
|
|
||||||
future = sclient.call_async(request)
|
Args:
|
||||||
response = future.result()
|
sclient: The service client
|
||||||
|
device_namespace: The device namespace for logging
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Use timeout to prevent indefinite blocking
|
||||||
|
if not sclient.wait_for_service(timeout_sec=10.0):
|
||||||
|
self.lab_logger().debug(f"[Host Node] Re-register timeout for {device_namespace}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check shutdown flag after wait
|
||||||
|
if self._shutting_down:
|
||||||
|
self.lab_logger().debug(f"[Host Node] Re-register aborted for {device_namespace} (shutdown)")
|
||||||
|
return
|
||||||
|
|
||||||
|
request = SerialCommand.Request()
|
||||||
|
request.command = ""
|
||||||
|
future = sclient.call_async(request)
|
||||||
|
# Use timeout for result as well
|
||||||
|
future.result()
|
||||||
|
except Exception as e:
|
||||||
|
# Gracefully handle destruction during shutdown
|
||||||
|
if "destruction was requested" in str(e) or self._shutting_down:
|
||||||
|
self.lab_logger().debug(f"[Host Node] Re-register aborted for {device_namespace} (cleanup)")
|
||||||
|
else:
|
||||||
|
self.lab_logger().warning(f"[Host Node] Re-register failed for {device_namespace}: {e}")
|
||||||
|
|
||||||
def _discover_devices(self) -> None:
|
def _discover_devices(self) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -331,23 +400,27 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
self._create_action_clients_for_device(device_id, namespace)
|
self._create_action_clients_for_device(device_id, namespace)
|
||||||
self._online_devices.add(device_key)
|
self._online_devices.add(device_key)
|
||||||
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
|
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
|
||||||
threading.Thread(
|
t = threading.Thread(
|
||||||
target=self._send_re_register,
|
target=self._send_re_register,
|
||||||
args=(sclient,),
|
args=(sclient, namespace),
|
||||||
daemon=True,
|
daemon=True,
|
||||||
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
|
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
|
||||||
).start()
|
)
|
||||||
|
self._background_threads.append(t)
|
||||||
|
t.start()
|
||||||
elif device_key not in self._online_devices:
|
elif device_key not in self._online_devices:
|
||||||
# 设备重新上线
|
# 设备重新上线
|
||||||
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
|
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
|
||||||
self._online_devices.add(device_key)
|
self._online_devices.add(device_key)
|
||||||
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
|
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
|
||||||
threading.Thread(
|
t = threading.Thread(
|
||||||
target=self._send_re_register,
|
target=self._send_re_register,
|
||||||
args=(sclient,),
|
args=(sclient, namespace),
|
||||||
daemon=True,
|
daemon=True,
|
||||||
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
|
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
|
||||||
).start()
|
)
|
||||||
|
self._background_threads.append(t)
|
||||||
|
t.start()
|
||||||
|
|
||||||
# 检测离线设备
|
# 检测离线设备
|
||||||
offline_devices = self._online_devices - current_devices
|
offline_devices = self._online_devices - current_devices
|
||||||
@@ -513,11 +586,10 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
new_li = []
|
assert len(response) == 1, "Create Resource应当只返回一个结果"
|
||||||
for i in response:
|
for i in response:
|
||||||
res = json.loads(i)
|
res = json.loads(i)
|
||||||
new_li.append(res)
|
return res
|
||||||
return {"resources": new_li, "liquid_input_resources": new_li}
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
pass
|
pass
|
||||||
_n = "\n"
|
_n = "\n"
|
||||||
@@ -705,13 +777,14 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
raise ValueError(f"ActionClient {action_id} not found.")
|
raise ValueError(f"ActionClient {action_id} not found.")
|
||||||
|
|
||||||
action_client: ActionClient = self._action_clients[action_id]
|
action_client: ActionClient = self._action_clients[action_id]
|
||||||
|
|
||||||
# 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id"
|
# 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id"
|
||||||
def assign_sample_id(obj):
|
def assign_sample_id(obj):
|
||||||
if isinstance(obj, dict):
|
if isinstance(obj, dict):
|
||||||
if "sample_uuid" in obj:
|
if "sample_uuid" in obj:
|
||||||
obj["sample_id"] = obj["sample_uuid"]
|
obj["sample_id"] = obj["sample_uuid"]
|
||||||
obj.pop("sample_uuid")
|
obj.pop("sample_uuid")
|
||||||
for k,v in obj.items():
|
for k, v in obj.items():
|
||||||
if k != "unilabos_extra":
|
if k != "unilabos_extra":
|
||||||
assign_sample_id(v)
|
assign_sample_id(v)
|
||||||
elif isinstance(obj, list):
|
elif isinstance(obj, list):
|
||||||
@@ -721,7 +794,8 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
assign_sample_id(action_kwargs)
|
assign_sample_id(action_kwargs)
|
||||||
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
|
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
|
||||||
|
|
||||||
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
|
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")
|
||||||
|
self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
|
||||||
action_client.wait_for_server()
|
action_client.wait_for_server()
|
||||||
goal_uuid_obj = UUID(uuid=list(u.bytes))
|
goal_uuid_obj = UUID(uuid=list(u.bytes))
|
||||||
|
|
||||||
@@ -742,9 +816,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted")
|
self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted")
|
||||||
self._goals[item.job_id] = goal_handle
|
self._goals[item.job_id] = goal_handle
|
||||||
goal_future = goal_handle.get_result_async()
|
goal_future = goal_handle.get_result_async()
|
||||||
goal_future.add_done_callback(
|
goal_future.add_done_callback(lambda f: self.get_result_callback(item, action_id, f))
|
||||||
lambda f: self.get_result_callback(item, action_id, f)
|
|
||||||
)
|
|
||||||
goal_future.result()
|
goal_future.result()
|
||||||
|
|
||||||
def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None:
|
def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None:
|
||||||
@@ -1061,11 +1133,11 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
接收序列化的 ResourceTreeSet 数据并进行处理
|
接收序列化的 ResourceTreeSet 数据并进行处理
|
||||||
"""
|
"""
|
||||||
self.lab_logger().info(f"[Host Node-Resource] Resource tree add request received")
|
|
||||||
try:
|
try:
|
||||||
# 解析请求数据
|
# 解析请求数据
|
||||||
data = json.loads(request.command)
|
data = json.loads(request.command)
|
||||||
action = data["action"]
|
action = data["action"]
|
||||||
|
self.lab_logger().info(f"[Host Node-Resource] Resource tree {action} request received")
|
||||||
data = data["data"]
|
data = data["data"]
|
||||||
if action == "add":
|
if action == "add":
|
||||||
await self._resource_tree_action_add_callback(data, response)
|
await self._resource_tree_action_add_callback(data, response)
|
||||||
@@ -1167,10 +1239,11 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
from unilabos.app.web import http_client
|
from unilabos.app.web import http_client
|
||||||
|
|
||||||
data = json.loads(request.command)
|
data = json.loads(request.command)
|
||||||
if "uuid" in data and data["uuid"] is not None:
|
if "uuid" in data and data["uuid"] is not None:
|
||||||
http_req = http_client.resource_tree_get([data["uuid"]], data["with_children"])
|
http_req = http_client.resource_tree_get([data["uuid"]], data["with_children"])
|
||||||
elif "id" in data and data["id"].startswith("/"):
|
elif "id" in data:
|
||||||
http_req = http_client.resource_get(data["id"], data["with_children"])
|
http_req = http_client.resource_get(data["id"], data["with_children"])
|
||||||
else:
|
else:
|
||||||
raise ValueError("没有使用正确的物料 id 或 uuid")
|
raise ValueError("没有使用正确的物料 id 或 uuid")
|
||||||
@@ -1380,10 +1453,16 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def test_resource(
|
def test_resource(
|
||||||
self, resource: ResourceSlot, resources: List[ResourceSlot], device: DeviceSlot, devices: List[DeviceSlot]
|
self, resource: ResourceSlot = None, resources: List[ResourceSlot] = None, device: DeviceSlot = None, devices: List[DeviceSlot] = None
|
||||||
) -> TestResourceReturn:
|
) -> TestResourceReturn:
|
||||||
|
if resources is None:
|
||||||
|
resources = []
|
||||||
|
if devices is None:
|
||||||
|
devices = []
|
||||||
|
if resource is None:
|
||||||
|
resource = RegularContainer("test_resource传入None")
|
||||||
return {
|
return {
|
||||||
"resources": ResourceTreeSet.from_plr_resources([resource, *resources]).dump(),
|
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
|
||||||
"devices": [device, *devices],
|
"devices": [device, *devices],
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1435,7 +1514,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
# 构建服务地址
|
# 构建服务地址
|
||||||
srv_address = f"/srv{namespace}/s2c_resource_tree"
|
srv_address = f"/srv{namespace}/s2c_resource_tree"
|
||||||
self.lab_logger().info(f"[Host Node-Resource] Notifying {device_id} for resource tree {action} operation")
|
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------")
|
||||||
|
|
||||||
# 创建服务客户端
|
# 创建服务客户端
|
||||||
sclient = self.create_client(SerialCommand, srv_address)
|
sclient = self.create_client(SerialCommand, srv_address)
|
||||||
@@ -1470,9 +1549,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
time.sleep(0.05)
|
time.sleep(0.05)
|
||||||
|
|
||||||
response = future.result()
|
response = future.result()
|
||||||
self.lab_logger().info(
|
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------")
|
||||||
f"[Host Node-Resource] Resource tree {action} notification completed for {device_id}"
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user