diff --git a/unilabos/app/communication.py b/unilabos/app/communication.py index 60b93818..e1ad228f 100644 --- a/unilabos/app/communication.py +++ b/unilabos/app/communication.py @@ -52,7 +52,7 @@ class BaseCommunicationClient(ABC): @abstractmethod def publish_job_status( - self, feedback_data: dict, job_id: str, status: str, return_info: Optional[str] = None + self, feedback_data: dict, job_id: str, status: str, return_info: Optional[dict] = None ) -> None: """ 发布作业状态信息 diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 65d0ab42..adbee0ea 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -171,11 +171,11 @@ class MQTTClient(BaseCommunicationClient): self.client.publish(address, json.dumps(status), qos=2) # logger.info(f"Device {device_id} status published: address: {address}, {status}") - def publish_job_status(self, feedback_data: dict, job_id: str, status: str, return_info: Optional[str] = None): + def publish_job_status(self, feedback_data: dict, job_id: str, status: str, return_info: Optional[dict] = None): if self.mqtt_disable: return if return_info is None: - return_info = "{}" + return_info = {} jobdata = {"job_id": job_id, "data": feedback_data, "status": status, "return_info": return_info} self.client.publish(f"labs/{MQConfig.lab_id}/job/list/", json.dumps(jobdata), qos=2) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index fecb01c7..f4092cc8 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -243,6 +243,7 @@ class TaskScheduler: ) return False + # noinspection PyProtectedMember action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) free = not bool(action_jobs) @@ -539,7 +540,7 @@ class TaskScheduler: # 外部接口方法 def publish_job_status( - self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None + self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[dict] = None ) -> None: """发布作业状态,拦截最终结果(给HostNode调用的接口)""" if not self.message_sender.is_connected(): @@ -983,7 +984,7 @@ class WebSocketClient(BaseCommunicationClient): logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") def publish_job_status( - self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None + self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[dict] = None ) -> None: """发布作业状态(转发给TaskScheduler)""" if self.task_scheduler: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 872bc62e..42af1ca4 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -53,7 +53,7 @@ from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger, trace -from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info +from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info, get_result_info_str T = TypeVar("T") @@ -416,10 +416,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): liquid_volume=LIQUID_VOLUME, slot_on_deck=slot, ) - res.response = serialize_result_info("", True, ret) + res.response = get_result_info_str("", True, ret) except Exception as e: traceback.print_exc() - res.response = serialize_result_info(traceback.format_exc(), False, {}) + res.response = get_result_info_str(traceback.format_exc(), False, {}) return res # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 if bind_parent_id != self.node_name: @@ -837,7 +837,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): if attr_name in ["success", "reached_goal"]: setattr(result_msg, attr_name, True) elif attr_name == "return_info": - setattr(result_msg, attr_name, serialize_result_info(execution_error, execution_success, action_return_value)) + setattr(result_msg, attr_name, get_result_info_str(execution_error, execution_success, action_return_value)) ##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") return result_msg diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index a39dfa30..94b9f62d 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -713,22 +713,24 @@ class HostNode(BaseROS2DeviceNode): return_info_str = result_data.get("return_info") if return_info_str is not None: try: - ret = json.loads(return_info_str) - suc = ret.get("suc", False) + return_info = json.loads(return_info_str) + suc = return_info.get("suc", False) if not suc: status = "failed" except json.JSONDecodeError: status = "failed" + return_info = serialize_result_info("", False, result_data) + self.lab_logger().critical("错误的return_info类型,请断点修复") else: # 无 return_info 字段时,回退到 success 字段(若存在) suc_field = result_data.get("success") if isinstance(suc_field, bool): status = "success" if suc_field else "failed" - return_info_str = serialize_result_info("", suc_field, result_data) + return_info = serialize_result_info("", suc_field, result_data) else: # 最保守的回退:标记失败并返回空JSON status = "failed" - return_info_str = serialize_result_info("缺少return_info", False, result_data) + return_info = serialize_result_info("缺少return_info", False, result_data) self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}") self.lab_logger().debug(f"[Host Node] Result data: {result_data}") @@ -736,7 +738,7 @@ class HostNode(BaseROS2DeviceNode): if job_id: for bridge in self.bridges: if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status(result_data, item, status, return_info_str) + bridge.publish_job_status(result_data, item, status, return_info) def cancel_goal(self, goal_uuid: str) -> None: """取消目标""" diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 23462142..857f8133 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -25,7 +25,7 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode from unilabos.utils.log import error -from unilabos.utils.type_check import serialize_result_info +from unilabos.utils.type_check import serialize_result_info, get_result_info_str class ROS2ProtocolNode(BaseROS2DeviceNode): @@ -314,7 +314,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): setattr( result, attr_name, - serialize_result_info(execution_error, execution_success, protocol_return_value), + get_result_info_str(execution_error, execution_success, protocol_return_value), ) self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果") diff --git a/unilabos/utils/type_check.py b/unilabos/utils/type_check.py index c22de3c7..cffb4465 100644 --- a/unilabos/utils/type_check.py +++ b/unilabos/utils/type_check.py @@ -66,7 +66,7 @@ class ResultInfoEncoder(json.JSONEncoder): return str(obj) -def serialize_result_info(error: str, suc: bool, return_value=None) -> str: +def get_result_info_str(error: str, suc: bool, return_value=None) -> str: """ 序列化任务执行结果信息 @@ -81,3 +81,21 @@ def serialize_result_info(error: str, suc: bool, return_value=None) -> str: result_info = {"error": error, "suc": suc, "return_value": return_value} return json.dumps(result_info, ensure_ascii=False, cls=ResultInfoEncoder) + + + +def serialize_result_info(error: str, suc: bool, return_value=None) -> dict: + """ + 序列化任务执行结果信息 + + Args: + error: 错误信息字符串 + suc: 是否成功的布尔值 + return_value: 返回值,可以是任何类型 + + Returns: + JSON字符串格式的结果信息 + """ + result_info = {"error": error, "suc": suc, "return_value": return_value} + + return json.loads(json.dumps(result_info, ensure_ascii=False, cls=ResultInfoEncoder))