diff --git a/test/experiments/HPLC.json b/test/experiments/HPLC.json index 9e511b3c..51433f67 100644 --- a/test/experiments/HPLC.json +++ b/test/experiments/HPLC.json @@ -5,7 +5,7 @@ "name": "HPLC", "parent": null, "type": "device", - "class": "hplc", + "class": "hplc.agilent", "position": { "x": 620.6111111111111, "y": 171, diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 6d651ca1..a9dd8d50 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -59,6 +59,18 @@ def parse_args(): default=None, help="配置文件路径,支持.py格式的Python配置文件", ) + parser.add_argument( + "--port", + type=int, + default=8002, + help="信息页web服务的启动端口", + ) + parser.add_argument( + "--open_browser", + type=bool, + default=True, + help="是否在启动时打开信息页", + ) parser.add_argument( "--visual", choices=["rviz", "web","None"], diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 8ed35f95..f604c72d 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -146,7 +146,7 @@ class MQTTClient: if self.mqtt_disable: return status = {"data": device_status.get(device_id, {}), "device_id": device_id} - address = f"labs/{MQConfig.lab_id}/devices" + address = f"labs/{MQConfig.lab_id}/devices/" self.client.publish(address, json.dumps(status), qos=2) logger.critical(f"Device status published: address: {address}, {status}") diff --git a/unilabos/app/web/pages.py b/unilabos/app/web/pages.py index 7216a66c..a08cebb5 100644 --- a/unilabos/app/web/pages.py +++ b/unilabos/app/web/pages.py @@ -92,19 +92,7 @@ def setup_web_pages(router: APIRouter) -> None: # 获取已加载的设备 if lab_registry: - # 设备类型 - for device_id, device_info in lab_registry.device_type_registry.items(): - msg = { - "id": device_id, - "name": device_info.get("name", "未命名"), - "file_path": device_info.get("file_path", ""), - "class_json": json.dumps( - device_info.get("class", {}), indent=4, ensure_ascii=False, cls=TypeEncoder - ), - } - mqtt_client.publish_registry(device_id, device_info) - devices.append(msg) - + devices = json.loads(json.dumps(lab_registry.obtain_registry_device_info(), ensure_ascii=False, cls=TypeEncoder)) # 资源类型 for resource_id, resource_info in lab_registry.resource_type_registry.items(): resources.append( diff --git a/unilabos/app/web/templates/status.html b/unilabos/app/web/templates/status.html index 30c3e6b8..f4ef011a 100644 --- a/unilabos/app/web/templates/status.html +++ b/unilabos/app/web/templates/status.html @@ -96,17 +96,19 @@ 设备ID 命名空间 + 机器名称 状态 {% for device_id, device_info in host_node_info.devices.items() %} {{ device_id }} {{ device_info.namespace }} + {{ device_info.machine_name }} {{ "在线" if device_info.is_online else "离线" }} {% else %} - 没有发现已管理的设备 + 没有发现已管理的设备 {% endfor %} @@ -218,6 +220,7 @@ Device ID 节点名称 命名空间 + 机器名称 状态项 动作数 @@ -227,6 +230,7 @@ {{ device_id }} {{ device_info.node_name }} {{ device_info.namespace }} + {{ device_info.machine_name|default("本地") }} {{ ros_node_info.device_topics.get(device_id, {})|length }} {{ ros_node_info.device_actions.get(device_id, {})|length }} @@ -329,7 +333,7 @@
-
{{ device.class_json }}
+
{{ device.class|tojson(indent=4) }}
{% if device.is_online %}
在线
diff --git a/unilabos/app/web/utils/host_utils.py b/unilabos/app/web/utils/host_utils.py index 0df5e816..6ceb068a 100644 --- a/unilabos/app/web/utils/host_utils.py +++ b/unilabos/app/web/utils/host_utils.py @@ -34,6 +34,7 @@ def get_host_node_info() -> Dict[str, Any]: "namespace": namespace, "is_online": f"{namespace}/{device_id}" in host_node._online_devices, "key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}", + "machine_name": host_node.device_machine_names.get(device_id, "未知"), } for device_id, namespace in host_node.devices_names.items() } @@ -41,9 +42,7 @@ def get_host_node_info() -> Dict[str, Any]: host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics)) # 获取动作客户端信息 for action_id, client in host_node._action_clients.items(): - host_info["action_clients"] = { - action_id: get_action_info(client, full_name=action_id) - } + host_info["action_clients"] = {action_id: get_action_info(client, full_name=action_id)} # 获取设备状态 host_info["device_status"] = host_node.device_status diff --git a/unilabos/app/web/utils/ros_utils.py b/unilabos/app/web/utils/ros_utils.py index 01733510..8e1c767c 100644 --- a/unilabos/app/web/utils/ros_utils.py +++ b/unilabos/app/web/utils/ros_utils.py @@ -12,6 +12,7 @@ from unilabos.app.web.utils.action_utils import get_action_info # 存储 ROS 节点信息的全局变量 ros_node_info = {"online_devices": {}, "device_topics": {}, "device_actions": {}} + def get_ros_node_info() -> Dict[str, Any]: """获取 ROS 节点信息,包括设备节点、发布的状态和动作 @@ -35,6 +36,13 @@ def update_ros_node_info() -> Dict[str, Any]: try: from unilabos.ros.nodes.base_device_node import registered_devices + from unilabos.ros.nodes.presets.host_node import HostNode + + # 尝试获取主机节点实例 + host_node = HostNode.get_instance(0) + device_machine_names = {} + if host_node: + device_machine_names = host_node.device_machine_names for device_id, device_info in registered_devices.items(): # 设备基本信息 @@ -42,6 +50,7 @@ def update_ros_node_info() -> Dict[str, Any]: "node_name": device_info["node_name"], "namespace": device_info["namespace"], "uuid": device_info["uuid"], + "machine_name": device_machine_names.get(device_id, "本地"), } # 设备话题(状态)信息 @@ -55,10 +64,7 @@ def update_ros_node_info() -> Dict[str, Any]: } # 设备动作信息 - result["device_actions"][device_id] = { - k: get_action_info(v, k) - for k, v in device_info["actions"].items() - } + result["device_actions"][device_id] = {k: get_action_info(v, k) for k, v in device_info["actions"].items()} # 更新全局变量 ros_node_info = result except Exception as e: diff --git a/unilabos/compile/clean_protocol.py b/unilabos/compile/clean_protocol.py index b2ab1414..655ce217 100644 --- a/unilabos/compile/clean_protocol.py +++ b/unilabos/compile/clean_protocol.py @@ -6,7 +6,7 @@ def generate_clean_protocol( G: nx.DiGraph, vessel: str, # Vessel to clean. solvent: str, # Solvent to clean vessel with. - volume: float = 25000.0, # Optional. Volume of solvent to clean vessel with. + volume: float = 25.0, # Optional. Volume of solvent to clean vessel with. temp: float = 25, # Optional. Temperature to heat vessel to while cleaning. repeats: int = 1, # Optional. Number of cleaning cycles to perform. ) -> list[dict]: @@ -27,7 +27,7 @@ def generate_clean_protocol( from_vessel = f"flask_{solvent}" waste_vessel = f"waste_workup" - transfer_flowrate = flowrate = 2500.0 + transfer_flowrate = flowrate = 2.5 # 生成泵操作的动作序列 for i in range(repeats): diff --git a/unilabos/compile/evaporate_protocol.py b/unilabos/compile/evaporate_protocol.py index 8c729666..15af5e11 100644 --- a/unilabos/compile/evaporate_protocol.py +++ b/unilabos/compile/evaporate_protocol.py @@ -24,8 +24,8 @@ def generate_evaporate_protocol( # 生成泵操作的动作序列 pump_action_sequence = [] - reactor_volume = 500000.0 - transfer_flowrate = flowrate = 2500.0 + reactor_volume = 500.0 + transfer_flowrate = flowrate = 2.5 # 开启冷凝器 pump_action_sequence.append({ diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index 9b4c2884..60670286 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -7,7 +7,7 @@ def generate_pump_protocol( from_vessel: str, to_vessel: str, volume: float, - flowrate: float = 500.0, + flowrate: float = 0.5, transfer_flowrate: float = 0, ) -> list[dict]: """ @@ -141,11 +141,11 @@ def generate_pump_protocol_with_rinsing( time: float = 0, viscous: bool = False, rinsing_solvent: str = "air", - rinsing_volume: float = 5000.0, + rinsing_volume: float = 5.0, rinsing_repeats: int = 2, solid: bool = False, - flowrate: float = 2500.0, - transfer_flowrate: float = 500.0, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, ) -> list[dict]: """ Generates a pump protocol for transferring a specified volume between vessels, including rinsing steps with a chosen solvent. This function constructs a sequence of pump actions based on the provided parameters and the shortest path in a directed graph. @@ -159,11 +159,11 @@ def generate_pump_protocol_with_rinsing( time (float, optional): Time over which to perform the transfer (default is 0). viscous (bool, optional): Indicates if the fluid is viscous (default is False). rinsing_solvent (str, optional): The solvent to use for rinsing (default is "air"). - rinsing_volume (float, optional): The volume of rinsing solvent to use (default is 5000.0). + rinsing_volume (float, optional): The volume of rinsing solvent to use (default is 5.0). rinsing_repeats (int, optional): The number of times to repeat rinsing (default is 2). solid (bool, optional): Indicates if the transfer involves a solid (default is False). - flowrate (float, optional): The flow rate for the transfer (default is 2500.0). 最终注入容器B时的流速 - transfer_flowrate (float, optional): The flow rate for the transfer action (default is 500.0). 泵骨架中转移流速(若不指定,默认与注入流速相同) + flowrate (float, optional): The flow rate for the transfer (default is 2.5). 最终注入容器B时的流速 + transfer_flowrate (float, optional): The flow rate for the transfer action (default is 0.5). 泵骨架中转移流速(若不指定,默认与注入流速相同) Returns: list[dict]: A sequence of pump actions to be executed for the transfer and rinsing process. 泵操作的动作序列. @@ -172,7 +172,7 @@ def generate_pump_protocol_with_rinsing( AssertionError: If the number of rinsing solvents does not match the number of rinsing repeats. Examples: - pump_protocol = generate_pump_protocol_with_rinsing(G, "vessel_A", "vessel_B", 100.0, rinsing_solvent="water") + pump_protocol = generate_pump_protocol_with_rinsing(G, "vessel_A", "vessel_B", 0.1, rinsing_solvent="water") """ air_vessel = "flask_air" waste_vessel = f"waste_workup" diff --git a/unilabos/compile/separate_protocol.py b/unilabos/compile/separate_protocol.py index 0ba0d1c8..cbb028cb 100644 --- a/unilabos/compile/separate_protocol.py +++ b/unilabos/compile/separate_protocol.py @@ -11,7 +11,7 @@ def generate_separate_protocol( to_vessel: str, # Vessel to send product phase to. waste_phase_to_vessel: str, # Optional. Vessel to send waste phase to. solvent: str, # Optional. Solvent to add to separation vessel after contents of from_vessel has been transferred to create two phases. - solvent_volume: float = 50000, # Optional. Volume of solvent to add. + solvent_volume: float = 50, # Optional. Volume of solvent to add (mL). through: str = "", # Optional. Solid chemical to send product phase through on way to to_vessel, e.g. 'celite'. repeats: int = 1, # Optional. Number of separations to perform. stir_time: float = 30, # Optional. Time stir for after adding solvent, before separation of phases. @@ -32,7 +32,7 @@ def generate_separate_protocol( # 生成泵操作的动作序列 pump_action_sequence = [] - reactor_volume = 500000.0 + reactor_volume = 500.0 waste_vessel = waste_phase_to_vessel # TODO:通过物料管理系统找到溶剂的容器 @@ -46,7 +46,7 @@ def generate_separate_protocol( separator_controller = f"{separation_vessel}_controller" separation_vessel_bottom = f"flask_{separation_vessel}" - transfer_flowrate = flowrate = 2500.0 + transfer_flowrate = flowrate = 2.5 if from_vessel != separation_vessel: pump_action_sequence.append( @@ -140,8 +140,8 @@ def generate_separate_protocol( "action_kwargs": { "from_vessel": separation_vessel_bottom, "to_vessel": to_vessel, - "volume": 250000.0, - "time": 250000.0 / flowrate, + "volume": 250.0, + "time": 250.0 / flowrate, # "transfer_flowrate": transfer_flowrate, } } @@ -164,8 +164,8 @@ def generate_separate_protocol( "action_kwargs": { "from_vessel": separation_vessel_bottom, "to_vessel": waste_vessel, - "volume": 250000.0, - "time": 250000.0 / flowrate, + "volume": 250.0, + "time": 250.0 / flowrate, # "transfer_flowrate": transfer_flowrate, } } @@ -179,8 +179,8 @@ def generate_separate_protocol( "action_kwargs": { "from_vessel": separation_vessel_bottom, "to_vessel": waste_vessel, - "volume": 250000.0, - "time": 250000.0 / flowrate, + "volume": 250.0, + "time": 250.0 / flowrate, # "transfer_flowrate": transfer_flowrate, } } @@ -203,8 +203,8 @@ def generate_separate_protocol( "action_kwargs": { "from_vessel": separation_vessel_bottom, "to_vessel": to_vessel, - "volume": 250000.0, - "time": 250000.0 / flowrate, + "volume": 250.0, + "time": 250.0 / flowrate, # "transfer_flowrate": transfer_flowrate, } } @@ -221,8 +221,8 @@ def generate_separate_protocol( "action_kwargs": { "from_vessel": to_vessel, "to_vessel": separation_vessel, - "volume": 250000.0, - "time": 250000.0 / flowrate, + "volume": 250.0, + "time": 250.0 / flowrate, # "transfer_flowrate": transfer_flowrate, } } diff --git a/unilabos/registry/device_comms/serial.yaml b/unilabos/registry/device_comms/serial.yaml index 3a0d72a5..86cf7e50 100644 --- a/unilabos/registry/device_comms/serial.yaml +++ b/unilabos/registry/device_comms/serial.yaml @@ -1,7 +1,7 @@ serial: description: Serial communication interface, used when sharing same serial port for multiple devices class: - module: unilabos.ros.nodes.presets:ROS2SerialNode + module: unilabos.ros.nodes.presets.serial_node:ROS2SerialNode type: ros2 schema: properties: {} \ No newline at end of file diff --git a/unilabos/registry/devices/characterization_optic.yaml b/unilabos/registry/devices/characterization_optic.yaml index 0164ae4a..52ffc87a 100644 --- a/unilabos/registry/devices/characterization_optic.yaml +++ b/unilabos/registry/devices/characterization_optic.yaml @@ -19,6 +19,49 @@ raman_home_made: status: type: string required: - - status + - status additionalProperties: false - type: object \ No newline at end of file + type: object +hplc.agilent: + description: HPLC device + class: + module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver + type: python + status_types: + device_status: String + could_run: Bool + driver_init_ok: Bool + is_running: Bool + finish_status: String + status_text: String + action_value_mappings: + execute_command_from_outer: + type: SendCmd + goal: + command: command + feedback: {} + result: + success: success + schema: + properties: + device_status: + type: string + could_run: + type: boolean + driver_init_ok: + type: boolean + is_running: + type: boolean + finish_status: + type: string + status_text: + type: string + required: + - device_status + - could_run + - driver_init_ok + - is_running + - finish_status + - status_text + additionalProperties: false + type: object diff --git a/unilabos/registry/devices/organic_miscellaneous.yaml b/unilabos/registry/devices/organic_miscellaneous.yaml index 3085efac..a3f6f0e3 100644 --- a/unilabos/registry/devices/organic_miscellaneous.yaml +++ b/unilabos/registry/devices/organic_miscellaneous.yaml @@ -1,7 +1,7 @@ separator.homemade: description: Separator device with homemade grbl controller class: - module: unilabos.devices.separator.homemade_grbl_conductivity:Separator_Controller + module: unilabos.devices.separator.homemade_grbl_conductivity:SeparatorController type: python status_types: sensordata: Float64 diff --git a/unilabos/registry/devices/pump_and_valve.yaml b/unilabos/registry/devices/pump_and_valve.yaml index ba35700d..5fdd8606 100644 --- a/unilabos/registry/devices/pump_and_valve.yaml +++ b/unilabos/registry/devices/pump_and_valve.yaml @@ -3,6 +3,10 @@ syringe_pump_with_valve.runze: class: module: unilabos.devices.pump_and_valve.runze_backbone:RunzeSyringePump type: python + hardware_interface: + name: hardware_interface + read: send_command + write: send_command schema: type: object properties: diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 9c7a95b9..feba3fef 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -1,3 +1,4 @@ +import json import os import sys from pathlib import Path @@ -6,8 +7,9 @@ from typing import Any import yaml from unilabos.utils import logger -from unilabos.ros.msgs.message_converter import msg_converter_manager +from unilabos.ros.msgs.message_converter import msg_converter_manager, ros_action_to_json_schema from unilabos.utils.decorator import singleton +from unilabos.utils.type_check import TypeEncoder DEFAULT_PATHS = [Path(__file__).absolute().parent] @@ -129,6 +131,7 @@ class Registry: action_config["type"] = self._replace_type_with_class( action_config["type"], device_id, f"动作 {action_name}" ) + action_config["schema"] = ros_action_to_json_schema(action_config["type"]) self.device_type_registry.update(data) @@ -143,6 +146,16 @@ class Registry: f"[UniLab Registry] Device File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}" ) + def obtain_registry_device_info(self): + devices = [] + for device_id, device_info in self.device_type_registry.items(): + msg = { + "id": device_id, + **device_info + } + devices.append(msg) + return devices + # 全局单例实例 lab_registry = Registry() diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index e5c16bc4..71e4aaeb 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -1,4 +1,5 @@ import importlib +import inspect import json from typing import Union import numpy as np @@ -384,7 +385,11 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR": d = resource_ulab_to_plr_inner(resource) """无法通过Resource进行反序列化,例如TipSpot必须内部序列化好,直接用TipSpot序列化会多参数,导致出错""" from pylabrobot.utils.object_parsing import find_subclass - resource_plr = find_subclass(d["type"], ResourcePLR).deserialize(d, allow_marshal=True) + sub_cls = find_subclass(d["type"], ResourcePLR) + spect = inspect.signature(sub_cls) + if "category" not in spect.parameters: + d.pop("category") + resource_plr = sub_cls.deserialize(d, allow_marshal=True) resource_plr.load_all_state(all_states) return resource_plr diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index 1ae37906..974359a3 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -1,14 +1,17 @@ +import copy +import json import os import traceback from typing import Optional, Dict, Any, List import rclpy from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd # type: ignore +from unilabos_msgs.srv import ResourceAdd, SerialCommand # type: ignore from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node from rclpy.timer import Timer +from unilabos.registry.registry import lab_registry from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, @@ -17,6 +20,7 @@ from unilabos.ros.nodes.presets.host_node import HostNode from unilabos.ros.x.rclpyx import run_event_loop_in_thread from unilabos.utils import logger from unilabos.config.config import BasicConfig +from unilabos.utils.type_check import TypeEncoder def exit() -> None: @@ -82,7 +86,7 @@ def slave( """从节点函数""" rclpy.init(args=args) rclpy.__executor = executor = MultiThreadedExecutor() - + devices_config_copy = copy.deepcopy(devices_config) for device_id, device_config in devices_config.items(): d = initialize_device_from_dict(device_id, device_config) if d is None: @@ -98,17 +102,28 @@ def slave( n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[]) executor.add_node(n) - if BasicConfig.slave_no_host: - # 确保ResourceAdd存在 - if "ResourceAdd" in globals(): - rclient = n.create_client(ResourceAdd, "/resources/add") - rclient.wait_for_service() # FIXME 可能一直等待,加一个参数 + if not BasicConfig.slave_no_host: + sclient = n.create_client(SerialCommand, "/node_info_update") + sclient.wait_for_service() + + request = SerialCommand.Request() + request.command = json.dumps({ + "machine_name": machine_name, + "type": "slave", + "devices_config": devices_config_copy, + "registry_config": lab_registry.obtain_registry_device_info() + }, ensure_ascii=False, cls=TypeEncoder) + response = sclient.call_async(request).result() + logger.info(f"Slave node info updated.") + + rclient = n.create_client(ResourceAdd, "/resources/add") + rclient.wait_for_service() # FIXME 可能一直等待,加一个参数 + + request = ResourceAdd.Request() + request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config] + response = rclient.call_async(request).result() + logger.info(f"Slave resource added.") - request = ResourceAdd.Request() - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config] - response = rclient.call_async(request) - else: - print("Warning: ResourceAdd service not available") run_event_loop_in_thread() diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 1424b0c6..0ff03a68 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -342,9 +342,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: return getattr(self.driver_instance, attr_name) except AttributeError as ex: - self.lab_logger().error( - f"publish error, {str(type(self.driver_instance))[8:-2]} has no attribute '{attr_name}'" - ) + if ex.args[0].startswith(f"AttributeError: '{self.driver_instance.__class__.__name__}' object"): + self.lab_logger().error( + f"publish error, {str(type(self.driver_instance))[8:-2]} has no attribute '{attr_name}'" + ) + else: + self.lab_logger().error( + f"publish error, when {str(type(self.driver_instance))[8:-2]} getting attribute '{attr_name}'" + ) + self.lab_logger().error(traceback.format_exc()) self._property_publishers[attr_name] = PropertyPublisher( self, attr_name, get_device_attr, msg_type, initial_period, self._print_publish @@ -430,6 +436,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().info(f"同步执行动作 {ACTION}") future = self._executor.submit(ACTION, **action_kwargs) + def _handle_future_exception(fut): + try: + fut.result() + except Exception as e: + error(f"同步任务 {ACTION.__name__} 报错了") + error(traceback.format_exc()) + + future.add_done_callback(_handle_future_exception) + action_type = action_value_mapping["type"] feedback_msg_types = action_type.Feedback.get_fields_and_field_types() result_msg_types = action_type.Result.get_fields_and_field_types() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index d5280a42..2c114412 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,17 +1,20 @@ import copy +import json import threading import time +import traceback import uuid from typing import Optional, Dict, Any, List, ClassVar, Set from action_msgs.msg import GoalStatus from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore +from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, SerialCommand # type: ignore from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service from unique_identifier_msgs.msg import UUID +from unilabos.registry.registry import lab_registry from unilabos.resources.registry import add_schema from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( @@ -19,7 +22,8 @@ from unilabos.ros.msgs.message_converter import ( get_ros_type_by_msgname, convert_from_ros_msg, convert_to_ros_msg, - msg_converter_manager, ros_action_to_json_schema, + msg_converter_manager, + ros_action_to_json_schema, ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode @@ -94,6 +98,7 @@ class HostNode(BaseROS2DeviceNode): # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 + self.device_machine_names: Dict[str, str] = {} # 存储设备ID到机器名称的映射 self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} @@ -105,18 +110,24 @@ class HostNode(BaseROS2DeviceNode): self._subscribed_topics = set() # 用于跟踪已订阅的话题 # 创建物料增删改查服务(非客户端) - self._init_resource_service() + self._init_host_service() self.device_status = {} # 用来存储设备状态 self.device_status_timestamps = {} # 用来存储设备状态最后更新时间 + from unilabos.app.mq import mqtt_client + for device_config in lab_registry.obtain_registry_device_info(): + mqtt_client.publish_registry(device_config["id"], device_config) + # 首次发现网络中的设备 self._discover_devices() # 初始化所有本机设备节点,多一次过滤,防止重复初始化 for device_id, device_config in devices_config.items(): if device_config.get("type", "device") != "device": - self.lab_logger().debug(f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping.") + self.lab_logger().debug( + f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping." + ) continue if device_id not in self.devices_names: self.initialize_device(device_id, device_config) @@ -132,10 +143,14 @@ class HostNode(BaseROS2DeviceNode): controller_config["update_rate"] = update_rate self.initialize_controller(controller_id, controller_config) - for bridge in self.bridges: - if hasattr(bridge, "resource_add"): - self.lab_logger().info("[Host Node-Resource] Adding resources to bridge.") - bridge.resource_add(add_schema(resources_config)) + try: + for bridge in self.bridges: + if hasattr(bridge, "resource_add"): + self.lab_logger().info("[Host Node-Resource] Adding resources to bridge.") + bridge.resource_add(add_schema(resources_config)) + except Exception as ex: + self.lab_logger().error("[Host Node-Resource] 添加物料出错!") + self.lab_logger().error(traceback.format_exc()) # 创建定时器,定期发现设备 self._discovery_timer = self.create_timer( @@ -221,6 +236,7 @@ class HostNode(BaseROS2DeviceNode): ) self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") from unilabos.app.mq import mqtt_client + info_with_schema = ros_action_to_json_schema(action_type) mqtt_client.publish_actions(action_id, info_with_schema) except Exception as e: @@ -245,6 +261,7 @@ class HostNode(BaseROS2DeviceNode): return # noinspection PyProtectedMember self.devices_names[device_id] = d._ros_node.namespace + self.device_machine_names[device_id] = "本地" self.devices_instances[device_id] = d # noinspection PyProtectedMember for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): @@ -254,6 +271,7 @@ class HostNode(BaseROS2DeviceNode): self._action_clients[action_id] = ActionClient(self, action_type, action_id) self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") from unilabos.app.mq import mqtt_client + info_with_schema = ros_action_to_json_schema(action_type) mqtt_client.publish_actions(action_id, info_with_schema) else: @@ -468,7 +486,7 @@ class HostNode(BaseROS2DeviceNode): """Resource""" - def _init_resource_service(self): + def _init_host_service(self): self._resource_services: Dict[str, Service] = { "resource_add": self.create_service( ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=ReentrantCallbackGroup() @@ -491,8 +509,41 @@ class HostNode(BaseROS2DeviceNode): "resource_list": self.create_service( ResourceList, "/resources/list", self._resource_list_callback, callback_group=ReentrantCallbackGroup() ), + "node_info_update": self.create_service( + SerialCommand, + "/node_info_update", + self._node_info_update_callback, + callback_group=ReentrantCallbackGroup(), + ), } + def _node_info_update_callback(self, request, response): + """ + 更新节点信息回调 + """ + self.lab_logger().info(f"[Host Node] Node info update request received: {request}") + try: + from unilabos.app.mq import mqtt_client + + info = json.loads(request.command) + machine_name = info["machine_name"] + devices_config = info["devices_config"] + registry_config = info["registry_config"] + + # 更新设备机器名称映射 + for device_id in devices_config.keys(): + self.device_machine_names[device_id] = machine_name + self.lab_logger().debug(f"[Host Node] Updated machine name for device {device_id}: {machine_name}") + + for device_config in registry_config: + mqtt_client.publish_registry(device_config["id"], device_config) + self.lab_logger().info(f"[Host Node] Node info update: {info}") + response.response = "OK" + except Exception as e: + self.lab_logger().error(f"[Host Node] Error updating node info: {str(e)}") + response.response = "ERROR" + return response + def _resource_add_callback(self, request, response): """ 添加资源回调 diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 323f6880..c4dfd083 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -20,7 +20,7 @@ from unilabos.ros.msgs.message_converter import ( convert_from_ros_msg, convert_from_ros_msg_with_mapping, ) -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode class ROS2ProtocolNode(BaseROS2DeviceNode): @@ -55,34 +55,39 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): ) # 初始化子设备 - communication_node_id = None + self.communication_node_id_to_instance = {} + for device_id, device_config in self.children.items(): if device_config.get("type", "device") != "device": self.lab_logger().debug(f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping.") continue - d = self.initialize_device(device_id, device_config) + try: + d = self.initialize_device(device_id, device_config) + except Exception as ex: + self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}") + d = None if d is None: continue if "serial_" in device_id or "io_" in device_id: - communication_node_id = device_id + self.communication_node_id_to_instance[device_id] = d continue # 设置硬件接口代理 - if d and hasattr(d, "_hardware_interface"): + if d: if ( - hasattr(d, d._hardware_interface["name"]) - and hasattr(d, d._hardware_interface["write"]) - and (d._hardware_interface["read"] is None or hasattr(d, d._hardware_interface["read"])) + hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["name"]) + and hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["write"]) + and (d.ros_node_instance._hardware_interface["read"] is None or hasattr(d.driver_instance, d.ros_node_instance._hardware_interface["read"])) ): - name = getattr(d, d._hardware_interface["name"]) - read = d._hardware_interface.get("read", None) - write = d._hardware_interface.get("write", None) + name = getattr(d.driver_instance, d.ros_node_instance._hardware_interface["name"]) + read = d.ros_node_instance._hardware_interface.get("read", None) + write = d.ros_node_instance._hardware_interface.get("write", None) # 如果硬件接口是字符串,通过通信设备提供 - if isinstance(name, str) and communication_node_id in self.sub_devices: - self._setup_hardware_proxy(d, self.sub_devices[communication_node_id], read, write) + if isinstance(name, str) and name in self.sub_devices: + self._setup_hardware_proxy(d, self.sub_devices[name], read, write) def _setup_protocol_names(self, protocol_type): # 处理协议类型 @@ -234,11 +239,11 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): """还没有改过的部分""" - def _setup_hardware_proxy(self, device, communication_device, read_method, write_method): + def _setup_hardware_proxy(self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method): """为设备设置硬件接口代理""" - extra_info = [getattr(device, info) for info in communication_device._hardware_interface.get("extra_info", [])] - write_func = getattr(communication_device, communication_device._hardware_interface["write"]) - read_func = getattr(communication_device, communication_device._hardware_interface["read"]) + extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])] + write_func = getattr(communication_device.ros_node_instance, communication_device.ros_node_instance._hardware_interface["write"]) + read_func = getattr(communication_device.ros_node_instance, communication_device.ros_node_instance._hardware_interface["read"]) def _read(): return read_func(*extra_info) @@ -247,9 +252,9 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): return write_func(*extra_info, command) if read_method: - setattr(device, read_method, _read) + setattr(device.driver_instance, read_method, _read) if write_method: - setattr(device, write_method, _write) + setattr(device.driver_instance, write_method, _write) async def _update_resources(self, goal, protocol_kwargs): diff --git a/unilabos/ros/nodes/presets/serial_node.py b/unilabos/ros/nodes/presets/serial_node.py index 6f01a07a..545682bd 100644 --- a/unilabos/ros/nodes/presets/serial_node.py +++ b/unilabos/ros/nodes/presets/serial_node.py @@ -21,6 +21,7 @@ class ROS2SerialNode(BaseROS2DeviceNode): self.hardware_interface = Serial(baudrate=baudrate, port=port) except (OSError, SerialException) as e: # 因为还没调用父类初始化,无法使用日志,直接抛出异常 + # print(f"Failed to connect to serial port {port} at {baudrate} baudrate.") raise RuntimeError(f"Failed to connect to serial port {port} at {baudrate} baudrate.") from e # 初始化BaseROS2DeviceNode,使用自身作为driver_instance @@ -46,7 +47,7 @@ class ROS2SerialNode(BaseROS2DeviceNode): self.lab_logger().info(f"【ROS2SerialNode.__init__】创建串口写入服务: serialwrite") def send_command(self, command: str): - self.lab_logger().info(f"【ROS2SerialNode.send_command】发送命令: {command}") + # self.lab_logger().debug(f"【ROS2SerialNode.send_command】发送命令: {command}") with self._query_lock: if self._closing: self.lab_logger().error(f"【ROS2SerialNode.send_command】设备正在关闭,无法发送命令") @@ -58,23 +59,23 @@ class ROS2SerialNode(BaseROS2DeviceNode): response = self.hardware_interface.write(full_command_data) # time.sleep(0.05) output = self._receive(self.hardware_interface.read_until(b"\n")) - self.lab_logger().info(f"【ROS2SerialNode.send_command】接收响应: {output}") + # self.lab_logger().debug(f"【ROS2SerialNode.send_command】接收响应: {output}") return output def read_data(self): - self.lab_logger().debug(f"【ROS2SerialNode.read_data】读取数据") + # self.lab_logger().debug(f"【ROS2SerialNode.read_data】读取数据") with self._query_lock: if self._closing: self.lab_logger().error(f"【ROS2SerialNode.read_data】设备正在关闭,无法读取数据") raise RuntimeError data = self.hardware_interface.read_until(b"\n") result = self._receive(data) - self.lab_logger().debug(f"【ROS2SerialNode.read_data】读取到数据: {result}") + # self.lab_logger().debug(f"【ROS2SerialNode.read_data】读取到数据: {result}") return result def _receive(self, data: bytes): ascii_string = "".join(chr(byte) for byte in data) - self.lab_logger().debug(f"【ROS2SerialNode._receive】接收数据: {ascii_string}") + # self.lab_logger().debug(f"【ROS2SerialNode._receive】接收数据: {ascii_string}") return ascii_string def handle_serial_request(self, request, response):