mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 04:51:10 +00:00
移动内部action以兼容host node
This commit is contained in:
@@ -1078,6 +1078,57 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
return execute_callback
|
||||
|
||||
def _execute_driver_command(self, string: str):
|
||||
try:
|
||||
target = json.loads(string)
|
||||
except Exception as ex:
|
||||
try:
|
||||
target = yaml.safe_load(io.StringIO(string))
|
||||
except Exception as ex2:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
function
|
||||
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
|
||||
return function(**function_args)
|
||||
except KeyError as ex:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
async def _execute_driver_command_async(self, string: str):
|
||||
try:
|
||||
target = json.loads(string)
|
||||
except Exception as ex:
|
||||
try:
|
||||
target = yaml.safe_load(io.StringIO(string))
|
||||
except Exception as ex2:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
function
|
||||
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
|
||||
assert asyncio.iscoroutinefunction(
|
||||
function
|
||||
), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}"
|
||||
return await function(**function_args)
|
||||
except KeyError as ex:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
# 异步上下文管理方法
|
||||
async def __aenter__(self):
|
||||
"""进入异步上下文"""
|
||||
@@ -1244,65 +1295,14 @@ class ROS2DeviceNode:
|
||||
self._ros_node: BaseROS2DeviceNode
|
||||
self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}")
|
||||
self.driver_instance._ros_node = self._ros_node # type: ignore
|
||||
self.driver_instance._execute_driver_command = self._execute_driver_command # type: ignore
|
||||
self.driver_instance._execute_driver_command_async = self._execute_driver_command_async # type: ignore
|
||||
self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore
|
||||
self.driver_instance._execute_driver_command_async = self._ros_node._execute_driver_command_async # type: ignore
|
||||
if hasattr(self.driver_instance, "post_init"):
|
||||
try:
|
||||
self.driver_instance.post_init(self._ros_node) # type: ignore
|
||||
except Exception as e:
|
||||
self._ros_node.lab_logger().error(f"设备后初始化失败: {e}")
|
||||
|
||||
def _execute_driver_command(self, string: str):
|
||||
try:
|
||||
target = json.loads(string)
|
||||
except Exception as ex:
|
||||
try:
|
||||
target = yaml.safe_load(io.StringIO(string))
|
||||
except Exception as ex2:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
function
|
||||
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
|
||||
return function(**function_args)
|
||||
except KeyError as ex:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
async def _execute_driver_command_async(self, string: str):
|
||||
try:
|
||||
target = json.loads(string)
|
||||
except Exception as ex:
|
||||
try:
|
||||
target = yaml.safe_load(io.StringIO(string))
|
||||
except Exception as ex2:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
try:
|
||||
function_name = target["function_name"]
|
||||
function_args = target["function_args"]
|
||||
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
|
||||
function = getattr(self.driver_instance, function_name)
|
||||
assert callable(
|
||||
function
|
||||
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
|
||||
assert asyncio.iscoroutinefunction(
|
||||
function
|
||||
), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}"
|
||||
return await function(**function_args)
|
||||
except KeyError as ex:
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
def _start_loop(self):
|
||||
def run_event_loop():
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
Reference in New Issue
Block a user