From 19027350fb90c02f905181c2621610950a2c6f03 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Fri, 29 Aug 2025 02:47:20 +0800 Subject: [PATCH] feat: workstation example --- .../comprehensive_slim.json | 1689 ++++++++++++++++- unilabos/devices/workstation/__init__.py | 0 .../coin_cell_assembly/__init__.py | 0 .../devices/workstation/workstation_base.py | 266 ++- unilabos/registry/devices/work_station.yaml | 6 +- unilabos/resources/graphio.py | 12 +- unilabos/ros/nodes/base_device_node.py | 67 +- unilabos/ros/nodes/presets/workstation.py | 676 ++++++- .../ros/nodes/presets/workstation_node.py | 603 ------ unilabos/ros/nodes/resource_tracker.py | 35 +- unilabos/ros/utils/driver_creator.py | 6 +- 11 files changed, 2533 insertions(+), 827 deletions(-) create mode 100644 unilabos/devices/workstation/__init__.py create mode 100644 unilabos/devices/workstation/coin_cell_assembly/__init__.py delete mode 100644 unilabos/ros/nodes/presets/workstation_node.py diff --git a/test/experiments/comprehensive_protocol/comprehensive_slim.json b/test/experiments/comprehensive_protocol/comprehensive_slim.json index d9dd773d..2eed369c 100644 --- a/test/experiments/comprehensive_protocol/comprehensive_slim.json +++ b/test/experiments/comprehensive_protocol/comprehensive_slim.json @@ -4,11 +4,12 @@ "id": "OrganicSynthesisStation", "name": "有机化学流程综合测试工作站", "children": [ - "heater_1" + "heater_1", + "deck" ], "parent": null, "type": "device", - "class": "workstation", + "class": "workstation.example", "position": { "x": 600, "y": 400, @@ -40,7 +41,13 @@ "DryProtocol", "HydrogenateProtocol", "RecrystallizeProtocol" - ] + ], + "station_resource": { + "data": { + "_resource_child_name": "deck", + "_resource_type": "pylabrobot.resources.opentrons.deck:OTDeck" + } + } }, "data": {} }, @@ -64,6 +71,1682 @@ "status": "Idle", "current_temp": 25.0 } + }, + { + "id": "deck", + "name": "deck", + "sample_id": null, + "children": [ + "tip_rack", + "plate_well" + ], + "parent": "OrganicSynthesisStation", + "type": "deck", + "class": "OTDeck", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "type": "OTDeck", + "with_trash": false, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + } + }, + "data": {} + }, + { + "id": "tip_rack", + "name": "tip_rack", + "sample_id": null, + "children": [ + "tip_rack_A1", + "tip_rack_B1", + "tip_rack_C1", + "tip_rack_D1", + "tip_rack_E1", + "tip_rack_F1", + "tip_rack_G1", + "tip_rack_H1", + "tip_rack_A2", + "tip_rack_B2", + "tip_rack_C2", + "tip_rack_D2", + "tip_rack_E2", + "tip_rack_F2", + "tip_rack_G2", + "tip_rack_H2" + ], + "parent": "deck", + "type": "plate", + "class": "opentrons_96_filtertiprack_1000ul", + "position": { + "x": 0, + "y": 0, + "z": 69 + }, + "config": { + "type": "TipRack", + "size_x": 122.4, + "size_y": 82.6, + "size_z": 20.0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_rack", + "model": "HTF", + "ordering": [ + "A1", + "B1", + "C1", + "D1", + "E1", + "F1", + "G1", + "H1", + "A2", + "B2", + "C2", + "D2", + "E2", + "F2", + "G2", + "H2" + ] + }, + "data": {} + }, + { + "id": "tip_rack_A1", + "name": "tip_rack_A1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 68.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_B1", + "name": "tip_rack_B1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 59.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_C1", + "name": "tip_rack_C1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 50.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_D1", + "name": "tip_rack_D1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 41.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_E1", + "name": "tip_rack_E1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 32.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_F1", + "name": "tip_rack_F1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 23.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_G1", + "name": "tip_rack_G1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 14.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_H1", + "name": "tip_rack_H1", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 7.2, + "y": 5.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_A2", + "name": "tip_rack_A2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 68.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_B2", + "name": "tip_rack_B2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 59.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_C2", + "name": "tip_rack_C2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 50.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_D2", + "name": "tip_rack_D2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 41.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_E2", + "name": "tip_rack_E2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 32.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_F2", + "name": "tip_rack_F2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 23.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_G2", + "name": "tip_rack_G2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 14.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "tip_rack_H2", + "name": "tip_rack_H2", + "sample_id": null, + "children": [], + "parent": "tip_rack", + "type": "device", + "class": "", + "position": { + "x": 16.2, + "y": 5.3, + "z": 9.47 + }, + "config": { + "type": "TipSpot", + "size_x": 9.0, + "size_y": 9.0, + "size_z": 0, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "tip_spot", + "model": null, + "prototype_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + }, + "data": { + "tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + }, + "tip_state": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + }, + "pending_tip": { + "type": "Tip", + "total_tip_length": 39.2, + "has_filter": true, + "maximal_volume": 20.0, + "fitting_depth": 3.29 + } + } + }, + { + "id": "plate_well", + "name": "plate_well", + "sample_id": null, + "children": [ + "plate_well_A1", + "plate_well_B1", + "plate_well_C1", + "plate_well_D1", + "plate_well_E1", + "plate_well_F1", + "plate_well_G1", + "plate_well_H1", + "plate_well_A11", + "plate_well_B11", + "plate_well_C11", + "plate_well_D11", + "plate_well_E11", + "plate_well_F11", + "plate_well_G11", + "plate_well_H11" + ], + "parent": "deck", + "type": "plate", + "class": "nest_96_wellplate_2ml_deep", + "position": { + "x": 265.0, + "y": 0, + "z": 69 + }, + "config": { + "type": "Plate", + "size_x": 127.76, + "size_y": 85.48, + "size_z": 14.2, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "plate", + "model": "Cor_96_wellplate_360ul_Fb", + "ordering": [ + "A1", + "B1", + "C1", + "D1", + "E1", + "F1", + "G1", + "H1", + "A11", + "B11", + "C11", + "D11", + "E11", + "F11", + "G11", + "H11" + ] + }, + "data": {} + }, + { + "id": "plate_well_A1", + "name": "plate_well_A1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 70.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_B1", + "name": "plate_well_B1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 61.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_C1", + "name": "plate_well_C1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 52.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_D1", + "name": "plate_well_D1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 43.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_E1", + "name": "plate_well_E1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 34.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_F1", + "name": "plate_well_F1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 25.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_G1", + "name": "plate_well_G1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 16.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_H1", + "name": "plate_well_H1", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 10.87, + "y": 7.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_A11", + "name": "plate_well_A11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 70.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_B11", + "name": "plate_well_B11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 61.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_C11", + "name": "plate_well_C11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 52.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_D11", + "name": "plate_well_D11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 43.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_E11", + "name": "plate_well_E11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 34.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_F11", + "name": "plate_well_F11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 25.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_G11", + "name": "plate_well_G11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 16.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } + }, + { + "id": "plate_well_H11", + "name": "plate_well_H11", + "sample_id": null, + "children": [], + "parent": "plate_well", + "type": "device", + "class": "", + "position": { + "x": 100.87, + "y": 7.77, + "z": 3.03 + }, + "config": { + "type": "Well", + "size_x": 6.86, + "size_y": 6.86, + "size_z": 10.67, + "rotation": { + "x": 0, + "y": 0, + "z": 0, + "type": "Rotation" + }, + "category": "well", + "model": null, + "max_volume": 360, + "material_z_thickness": 0.5, + "compute_volume_from_height": null, + "compute_height_from_volume": null, + "bottom_type": "flat", + "cross_section_type": "circle" + }, + "data": { + "liquids": [], + "pending_liquids": [], + "liquid_history": [] + } } ], "links": [] diff --git a/unilabos/devices/workstation/__init__.py b/unilabos/devices/workstation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/workstation/coin_cell_assembly/__init__.py b/unilabos/devices/workstation/coin_cell_assembly/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/workstation/workstation_base.py b/unilabos/devices/workstation/workstation_base.py index 529acfd3..5337f6da 100644 --- a/unilabos/devices/workstation/workstation_base.py +++ b/unilabos/devices/workstation/workstation_base.py @@ -5,27 +5,26 @@ Workstation Base Class - 简化版 基于PLR Deck的简化工作站架构 专注于核心物料系统和工作流管理 """ + +import collections import time from typing import Dict, Any, List, Optional, Union from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum +from pylabrobot.resources import Deck, Plate, Resource as PLRResource -try: - from pylabrobot.resources import Deck, Resource as PLRResource - PYLABROBOT_AVAILABLE = True -except ImportError: - PYLABROBOT_AVAILABLE = False - class Deck: pass - class PLRResource: pass +from pylabrobot.resources.coordinate import Coordinate +from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode from unilabos.utils.log import logger class WorkflowStatus(Enum): """工作流状态""" + IDLE = "idle" - INITIALIZING = "initializing" + INITIALIZING = "initializing" RUNNING = "running" PAUSED = "paused" STOPPING = "stopping" @@ -37,6 +36,7 @@ class WorkflowStatus(Enum): @dataclass class WorkflowInfo: """工作流信息""" + name: str description: str estimated_duration: float # 预估持续时间(秒) @@ -45,25 +45,82 @@ class WorkflowInfo: parameters_schema: Dict[str, Any] # 参数架构 +class WorkStationContainer(Plate): + """ + WorkStation 专用 Container 类,继承自 Plate和TipRack + 注意这个物料必须通过plr_additional_res_reg.py注册到edge,才能正常序列化 + """ + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str, + ordering: collections.OrderedDict, + model: Optional[str] = None, + ): + """ + 这里的初始化入参要和plr的保持一致 + """ + super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model) + self._unilabos_state = {} # 必须有此行,自己的类描述的是物料的 + + def load_state(self, state: Dict[str, Any]) -> None: + """从给定的状态加载工作台信息。""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + data = super().serialize_state() + data.update( + self._unilabos_state + ) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + + +def get_workstation_plate_resource(name: str) -> PLRResource: # 要给定一个返回plr的方法 + """ + 用于获取一些模板,例如返回一个带有特定信息/子物料的 Plate,这里需要到注册表注册,例如unilabos/registry/resources/organic/workstation.yaml + 可以直接运行该函数或者利用注册表补全机制,来检查是否资源出错 + :param name: 资源名称 + :return: Resource对象 + """ + plate = WorkStationContainer( + name, size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict() + ) + tip_rack = WorkStationContainer( + "tip_rack_inside_plate", + size_x=50, + size_y=50, + size_z=10, + category="tip_rack", + ordering=collections.OrderedDict(), + ) + plate.assign_child_resource(tip_rack, Coordinate.zero()) + return plate + + class ResourceSynchronizer(ABC): """资源同步器基类 - + 负责与外部物料系统的同步,并对 self.deck 做修改 """ - - def __init__(self, workstation: 'WorkstationBase'): + + def __init__(self, workstation: "WorkstationBase"): self.workstation = workstation - + @abstractmethod async def sync_from_external(self) -> bool: """从外部系统同步物料到本地deck""" pass - + @abstractmethod async def sync_to_external(self, plr_resource: PLRResource) -> bool: """将本地物料同步到外部系统""" pass - + @abstractmethod async def handle_external_change(self, change_info: Dict[str, Any]) -> bool: """处理外部系统的变更通知""" @@ -72,81 +129,80 @@ class ResourceSynchronizer(ABC): class WorkstationBase(ABC): """工作站基类 - 简化版 - + 核心功能: 1. 基于 PLR Deck 的物料系统,支持格式转换 2. 可选的资源同步器支持外部物料系统 3. 简化的工作流管理 """ + _ros_node: ROS2WorkstationNode + + @property + def _children(self) -> Dict[str, Any]: # 不要删除这个下划线,不然会自动导入注册表,后面改成装饰器识别 + return self._ros_node.children + + async def update_resource_example(self): + return await self._ros_node.update_resource([get_workstation_plate_resource("test")]) + def __init__( self, - device_id: str, - deck_config: Dict[str, Any], - children: Optional[Dict[str, Any]] = None, - resource_synchronizer: Optional[ResourceSynchronizer] = None, + station_resource: PLRResource, *args, - **kwargs, + **kwargs, # 必须有kwargs ): - if not PYLABROBOT_AVAILABLE: - raise ImportError("PyLabRobot 未安装,无法创建工作站") - # 基本配置 - self.device_id = device_id - self.deck_config = deck_config - self.children = children or {} - + print(station_resource) + self.deck_config = station_resource + # PLR 物料系统 self.deck: Optional[Deck] = None self.plr_resources: Dict[str, PLRResource] = {} - + # 资源同步器(可选) - self.resource_synchronizer = resource_synchronizer - + self.resource_synchronizer = ResourceSynchronizer(self) # 要在driver中自行初始化,只有workstation用 + # 硬件接口 self.hardware_interface: Union[Any, str] = None - - # 协议节点引用(用于代理模式) - self._workstation_node: Optional['ROS2WorkstationNode'] = None - + # 工作流状态 self.current_workflow_status = WorkflowStatus.IDLE self.current_workflow_info = None self.workflow_start_time = None self.workflow_parameters = {} - + # 支持的工作流(静态预定义) self.supported_workflows: Dict[str, WorkflowInfo] = {} - + # 初始化物料系统 self._initialize_material_system() - + # 注册支持的工作流 self._register_supported_workflows() - + logger.info(f"工作站 {device_id} 初始化完成(简化版)") def _initialize_material_system(self): """初始化物料系统 - 使用 graphio 转换""" try: from unilabos.resources.graphio import resource_ulab_to_plr - + # 1. 合并 deck_config 和 children 创建完整的资源树 complete_resource_config = self._create_complete_resource_config() - + # 2. 使用 graphio 转换为 PLR 资源 self.deck = resource_ulab_to_plr(complete_resource_config, plr_model=True) - + # 3. 建立资源映射 self._build_resource_mappings(self.deck) - + # 4. 如果有资源同步器,执行初始同步 if self.resource_synchronizer: # 这里可以异步执行,暂时跳过 pass - + logger.info(f"工作站 {self.device_id} 物料系统初始化成功,创建了 {len(self.plr_resources)} 个资源") - + except Exception as e: logger.error(f"工作站 {self.device_id} 物料系统初始化失败: {e}") raise @@ -163,21 +219,21 @@ class WorkstationBase(ABC): "size_x": self.deck_config.get("size_x", 1000.0), "size_y": self.deck_config.get("size_y", 1000.0), "size_z": self.deck_config.get("size_z", 100.0), - **{k: v for k, v in self.deck_config.items() if k not in ["size_x", "size_y", "size_z"]} + **{k: v for k, v in self.deck_config.items() if k not in ["size_x", "size_y", "size_z"]}, }, "data": {}, "children": [], - "parent": None + "parent": None, } - + # 添加子资源 - if self.children: + if self._children: children_list = [] - for child_id, child_config in self.children.items(): + for child_id, child_config in self._children.items(): child_resource = self._normalize_child_resource(child_id, child_config, deck_resource["id"]) children_list.append(child_resource) deck_resource["children"] = children_list - + return deck_resource def _normalize_child_resource(self, resource_id: str, config: Dict[str, Any], parent_id: str) -> Dict[str, Any]: @@ -190,7 +246,7 @@ class WorkstationBase(ABC): "config": config.get("config", {}), "data": config.get("data", {}), "children": [], # 简化版本:只支持一层子资源 - "parent": parent_id + "parent": parent_id, } def _normalize_position(self, position: Any) -> Dict[str, float]: @@ -199,70 +255,71 @@ class WorkstationBase(ABC): return { "x": float(position.get("x", 0)), "y": float(position.get("y", 0)), - "z": float(position.get("z", 0)) + "z": float(position.get("z", 0)), } elif isinstance(position, (list, tuple)) and len(position) >= 2: return { "x": float(position[0]), "y": float(position[1]), - "z": float(position[2]) if len(position) > 2 else 0.0 + "z": float(position[2]) if len(position) > 2 else 0.0, } else: return {"x": 0.0, "y": 0.0, "z": 0.0} def _build_resource_mappings(self, deck: Deck): """递归构建资源映射""" + def add_resource_recursive(resource: PLRResource): - if hasattr(resource, 'name'): + if hasattr(resource, "name"): self.plr_resources[resource.name] = resource - - if hasattr(resource, 'children'): + + if hasattr(resource, "children"): for child in resource.children: add_resource_recursive(child) - + add_resource_recursive(deck) # ============ 硬件接口管理 ============ - + def set_hardware_interface(self, hardware_interface: Union[Any, str]): """设置硬件接口""" self.hardware_interface = hardware_interface logger.info(f"工作站 {self.device_id} 硬件接口设置: {type(hardware_interface).__name__}") - def set_workstation_node(self, workstation_node: 'ROS2WorkstationNode'): + def set_workstation_node(self, workstation_node: "ROS2WorkstationNode"): """设置协议节点引用(用于代理模式)""" - self._workstation_node = workstation_node + self._ros_node = workstation_node logger.info(f"工作站 {self.device_id} 关联协议节点") # ============ 设备操作接口 ============ - + def call_device_method(self, method: str, *args, **kwargs) -> Any: """调用设备方法的统一接口""" # 1. 代理模式:通过协议节点转发 if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"): - if not self._workstation_node: + if not self._ros_node: raise RuntimeError("代理模式需要设置workstation_node") - + device_id = self.hardware_interface[6:] # 移除 "proxy:" 前缀 - return self._workstation_node.call_device_method(device_id, method, *args, **kwargs) - + return self._ros_node.call_device_method(device_id, method, *args, **kwargs) + # 2. 直接模式:直接调用硬件接口方法 elif self.hardware_interface and hasattr(self.hardware_interface, method): return getattr(self.hardware_interface, method)(*args, **kwargs) - + else: raise AttributeError(f"硬件接口不支持方法: {method}") def get_device_status(self) -> Dict[str, Any]: """获取设备状态""" try: - return self.call_device_method('get_status') + return self.call_device_method("get_status") except AttributeError: # 如果设备不支持get_status方法,返回基础状态 return { "status": "unknown", "interface_type": type(self.hardware_interface).__name__, - "timestamp": time.time() + "timestamp": time.time(), } def is_device_available(self) -> bool: @@ -274,30 +331,29 @@ class WorkstationBase(ABC): return False # ============ 物料系统接口 ============ - + def get_deck(self) -> Deck: """获取主 Deck""" return self.deck - + def get_all_resources(self) -> Dict[str, PLRResource]: """获取所有 PLR 资源""" return self.plr_resources.copy() - + def find_resource_by_name(self, name: str) -> Optional[PLRResource]: """按名称查找资源""" return self.plr_resources.get(name) - + def find_resources_by_type(self, resource_type: type) -> List[PLRResource]: """按类型查找资源""" - return [res for res in self.plr_resources.values() - if isinstance(res, resource_type)] - + return [res for res in self.plr_resources.values() if isinstance(res, resource_type)] + async def sync_with_external_system(self) -> bool: """与外部物料系统同步""" if not self.resource_synchronizer: logger.info(f"工作站 {self.device_id} 没有配置资源同步器") return True - + try: success = await self.resource_synchronizer.sync_from_external() if success: @@ -318,19 +374,19 @@ class WorkstationBase(ABC): self.current_workflow_status = WorkflowStatus.INITIALIZING self.workflow_parameters = parameters self.workflow_start_time = time.time() - + # 委托给子类实现 success = self._execute_workflow_impl(workflow_name, parameters) - + if success: self.current_workflow_status = WorkflowStatus.RUNNING logger.info(f"工作站 {self.device_id} 工作流 {workflow_name} 启动成功") else: self.current_workflow_status = WorkflowStatus.ERROR logger.error(f"工作站 {self.device_id} 工作流 {workflow_name} 启动失败") - + return success - + except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR logger.error(f"工作站 {self.device_id} 执行工作流失败: {e}") @@ -342,28 +398,28 @@ class WorkstationBase(ABC): if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: logger.warning(f"工作站 {self.device_id} 没有正在运行的工作流") return True - + self.current_workflow_status = WorkflowStatus.STOPPING - + # 委托给子类实现 success = self._stop_workflow_impl(emergency) - + if success: self.current_workflow_status = WorkflowStatus.STOPPED logger.info(f"工作站 {self.device_id} 工作流停止成功 (紧急: {emergency})") else: self.current_workflow_status = WorkflowStatus.ERROR logger.error(f"工作站 {self.device_id} 工作流停止失败") - + return success - + except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR logger.error(f"工作站 {self.device_id} 停止工作流失败: {e}") return False # ============ 状态属性 ============ - + @property def workflow_status(self) -> WorkflowStatus: """获取当前工作流状态""" @@ -375,7 +431,7 @@ class WorkstationBase(ABC): return self.current_workflow_status in [ WorkflowStatus.INITIALIZING, WorkflowStatus.RUNNING, - WorkflowStatus.STOPPING + WorkflowStatus.STOPPING, ] @property @@ -386,18 +442,48 @@ class WorkstationBase(ABC): return time.time() - self.workflow_start_time # ============ 抽象方法 - 子类必须实现 ============ - + @abstractmethod def _register_supported_workflows(self): """注册支持的工作流 - 子类必须实现""" pass - + @abstractmethod def _execute_workflow_impl(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: """执行工作流的具体实现 - 子类必须实现""" pass - + @abstractmethod def _stop_workflow_impl(self, emergency: bool = False) -> bool: """停止工作流的具体实现 - 子类必须实现""" pass + +class WorkstationExample(WorkstationBase): + """工作站示例实现""" + + def _register_supported_workflows(self): + """注册支持的工作流""" + self.supported_workflows["example_workflow"] = WorkflowInfo( + name="example_workflow", + description="这是一个示例工作流", + estimated_duration=300.0, + required_materials=["sample_plate"], + output_product="processed_plate", + parameters_schema={"param1": "string", "param2": "integer"}, + ) + + def _execute_workflow_impl(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行工作流的具体实现""" + if workflow_name not in self.supported_workflows: + logger.error(f"工作站 {self.device_id} 不支持工作流: {workflow_name}") + return False + + # 这里添加实际的工作流逻辑 + logger.info(f"工作站 {self.device_id} 正在执行工作流: {workflow_name} with parameters {parameters}") + return True + + def _stop_workflow_impl(self, emergency: bool = False) -> bool: + """停止工作流的具体实现""" + # 这里添加实际的停止逻辑 + logger.info(f"工作站 {self.device_id} 正在停止工作流 (紧急: {emergency})") + return True \ No newline at end of file diff --git a/unilabos/registry/devices/work_station.yaml b/unilabos/registry/devices/work_station.yaml index 0524252f..5de4d420 100644 --- a/unilabos/registry/devices/work_station.yaml +++ b/unilabos/registry/devices/work_station.yaml @@ -6112,7 +6112,7 @@ workstation: title: initialize_device参数 type: object type: UniLabJsonCommand - module: unilabos.ros.nodes.presets.workstation_node:ROS2WorkstationNode + module: unilabos.ros.nodes.presets.workstation:ROS2WorkstationNode status_types: {} type: ros2 config_info: [] @@ -6218,9 +6218,9 @@ workstation.example: title: create_resource参数 type: object type: UniLabJsonCommand - module: unilabos.ros.nodes.presets.workstation:WorkStationExample + module: unilabos.devices.workstation.workstation_base:WorkstationExample status_types: {} - type: ros2 + type: python config_info: [] description: '' handles: [] diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 3a8d2e46..77f00eca 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -7,13 +7,13 @@ import networkx as nx from unilabos_msgs.msg import Resource from unilabos.resources.container import RegularContainer -from unilabos.ros.msgs.message_converter import convert_from_ros_msg_with_mapping, convert_to_ros_msg +from unilabos.ros.msgs.message_converter import convert_to_ros_msg try: from pylabrobot.resources.resource import Resource as ResourcePLR except ImportError: pass -from typing import Union, get_origin, get_args +from typing import Union, get_origin physical_setup_graph: nx.Graph = None @@ -327,7 +327,7 @@ def nested_dict_to_list(nested_dict: dict) -> list[dict]: # FIXME 是tree? return result def convert_resources_to_type( - resources_list: list[dict], resource_type: type, *, plr_model: bool = False + resources_list: list[dict], resource_type: Union[type, list[type]], *, plr_model: bool = False ) -> Union[list[dict], dict, None, "ResourcePLR"]: """ Convert resources to a given type (PyLabRobot or NestedDict) from flattened list of dictionaries. @@ -358,7 +358,7 @@ def convert_resources_to_type( return None -def convert_resources_from_type(resources_list, resource_type: type) -> Union[list[dict], dict, None, "ResourcePLR"]: +def convert_resources_from_type(resources_list, resource_type: Union[type, list[type]], *, is_plr: bool = False) -> Union[list[dict], dict, None, "ResourcePLR"]: """ Convert resources from a given type (PyLabRobot or NestedDict) to flattened list of dictionaries. @@ -374,11 +374,11 @@ def convert_resources_from_type(resources_list, resource_type: type) -> Union[li elif isinstance(resource_type, type) and issubclass(resource_type, ResourcePLR): resources_tree = [resource_plr_to_ulab(resources_list)] return tree_to_list(resources_tree) - elif isinstance(resource_type, list) : + elif isinstance(resource_type, list): if all((get_origin(t) is Union) for t in resource_type): resources_tree = [resource_plr_to_ulab(r) for r in resources_list] return tree_to_list(resources_tree) - elif all(issubclass(t, ResourcePLR) for t in resource_type): + elif is_plr or all(issubclass(t, ResourcePLR) for t in resource_type): resources_tree = [resource_plr_to_ulab(r) for r in resources_list] return tree_to_list(resources_tree) else: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index ebe59236..75d953f5 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -5,7 +5,7 @@ import threading import time import traceback import uuid -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional +from typing import List, get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional from concurrent.futures import ThreadPoolExecutor import asyncio @@ -504,6 +504,17 @@ class BaseROS2DeviceNode(Node, Generic[T]): rclpy.get_global_executor().add_node(self) self.lab_logger().debug(f"ROS节点初始化完成") + async def update_resource(self, resources: List[Any]): + r = ResourceUpdate.Request() + unique_resources = [] + for resource in resources: # resource是list[ResourcePLR] + # 目前更新资源只支持传入plr的对象,后面要更新convert_resources_from_type函数 + converted_list = convert_resources_from_type([resource], resource_type=[object], is_plr=True) + unique_resources.extend([convert_to_ros_msg(Resource, converted) for converted in converted_list]) + r.resources = unique_resources + response = await self._resource_clients["resource_update"].call_async(r) + self.lab_logger().debug(f"资源更新结果: {response}") + def register_device(self): """向注册表中注册设备信息""" topics_info = self._property_publishers.copy() @@ -932,6 +943,7 @@ class ROS2DeviceNode: self._driver_class = driver_class self.device_config = device_config self.driver_is_ros = driver_is_ros + self.driver_is_workstation = False self.resource_tracker = DeviceNodeResourceTracker() # use_pylabrobot_creator 使用 cls的包路径检测 @@ -944,12 +956,6 @@ class ROS2DeviceNode: # TODO: 要在创建之前预先请求服务器是否有当前id的物料,放到resource_tracker中,让pylabrobot进行创建 # 创建设备类实例 - # 判断是否包含设备子节点,决定是否使用ROS2WorkstationNode - has_device_children = any( - child_config.get("type", "device") == "device" - for child_config in children.values() - ) - if use_pylabrobot_creator: # 先对pylabrobot的子资源进行加载,不然subclass无法认出 # 在下方对于加载Deck等Resource要手动import @@ -958,19 +964,12 @@ class ROS2DeviceNode: driver_class, children=children, resource_tracker=self.resource_tracker ) else: - from unilabos.ros.nodes.presets.workstation_node import ROS2WorkstationNode from unilabos.devices.workstation.workstation_base import WorkstationBase - # 检查是否是WorkstationBase的子类且包含设备子节点 - if issubclass(self._driver_class, WorkstationBase) and has_device_children: - # WorkstationBase + 设备子节点 -> 使用WorkstationNode作为ros_instance - self._use_workstation_node_ros = True - self._driver_creator = DeviceClassCreator(driver_class, children=children, resource_tracker=self.resource_tracker) - elif issubclass(self._driver_class, ROS2WorkstationNode): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator - self._use_workstation_node_ros = False + if issubclass(self._driver_class, WorkstationBase): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator + self.driver_is_workstation = True self._driver_creator = WorkstationNodeCreator(driver_class, children=children, resource_tracker=self.resource_tracker) else: - self._use_workstation_node_ros = False self._driver_creator = DeviceClassCreator(driver_class, children=children, resource_tracker=self.resource_tracker) if driver_is_ros: @@ -980,38 +979,22 @@ class ROS2DeviceNode: if self._driver_instance is None: logger.critical(f"设备实例创建失败 {driver_class}, params: {driver_params}") raise DeviceInitError("错误: 设备实例创建失败") - + # 创建ROS2节点 if driver_is_ros: self._ros_node = self._driver_instance # type: ignore - elif hasattr(self, '_use_workstation_node_ros') and self._use_workstation_node_ros: - # WorkstationBase + 设备子节点 -> 创建ROS2WorkstationNode作为ros_instance - from unilabos.ros.nodes.presets.workstation_node import ROS2WorkstationNode - - # 从children提取设备协议类型 - protocol_types = set() - for child_id, child_config in children.items(): - if child_config.get("type", "device") == "device": - # 检查设备配置中的协议类型 - if "protocol_type" in child_config: - if isinstance(child_config["protocol_type"], list): - protocol_types.update(child_config["protocol_type"]) - else: - protocol_types.add(child_config["protocol_type"]) - - # 如果没有明确的协议类型,使用默认值 - if not protocol_types: - protocol_types = ["default_protocol"] - + elif self.driver_is_workstation: + from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode self._ros_node = ROS2WorkstationNode( - device_id=device_id, + protocol_type=driver_params["protocol_type"], children=children, - protocol_type=list(protocol_types), + driver_instance=self._driver_instance, # type: ignore + device_id=device_id, + status_types=status_types, + action_value_mappings=action_value_mappings, + hardware_interface=hardware_interface, + print_publish=print_publish, resource_tracker=self.resource_tracker, - workstation_config={ - 'workstation_instance': self._driver_instance, - 'deck_config': getattr(self._driver_instance, 'deck_config', {}), - } ) else: self._ros_node = BaseROS2DeviceNode( diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 188d65b9..c790940d 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -1,86 +1,618 @@ -import collections -from typing import Union, Dict, Any, Optional +import json +import time +import traceback +from pprint import pformat +from typing import List, Dict, Any, Optional, TYPE_CHECKING -from unilabos_msgs.msg import Resource -from pylabrobot.resources import Resource as PLRResource, Plate, TipRack, Coordinate -from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +import rclpy +from rosidl_runtime_py import message_to_ordereddict + +from unilabos.messages import * # type: ignore # protocol names +from rclpy.action import ActionServer, ActionClient +from rclpy.action.server import ServerGoalHandle +from rclpy.callback_groups import ReentrantCallbackGroup +from unilabos_msgs.msg import Resource # type: ignore +from unilabos_msgs.srv import ResourceGet, ResourceUpdate # type: ignore + +from unilabos.compile import action_protocol_generators +from unilabos.resources.graphio import list_to_nested_dict, nested_dict_to_list +from unilabos.ros.initialize_device import initialize_device_from_dict +from unilabos.ros.msgs.message_converter import ( + get_action_type, + convert_to_ros_msg, + convert_from_ros_msg, + convert_from_ros_msg_with_mapping, +) +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode +from unilabos.utils.type_check import serialize_result_info + +if TYPE_CHECKING: + from unilabos.devices.workstation.workstation_base import WorkstationBase -class WorkStationContainer(Plate, TipRack): +class ROS2WorkstationNode(BaseROS2DeviceNode): """ - WorkStation 专用 Container 类,继承自 Plate和TipRack - 注意这个物料必须通过plr_additional_res_reg.py注册到edge,才能正常序列化 + ROS2WorkstationNode代表管理ROS2环境中设备通信和动作的协议节点。 + 它初始化设备节点,处理动作客户端,并基于指定的协议执行工作流。 + 它还物理上代表一组协同工作的设备,如带夹持器的机械臂,带传送带的CNC机器等。 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str, ordering: collections.OrderedDict, model: Optional[str] = None,): - """ - 这里的初始化入参要和plr的保持一致 - """ - super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model) - self._unilabos_state = {} # 必须有此行,自己的类描述的是物料的 + driver_instance: "WorkstationBase" - def load_state(self, state: Dict[str, Any]) -> None: - """从给定的状态加载工作台信息。""" - super().load_state(state) - self._unilabos_state = state - - def serialize_state(self) -> Dict[str, Dict[str, Any]]: - data = super().serialize_state() - data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) - return data - - -def get_workstation_plate_resource(name: str) -> PLRResource: # 要给定一个返回plr的方法 - """ - 用于获取一些模板,例如返回一个带有特定信息/子物料的 Plate,这里需要到注册表注册,例如unilabos/registry/resources/organic/workstation.yaml - 可以直接运行该函数或者利用注册表补全机制,来检查是否资源出错 - :param name: 资源名称 - :return: Resource对象 - """ - plate = WorkStationContainer(name, size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict()) - tip_rack = WorkStationContainer("tip_rack_inside_plate", size_x=50, size_y=50, size_z=10, category="tip_rack", ordering=collections.OrderedDict()) - plate.assign_child_resource(tip_rack, Coordinate.zero()) - return plate - - -class WorkStationExample(ROS2WorkstationNode): - def __init__(self, - # 你可以在这里增加任意的参数,对应启动json填写相应的参数内容 - device_id: str, - children: dict, - protocol_type: Union[str, list[str]], - resource_tracker: DeviceNodeResourceTracker - ): - super().__init__(device_id, children, protocol_type, resource_tracker) - - def create_resource( + def __init__( self, - resource_tracker: DeviceNodeResourceTracker, - resources: list[Resource], - bind_parent_id: str, - bind_location: dict[str, float], - liquid_input_slot: list[int], - liquid_type: list[str], - liquid_volume: list[int], - slot_on_deck: int, - ) -> Dict[str, Any]: - return { # edge侧返回给前端的创建物料的结果。云端调用初始化瓶子等。执行该函数时,物料已经上报给云端,一般不需要继承使用 + protocol_type: List[str], + children: Dict[str, Any], + *, + driver_instance: "WorkstationBase", + device_id: str, + status_types: Dict[str, Any], + action_value_mappings: Dict[str, Any], + hardware_interface: Dict[str, Any], + print_publish=True, + resource_tracker: Optional["DeviceNodeResourceTracker"] = None, + ): + self._setup_protocol_names(protocol_type) - } + # 初始化非BaseROSNode的属性 + self.children = children + # 初始化基类,让基类处理常规动作 + super().__init__( + driver_instance=driver_instance, + device_id=device_id, + status_types=status_types, + action_value_mappings={ + **action_value_mappings, + **self.protocol_action_mappings + }, + hardware_interface=hardware_interface, + print_publish=print_publish, + resource_tracker=resource_tracker, + ) - def transfer_bottle(self, tip_rack: PLRResource, base_plate: PLRResource): # 使用自定义物料的举例 + self.workstation_config = children + self.communication_interfaces = self.workstation_config.get( + "communication_interfaces", {} + ) # 从工作站配置获取通信接口 + + # 新增:获取工作站实例(如果存在) + self.workstation_instance = driver_instance + + self._busy = False + self.sub_devices = {} + self.communication_devices = {} + self.logical_devices = {} + self._goals = {} + self._protocol_servers = {} + self._action_clients = {} + # 初始化子设备 + self._initialize_child_devices() + + if isinstance(getattr(driver_instance, "hardware_interface", None), str): + self.logical_devices[device_id] = driver_instance + else: + self.communication_devices[device_id] = driver_instance + + # 设置硬件接口代理 + for device_id, device_node in self.logical_devices.items(): + if device_node and hasattr(device_node, "ros_node_instance"): + self._setup_device_hardware_proxy(device_id, device_node) + + # 新增:如果有工作站实例,建立双向引用和硬件接口设置 + if self.workstation_instance: + self._setup_workstation_integration() + + def _setup_workstation_integration(self): + """设置工作站集成 - 统一设备处理模式""" + # 1. 建立协议节点引用 + self.workstation_instance.set_workstation_node(self) + + self.lab_logger().info( + f"ROS2WorkstationNode {self.device_id} 与工作站实例 {type(self.workstation_instance).__name__} 集成完成" + ) + + def _initialize_child_devices(self): + """初始化子设备 - 重构为更清晰的方法""" + # 设备分类字典 - 统一管理 + + for device_id, device_config in self.children.items(): + if device_config.get("type", "device") != "device": + self.lab_logger().debug( + f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." + ) + continue + + try: + d = self.initialize_device(device_id, device_config) + if d is None: + continue + + # 统一的设备分类逻辑 + device_type = device_config.get("device_type", "logical") + + # 兼容旧的ID匹配方式和新的配置方式 + if device_type == "communication" or "serial_" in device_id or "io_" in device_id: + self.communication_devices[device_id] = d # 新的统一方式 + self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功") + elif device_type == "logical": + self.logical_devices[device_id] = d + self.lab_logger().info(f"逻辑设备 {device_id} 初始化并分类成功") + else: + # 默认作为逻辑设备处理 + self.logical_devices[device_id] = d + self.lab_logger().info(f"设备 {device_id} 作为逻辑设备处理") + + except Exception as ex: + self.lab_logger().error( + f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}" + ) + + def _setup_device_hardware_proxy(self, device_id: str, device: ROS2DeviceNode): + """统一的设备硬件接口代理设置方法 + + Args: + device_id: 设备ID + device: 设备实例 """ - 将tip_rack assign给base_plate,两个入参都得是PLRResource,unilabos会代替当前物料操作,自动刷新他们的父子关系等状态 - """ - pass + hardware_interface = device.ros_node_instance._hardware_interface + if not self._validate_hardware_interface(device, hardware_interface): + return - def trigger_resource_update(self, from_plate: PLRResource, to_base_plate: PLRResource): - """ - 有些时候物料发生了子设备的迁移,一般对该设备的最大一级的物料进行操作,例如要将A物料搬移到B物料上,他们不共同拥有一个物料 - 该步骤操作结束后,会主动刷新from_plate的父物料,与to_base_plate的父物料(如没有则刷新自身) + # 获取硬件接口名称 + interface_name = getattr(device.driver_instance, hardware_interface["name"]) - """ - to_base_plate.assign_child_resource(from_plate, Coordinate.zero()) - pass + # 情况1: 如果interface_name是字符串,说明需要转发到其他设备 + if isinstance(interface_name, str): + # 查找目标设备 + communication_device = self.communication_devices.get(device_id, None) + if not communication_device: + self.lab_logger().error(f"转发目标设备 {device_id} 不存在") + return + read_method = hardware_interface.get("read", None) + write_method = hardware_interface.get("write", None) + + # 设置传统硬件代理 + communicate_hardware_info = communication_device.ros_node_instance._hardware_interface + self._setup_hardware_proxy(device, communication_device, read_method, write_method) + self.lab_logger().info( + f"传统通信代理:为子设备{device.device_id} " + f"添加了{read_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['read']}) " + f"添加了{write_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['write']})" + ) + self.lab_logger().info(f"字符串转发代理:设备 {device.device_id} -> {device_id}") + + # 情况2: 如果设备有communication_interface配置,设置协议代理 + elif hasattr(self, "communication_interfaces") and device_id in self.communication_interfaces: + interface_config = self._get_communication_interface_config(device_id) + protocol_type = interface_config.get("protocol_type", "modbus") + self._setup_communication_proxy(device, interface_config, protocol_type) + + # 情况3: 其他情况,使用默认处理 + else: + self.lab_logger().debug(f"设备 {device_id} 使用默认硬件接口处理") + + def _get_communication_interface_config(self, device_id: str) -> dict: + """获取设备的通信接口配置""" + # 优先从工作站配置获取 + if hasattr(self, "communication_interfaces") and device_id in self.communication_interfaces: + return self.communication_interfaces[device_id] + + # 从设备自身配置获取 + device_node = self.logical_devices[device_id] + if device_node and hasattr(device_node.driver_instance, "communication_interface"): + return getattr(device_node.driver_instance, "communication_interface") + + return {} + + def _validate_hardware_interface(self, device: ROS2DeviceNode, hardware_interface: dict) -> bool: + """验证硬件接口配置""" + return ( + hasattr(device.driver_instance, hardware_interface["name"]) + and hasattr(device.driver_instance, hardware_interface["write"]) + and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"])) + ) + + def _setup_communication_proxy(self, logical_device: ROS2DeviceNode, interface_config, protocol_type): + """为逻辑设备设置通信代理 - 统一方法""" + try: + # 获取通信设备 + comm_device_id = interface_config.get("device_id") + comm_device = self.communication_devices.get(comm_device_id) + + if not comm_device: + self.lab_logger().error(f"通信设备 {comm_device_id} 不存在") + return + # FIXME http、modbus(tcpip)都是支持多客户端的 + # 根据协议类型设置不同的代理方法 + if protocol_type == "modbus": + self._setup_modbus_proxy(logical_device, comm_device, interface_config) + elif protocol_type == "opcua": + self._setup_opcua_proxy(logical_device, comm_device, interface_config) + elif protocol_type == "http": + self._setup_http_proxy(logical_device, comm_device, interface_config) + elif protocol_type == "serial": + self._setup_serial_proxy(logical_device, comm_device, interface_config) + else: + self.lab_logger().warning(f"不支持的协议类型: {protocol_type}") + return + + self.lab_logger().info( + f"通信代理:为逻辑设备 {logical_device.device_id} 设置{protocol_type}通信代理 -> {comm_device_id}" + ) + + except Exception as e: + self.lab_logger().error(f"设置通信代理失败: {e}") + + def _setup_protocol_names(self, protocol_type): + # 处理协议类型 + if isinstance(protocol_type, str): + if "," not in protocol_type: + self.protocol_names = [protocol_type] + else: + self.protocol_names = [protocol.strip() for protocol in protocol_type.split(",")] + else: + self.protocol_names = protocol_type + # 准备协议相关的动作值映射 + self.protocol_action_mappings = {} + for protocol_name in self.protocol_names: + protocol_type = globals()[protocol_name] + self.protocol_action_mappings[protocol_name] = get_action_type(protocol_type) + + def initialize_device(self, device_id, device_config): + """初始化设备并创建相应的动作客户端""" + # device_id_abs = f"{self.device_id}/{device_id}" + device_id_abs = f"{device_id}" + self.lab_logger().info(f"初始化子设备: {device_id_abs}") + d = self.sub_devices[device_id] = initialize_device_from_dict(device_id_abs, device_config) + + # 为子设备的每个动作创建动作客户端 + if d is not None and hasattr(d, "ros_node_instance"): + node = d.ros_node_instance + node.resource_tracker = self.resource_tracker # 站内应当共享资源跟踪器 + for action_name, action_mapping in node._action_value_mappings.items(): + if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith( + "UniLabJsonCommand" + ): + continue + action_id = f"/devices/{device_id_abs}/{action_name}" + if action_id not in self._action_clients: + try: + self._action_clients[action_id] = ActionClient( + self, action_mapping["type"], action_id, callback_group=self.callback_group + ) + except Exception as ex: + self.lab_logger().error(f"创建动作客户端失败: {action_id}, 错误: {ex}") + continue + self.lab_logger().trace(f"为子设备 {device_id} 创建动作客户端: {action_name}") + return d + + def create_ros_action_server(self, action_name, action_value_mapping): + """创建ROS动作服务器""" + # 和Base创建的路径是一致的 + protocol_name = action_name + action_type = action_value_mapping["type"] + str_action_type = str(action_type)[8:-2] + protocol_type = globals()[protocol_name] + protocol_steps_generator = action_protocol_generators[protocol_type] + + self._action_servers[action_name] = ActionServer( + self, + action_type, + action_name, + execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator), + callback_group=ReentrantCallbackGroup(), + ) + + self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") + + def _create_protocol_execute_callback(self, protocol_name, protocol_steps_generator): + async def execute_protocol(goal_handle: ServerGoalHandle): + """执行完整的工作流""" + # 初始化结果信息变量 + execution_error = "" + execution_success = False + protocol_return_value = None + self.get_logger().info(f"Executing {protocol_name} action...") + action_value_mapping = self._action_value_mappings[protocol_name] + step_results = [] + try: + print("+" * 30) + print(protocol_steps_generator) + # 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤 + goal = goal_handle.request + protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"]) + + # # 🔧 添加调试信息 + # print(f"🔍 转换后的 protocol_kwargs: {protocol_kwargs}") + # print(f"🔍 vessel 在转换后: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") + + # # 🔧 完全禁用Host查询,直接使用转换后的数据 + # print(f"🔧 跳过Host查询,直接使用转换后的数据") + # 向Host查询物料当前状态 + for k, v in goal.get_fields_and_field_types().items(): + if v in ["unilabos_msgs/Resource", "sequence"]: + r = ResourceGet.Request() + resource_id = ( + protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"] + ) + r.id = resource_id + r.with_children = True + response = await self._resource_clients["resource_get"].call_async(r) + protocol_kwargs[k] = list_to_nested_dict( + [convert_from_ros_msg(rs) for rs in response.resources] + ) + + self.lab_logger().info(f"🔍 最终的 vessel: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") + + from unilabos.resources.graphio import physical_setup_graph + + self.lab_logger().info(f"Working on physical setup: {physical_setup_graph}") + protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs) + logs = [] + for step in protocol_steps: + if isinstance(step, dict) and "log_message" in step.get("action_kwargs", {}): + logs.append(step) + elif isinstance(step, list): + logs.append(step) + self.lab_logger().info( + f"Goal received: {protocol_kwargs}, running steps: " + f"{json.dumps(logs, indent=4, ensure_ascii=False)}" + ) + + time_start = time.time() + time_overall = 100 + self._busy = True + + # 逐步执行工作流 + for i, action in enumerate(protocol_steps): + # self.get_logger().info(f"Running step {i + 1}: {action}") + if isinstance(action, dict): + # 如果是单个动作,直接执行 + if action["action_name"] == "wait": + time.sleep(action["action_kwargs"]["time"]) + step_results.append({"step": i + 1, "action": "wait", "result": "completed"}) + else: + result = await self.execute_single_action(**action) + step_results.append({"step": i + 1, "action": action["action_name"], "result": result}) + ret_info = json.loads(getattr(result, "return_info", "{}")) + if not ret_info.get("suc", False): + raise RuntimeError(f"Step {i + 1} failed.") + elif isinstance(action, list): + # 如果是并行动作,同时执行 + actions = action + futures = [ + rclpy.get_global_executor().create_task(self.execute_single_action(**a)) for a in actions + ] + results = [await f for f in futures] + step_results.append( + { + "step": i + 1, + "parallel_actions": [a["action_name"] for a in actions], + "results": results, + } + ) + + # 向Host更新物料当前状态 + for k, v in goal.get_fields_and_field_types().items(): + if v in ["unilabos_msgs/Resource", "sequence"]: + r = ResourceUpdate.Request() + r.resources = [ + convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) + ] + response = await self._resource_clients["resource_update"].call_async(r) + + # 设置成功状态和返回值 + execution_success = True + protocol_return_value = { + "protocol_name": protocol_name, + "steps_executed": len(protocol_steps), + "step_results": step_results, + "total_time": time.time() - time_start, + } + + goal_handle.succeed() + + except Exception as e: + # 捕获并记录错误信息 + str_step_results = [ + { + k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v + for k, v in i.items() + } + for i in step_results + ] + execution_error = f"{traceback.format_exc()}\n\nStep Result: {pformat(str_step_results)}" + execution_success = False + self.lab_logger().error(f"协议 {protocol_name} 执行出错: {str(e)} \n{traceback.format_exc()}") + + # 设置动作失败 + goal_handle.abort() + + finally: + self._busy = False + + # 创建结果消息 + result = action_value_mapping["type"].Result() + result.success = execution_success + + # 获取结果消息类型信息,检查是否有return_info字段 + result_msg_types = action_value_mapping["type"].Result.get_fields_and_field_types() + + # 设置return_info字段(如果存在) + for attr_name in result_msg_types.keys(): + if attr_name in ["success", "reached_goal"]: + setattr(result, attr_name, execution_success) + elif attr_name == "return_info": + setattr( + result, + attr_name, + serialize_result_info(execution_error, execution_success, protocol_return_value), + ) + + self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果") + return result + + return execute_protocol + + async def execute_single_action(self, device_id, action_name, action_kwargs): + """执行单个动作""" + # 构建动作ID + if device_id in ["", None, "self"]: + action_id = f"/devices/{self.device_id}/{action_name}" + else: + action_id = f"/devices/{device_id}/{action_name}" # 执行时取消了主节点信息 /{self.device_id} + + # 检查动作客户端是否存在 + if action_id not in self._action_clients: + self.lab_logger().error(f"找不到动作客户端: {action_id}") + return None + + # 发送动作请求 + action_client = self._action_clients[action_id] + goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) + + ##### self.lab_logger().info(f"发送动作请求到: {action_id}") + action_client.wait_for_server() + + # 等待动作完成 + request_future = action_client.send_goal_async(goal_msg) + handle = await request_future + + if not handle.accepted: + self.lab_logger().error(f"动作请求被拒绝: {action_name}") + return None + + result_future = await handle.get_result_async() + ##### self.lab_logger().info(f"动作完成: {action_name}") + + return result_future.result + + """还没有改过的部分""" + + def _setup_modbus_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置Modbus通信代理""" + config = interface_config.get("config", {}) + + # 设置Modbus读写方法 + def modbus_read(address, count=1, function_code=3): + """Modbus读取方法""" + return comm_device.driver_instance.read_holding_registers( + address=address, count=count, slave_id=config.get("slave_id", 1) + ) + + def modbus_write(address, value, function_code=6): + """Modbus写入方法""" + if isinstance(value, (list, tuple)): + return comm_device.driver_instance.write_multiple_registers( + address=address, values=value, slave_id=config.get("slave_id", 1) + ) + else: + return comm_device.driver_instance.write_single_register( + address=address, value=value, slave_id=config.get("slave_id", 1) + ) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, "comm_read", modbus_read) + setattr(logical_device.driver_instance, "comm_write", modbus_write) + setattr(logical_device.driver_instance, "comm_config", config) + setattr(logical_device.driver_instance, "comm_protocol", "modbus") + + def _setup_opcua_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置OPC UA通信代理""" + config = interface_config.get("config", {}) + + def opcua_read(node_id): + """OPC UA读取方法""" + return comm_device.driver_instance.read_node_value(node_id) + + def opcua_write(node_id, value): + """OPC UA写入方法""" + return comm_device.driver_instance.write_node_value(node_id, value) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, "comm_read", opcua_read) + setattr(logical_device.driver_instance, "comm_write", opcua_write) + setattr(logical_device.driver_instance, "comm_config", config) + setattr(logical_device.driver_instance, "comm_protocol", "opcua") + + def _setup_http_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置HTTP/RPC通信代理""" + config = interface_config.get("config", {}) + base_url = config.get("base_url", "http://localhost:8080") + + def http_read(endpoint, params=None): + """HTTP GET请求""" + url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" + return comm_device.driver_instance.get_request(url, params=params) + + def http_write(endpoint, data): + """HTTP POST请求""" + url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" + return comm_device.driver_instance.post_request(url, data=data) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, "comm_read", http_read) + setattr(logical_device.driver_instance, "comm_write", http_write) + setattr(logical_device.driver_instance, "comm_config", config) + setattr(logical_device.driver_instance, "comm_protocol", "http") + + def _setup_serial_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置串口通信代理""" + config = interface_config.get("config", {}) + + def serial_read(timeout=1.0): + """串口读取方法""" + return comm_device.driver_instance.read_data(timeout=timeout) + + def serial_write(data): + """串口写入方法""" + if isinstance(data, str): + data = data.encode("utf-8") + return comm_device.driver_instance.write_data(data) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, "comm_read", serial_read) + setattr(logical_device.driver_instance, "comm_write", serial_write) + setattr(logical_device.driver_instance, "comm_config", config) + setattr(logical_device.driver_instance, "comm_protocol", "serial") + + def _setup_hardware_proxy( + self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method + ): + """为设备设置硬件接口代理""" + # extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])] + write_func = getattr( + communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"] + ) + read_func = getattr( + communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"] + ) + + def _read(*args, **kwargs): + return read_func(*args, **kwargs) + + def _write(*args, **kwargs): + return write_func(*args, **kwargs) + + if read_method: + # bound_read = MethodType(_read, device.driver_instance) + setattr(device.driver_instance, read_method, _read) + + if write_method: + # bound_write = MethodType(_write, device.driver_instance) + setattr(device.driver_instance, write_method, _write) + + async def _update_resources(self, goal, protocol_kwargs): + """更新资源状态""" + for k, v in goal.get_fields_and_field_types().items(): + if v in ["unilabos_msgs/Resource", "sequence"]: + if protocol_kwargs[k] is not None: + try: + r = ResourceUpdate.Request() + r.resources = [ + convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) + ] + await self._resource_clients["resource_update"].call_async(r) + except Exception as e: + self.lab_logger().error(f"更新资源失败: {e}") diff --git a/unilabos/ros/nodes/presets/workstation_node.py b/unilabos/ros/nodes/presets/workstation_node.py deleted file mode 100644 index 49c9e223..00000000 --- a/unilabos/ros/nodes/presets/workstation_node.py +++ /dev/null @@ -1,603 +0,0 @@ -import json -import time -import traceback -from pprint import pprint, saferepr, pformat -from typing import Union, Dict, Any - -import rclpy -from rosidl_runtime_py import message_to_ordereddict - -from unilabos.messages import * # type: ignore # protocol names -from rclpy.action import ActionServer, ActionClient -from rclpy.action.server import ServerGoalHandle -from rclpy.callback_groups import ReentrantCallbackGroup -from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceGet, ResourceUpdate # type: ignore - -from unilabos.compile import action_protocol_generators -from unilabos.resources.graphio import list_to_nested_dict, nested_dict_to_list -from unilabos.ros.initialize_device import initialize_device_from_dict -from unilabos.ros.msgs.message_converter import ( - get_action_type, - convert_to_ros_msg, - convert_from_ros_msg, - convert_from_ros_msg_with_mapping, String, -) -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode -from unilabos.utils.log import error -from unilabos.utils.type_check import serialize_result_info - - -class ROS2WorkstationNode(BaseROS2DeviceNode): - """ - ROS2WorkstationNode代表管理ROS2环境中设备通信和动作的协议节点。 - 它初始化设备节点,处理动作客户端,并基于指定的协议执行工作流。 - 它还物理上代表一组协同工作的设备,如带夹持器的机械臂,带传送带的CNC机器等。 - """ - - # create_action_server = False # Action Server要自己创建 - - def __init__( - self, - device_id: str, - children: dict, - protocol_type: Union[str, list[str]], - resource_tracker: DeviceNodeResourceTracker, - workstation_config: dict = {}, - workstation_instance: object = None, - *args, - **kwargs, - ): - self._setup_protocol_names(protocol_type) - - # 初始化其它属性 - self.children = children - self.workstation_config = workstation_config or {} # 新增:保存工作站配置 - self.communication_interfaces = self.workstation_config.get('communication_interfaces', {}) # 从工作站配置获取通信接口 - - # 新增:获取工作站实例(如果存在) - self.workstation_instance = workstation_instance - - self._busy = False - self.sub_devices = {} - self.communication_devices = {} - self.logical_devices = {} - self._goals = {} - self._protocol_servers = {} - self._action_clients = {} - - # 初始化基类,让基类处理常规动作 - # 如果有工作站实例,使用工作站实例作为driver_instance - driver_instance = self.workstation_instance if self.workstation_instance else self - - super().__init__( - driver_instance=driver_instance, - device_id=device_id, - status_types={}, - action_value_mappings=self.protocol_action_mappings, - hardware_interface={"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}, - print_publish=False, - resource_tracker=resource_tracker, - ) - - # 初始化子设备 - self._initialize_child_devices() - - if isinstance(getattr(driver_instance, "hardware_interface", None), str): - self.logical_devices[device_id] = driver_instance - else: - self.communication_devices[device_id] = driver_instance - - # 设置硬件接口代理 - for device_id, device_node in self.logical_devices.items(): - if device_node and hasattr(device_node, 'ros_node_instance'): - self._setup_device_hardware_proxy(device_id, device_node) - - # 新增:如果有工作站实例,建立双向引用和硬件接口设置 - if self.workstation_instance: - self._setup_workstation_integration() - - def _setup_workstation_integration(self): - """设置工作站集成 - 统一设备处理模式""" - # 1. 建立协议节点引用 - self.workstation_instance.set_workstation_node(self) - - self.lab_logger().info(f"ROS2WorkstationNode {self.device_id} 与工作站实例 {type(self.workstation_instance).__name__} 集成完成") - - def _initialize_child_devices(self): - """初始化子设备 - 重构为更清晰的方法""" - # 设备分类字典 - 统一管理 - - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": - self.lab_logger().debug( - f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." - ) - continue - - try: - d = self.initialize_device(device_id, device_config) - if d is None: - continue - - # 统一的设备分类逻辑 - device_type = device_config.get("device_type", "logical") - - # 兼容旧的ID匹配方式和新的配置方式 - if device_type == "communication" or "serial_" in device_id or "io_" in device_id: - self.communication_devices[device_id] = d # 新的统一方式 - self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功") - elif device_type == "logical": - self.logical_devices[device_id] = d - self.lab_logger().info(f"逻辑设备 {device_id} 初始化并分类成功") - else: - # 默认作为逻辑设备处理 - self.logical_devices[device_id] = d - self.lab_logger().info(f"设备 {device_id} 作为逻辑设备处理") - - except Exception as ex: - self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") - - def _setup_device_hardware_proxy(self, device_id: str, device: ROS2DeviceNode): - """统一的设备硬件接口代理设置方法 - - Args: - device_id: 设备ID - device: 设备实例 - """ - hardware_interface = device.ros_node_instance._hardware_interface - if not self._validate_hardware_interface(device, hardware_interface): - return - - # 获取硬件接口名称 - interface_name = getattr(device.driver_instance, hardware_interface["name"]) - - # 情况1: 如果interface_name是字符串,说明需要转发到其他设备 - if isinstance(interface_name, str): - # 查找目标设备 - communication_device = self.communication_devices.get(device_id, None) - if not communication_device: - self.lab_logger().error(f"转发目标设备 {device_id} 不存在") - return - - read_method = hardware_interface.get("read", None) - write_method = hardware_interface.get("write", None) - - # 设置传统硬件代理 - communicate_hardware_info = communication_device.ros_node_instance._hardware_interface - self._setup_hardware_proxy(device, communication_device, read_method, write_method) - self.lab_logger().info( - f"传统通信代理:为子设备{device.device_id} " - f"添加了{read_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['read']}) " - f"添加了{write_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['write']})" - ) - self.lab_logger().info(f"字符串转发代理:设备 {device.device_id} -> {device_id}") - - # 情况2: 如果设备有communication_interface配置,设置协议代理 - elif hasattr(self, 'communication_interfaces') and device_id in self.communication_interfaces: - interface_config = self._get_communication_interface_config(device_id) - protocol_type = interface_config.get('protocol_type', 'modbus') - self._setup_communication_proxy(device, interface_config, protocol_type) - - # 情况3: 其他情况,使用默认处理 - else: - self.lab_logger().debug(f"设备 {device_id} 使用默认硬件接口处理") - - def _get_communication_interface_config(self, device_id: str) -> dict: - """获取设备的通信接口配置""" - # 优先从工作站配置获取 - if hasattr(self, 'communication_interfaces') and device_id in self.communication_interfaces: - return self.communication_interfaces[device_id] - - # 从设备自身配置获取 - device_node = self.logical_devices[device_id] - if device_node and hasattr(device_node.driver_instance, 'communication_interface'): - return getattr(device_node.driver_instance, 'communication_interface') - - return {} - - def _validate_hardware_interface(self, device: ROS2DeviceNode, hardware_interface: dict) -> bool: - """验证硬件接口配置""" - return ( - hasattr(device.driver_instance, hardware_interface["name"]) - and hasattr(device.driver_instance, hardware_interface["write"]) - and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"])) - ) - - def _setup_communication_proxy(self, logical_device: ROS2DeviceNode, interface_config, protocol_type): - """为逻辑设备设置通信代理 - 统一方法""" - try: - # 获取通信设备 - comm_device_id = interface_config.get('device_id') - comm_device = self.communication_devices.get(comm_device_id) - - if not comm_device: - self.lab_logger().error(f"通信设备 {comm_device_id} 不存在") - return - - # 根据协议类型设置不同的代理方法 - if protocol_type == 'modbus': - self._setup_modbus_proxy(logical_device, comm_device, interface_config) - elif protocol_type == 'opcua': - self._setup_opcua_proxy(logical_device, comm_device, interface_config) - elif protocol_type == 'http': - self._setup_http_proxy(logical_device, comm_device, interface_config) - elif protocol_type == 'serial': - self._setup_serial_proxy(logical_device, comm_device, interface_config) - else: - self.lab_logger().warning(f"不支持的协议类型: {protocol_type}") - return - - self.lab_logger().info(f"通信代理:为逻辑设备 {logical_device.device_id} 设置{protocol_type}通信代理 -> {comm_device_id}") - - except Exception as e: - self.lab_logger().error(f"设置通信代理失败: {e}") - - def _setup_protocol_names(self, protocol_type): - # 处理协议类型 - if isinstance(protocol_type, str): - if "," not in protocol_type: - self.protocol_names = [protocol_type] - else: - self.protocol_names = [protocol.strip() for protocol in protocol_type.split(",")] - else: - self.protocol_names = protocol_type - # 准备协议相关的动作值映射 - self.protocol_action_mappings = {} - for protocol_name in self.protocol_names: - protocol_type = globals()[protocol_name] - self.protocol_action_mappings[protocol_name] = get_action_type(protocol_type) - - def initialize_device(self, device_id, device_config): - """初始化设备并创建相应的动作客户端""" - # device_id_abs = f"{self.device_id}/{device_id}" - device_id_abs = f"{device_id}" - self.lab_logger().info(f"初始化子设备: {device_id_abs}") - d = self.sub_devices[device_id] = initialize_device_from_dict(device_id_abs, device_config) - - # 为子设备的每个动作创建动作客户端 - if d is not None and hasattr(d, "ros_node_instance"): - node = d.ros_node_instance - node.resource_tracker = self.resource_tracker # 站内应当共享资源跟踪器 - for action_name, action_mapping in node._action_value_mappings.items(): - if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith("UniLabJsonCommand"): - continue - action_id = f"/devices/{device_id_abs}/{action_name}" - if action_id not in self._action_clients: - try: - self._action_clients[action_id] = ActionClient( - self, action_mapping["type"], action_id, callback_group=self.callback_group - ) - except Exception as ex: - self.lab_logger().error(f"创建动作客户端失败: {action_id}, 错误: {ex}") - continue - self.lab_logger().trace(f"为子设备 {device_id} 创建动作客户端: {action_name}") - return d - - def create_ros_action_server(self, action_name, action_value_mapping): - """创建ROS动作服务器""" - # 和Base创建的路径是一致的 - protocol_name = action_name - action_type = action_value_mapping["type"] - str_action_type = str(action_type)[8:-2] - protocol_type = globals()[protocol_name] - protocol_steps_generator = action_protocol_generators[protocol_type] - - self._action_servers[action_name] = ActionServer( - self, - action_type, - action_name, - execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator), - callback_group=ReentrantCallbackGroup(), - ) - - self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") - - def _create_protocol_execute_callback(self, protocol_name, protocol_steps_generator): - async def execute_protocol(goal_handle: ServerGoalHandle): - """执行完整的工作流""" - # 初始化结果信息变量 - execution_error = "" - execution_success = False - protocol_return_value = None - self.get_logger().info(f"Executing {protocol_name} action...") - action_value_mapping = self._action_value_mappings[protocol_name] - step_results = [] - try: - print("+" * 30) - print(protocol_steps_generator) - # 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤 - goal = goal_handle.request - protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"]) - - # # 🔧 添加调试信息 - # print(f"🔍 转换后的 protocol_kwargs: {protocol_kwargs}") - # print(f"🔍 vessel 在转换后: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") - - # # 🔧 完全禁用Host查询,直接使用转换后的数据 - # print(f"🔧 跳过Host查询,直接使用转换后的数据") - # 向Host查询物料当前状态 - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - r = ResourceGet.Request() - resource_id = ( - protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"] - ) - r.id = resource_id - r.with_children = True - response = await self._resource_clients["resource_get"].call_async(r) - protocol_kwargs[k] = list_to_nested_dict( - [convert_from_ros_msg(rs) for rs in response.resources] - ) - - self.lab_logger().info(f"🔍 最终的 vessel: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") - - from unilabos.resources.graphio import physical_setup_graph - - self.lab_logger().info(f"Working on physical setup: {physical_setup_graph}") - protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs) - logs = [] - for step in protocol_steps: - if isinstance(step, dict) and "log_message" in step.get("action_kwargs", {}): - logs.append(step) - elif isinstance(step, list): - logs.append(step) - self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: " - f"{json.dumps(logs, indent=4, ensure_ascii=False)}") - - time_start = time.time() - time_overall = 100 - self._busy = True - - # 逐步执行工作流 - for i, action in enumerate(protocol_steps): - # self.get_logger().info(f"Running step {i + 1}: {action}") - if isinstance(action, dict): - # 如果是单个动作,直接执行 - if action["action_name"] == "wait": - time.sleep(action["action_kwargs"]["time"]) - step_results.append({"step": i + 1, "action": "wait", "result": "completed"}) - else: - result = await self.execute_single_action(**action) - step_results.append({"step": i + 1, "action": action["action_name"], "result": result}) - ret_info = json.loads(getattr(result, "return_info", "{}")) - if not ret_info.get("suc", False): - raise RuntimeError(f"Step {i + 1} failed.") - elif isinstance(action, list): - # 如果是并行动作,同时执行 - actions = action - futures = [ - rclpy.get_global_executor().create_task(self.execute_single_action(**a)) for a in actions - ] - results = [await f for f in futures] - step_results.append( - { - "step": i + 1, - "parallel_actions": [a["action_name"] for a in actions], - "results": results, - } - ) - - # 向Host更新物料当前状态 - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - r = ResourceUpdate.Request() - r.resources = [ - convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) - ] - response = await self._resource_clients["resource_update"].call_async(r) - - # 设置成功状态和返回值 - execution_success = True - protocol_return_value = { - "protocol_name": protocol_name, - "steps_executed": len(protocol_steps), - "step_results": step_results, - "total_time": time.time() - time_start, - } - - goal_handle.succeed() - - except Exception as e: - # 捕获并记录错误信息 - str_step_results = [{k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v for k, v in i.items()} for i in step_results] - execution_error = f"{traceback.format_exc()}\n\nStep Result: {pformat(str_step_results)}" - execution_success = False - self.lab_logger().error(f"协议 {protocol_name} 执行出错: {str(e)} \n{traceback.format_exc()}") - - # 设置动作失败 - goal_handle.abort() - - finally: - self._busy = False - - # 创建结果消息 - result = action_value_mapping["type"].Result() - result.success = execution_success - - # 获取结果消息类型信息,检查是否有return_info字段 - result_msg_types = action_value_mapping["type"].Result.get_fields_and_field_types() - - # 设置return_info字段(如果存在) - for attr_name in result_msg_types.keys(): - if attr_name in ["success", "reached_goal"]: - setattr(result, attr_name, execution_success) - elif attr_name == "return_info": - setattr( - result, - attr_name, - serialize_result_info(execution_error, execution_success, protocol_return_value), - ) - - self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果") - return result - - return execute_protocol - - async def execute_single_action(self, device_id, action_name, action_kwargs): - """执行单个动作""" - # 构建动作ID - if device_id in ["", None, "self"]: - action_id = f"/devices/{self.device_id}/{action_name}" - else: - action_id = f"/devices/{device_id}/{action_name}" # 执行时取消了主节点信息 /{self.device_id} - - # 检查动作客户端是否存在 - if action_id not in self._action_clients: - self.lab_logger().error(f"找不到动作客户端: {action_id}") - return None - - # 发送动作请求 - action_client = self._action_clients[action_id] - goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) - - ##### self.lab_logger().info(f"发送动作请求到: {action_id}") - action_client.wait_for_server() - - # 等待动作完成 - request_future = action_client.send_goal_async(goal_msg) - handle = await request_future - - if not handle.accepted: - self.lab_logger().error(f"动作请求被拒绝: {action_name}") - return None - - result_future = await handle.get_result_async() - ##### self.lab_logger().info(f"动作完成: {action_name}") - - return result_future.result - - """还没有改过的部分""" - - def _setup_modbus_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置Modbus通信代理""" - config = interface_config.get('config', {}) - - # 设置Modbus读写方法 - def modbus_read(address, count=1, function_code=3): - """Modbus读取方法""" - return comm_device.driver_instance.read_holding_registers( - address=address, - count=count, - slave_id=config.get('slave_id', 1) - ) - - def modbus_write(address, value, function_code=6): - """Modbus写入方法""" - if isinstance(value, (list, tuple)): - return comm_device.driver_instance.write_multiple_registers( - address=address, - values=value, - slave_id=config.get('slave_id', 1) - ) - else: - return comm_device.driver_instance.write_single_register( - address=address, - value=value, - slave_id=config.get('slave_id', 1) - ) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, 'comm_read', modbus_read) - setattr(logical_device.driver_instance, 'comm_write', modbus_write) - setattr(logical_device.driver_instance, 'comm_config', config) - setattr(logical_device.driver_instance, 'comm_protocol', 'modbus') - - def _setup_opcua_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置OPC UA通信代理""" - config = interface_config.get('config', {}) - - def opcua_read(node_id): - """OPC UA读取方法""" - return comm_device.driver_instance.read_node_value(node_id) - - def opcua_write(node_id, value): - """OPC UA写入方法""" - return comm_device.driver_instance.write_node_value(node_id, value) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, 'comm_read', opcua_read) - setattr(logical_device.driver_instance, 'comm_write', opcua_write) - setattr(logical_device.driver_instance, 'comm_config', config) - setattr(logical_device.driver_instance, 'comm_protocol', 'opcua') - - def _setup_http_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置HTTP/RPC通信代理""" - config = interface_config.get('config', {}) - base_url = config.get('base_url', 'http://localhost:8080') - - def http_read(endpoint, params=None): - """HTTP GET请求""" - url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" - return comm_device.driver_instance.get_request(url, params=params) - - def http_write(endpoint, data): - """HTTP POST请求""" - url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" - return comm_device.driver_instance.post_request(url, data=data) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, 'comm_read', http_read) - setattr(logical_device.driver_instance, 'comm_write', http_write) - setattr(logical_device.driver_instance, 'comm_config', config) - setattr(logical_device.driver_instance, 'comm_protocol', 'http') - - def _setup_serial_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置串口通信代理""" - config = interface_config.get('config', {}) - - def serial_read(timeout=1.0): - """串口读取方法""" - return comm_device.driver_instance.read_data(timeout=timeout) - - def serial_write(data): - """串口写入方法""" - if isinstance(data, str): - data = data.encode('utf-8') - return comm_device.driver_instance.write_data(data) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, 'comm_read', serial_read) - setattr(logical_device.driver_instance, 'comm_write', serial_write) - setattr(logical_device.driver_instance, 'comm_config', config) - setattr(logical_device.driver_instance, 'comm_protocol', 'serial') - - def _setup_hardware_proxy( - self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method - ): - """为设备设置硬件接口代理""" - # extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])] - write_func = getattr( - communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"] - ) - read_func = getattr( - communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"] - ) - - def _read(*args, **kwargs): - return read_func(*args, **kwargs) - - def _write(*args, **kwargs): - return write_func(*args, **kwargs) - - if read_method: - # bound_read = MethodType(_read, device.driver_instance) - setattr(device.driver_instance, read_method, _read) - - if write_method: - # bound_write = MethodType(_write, device.driver_instance) - setattr(device.driver_instance, write_method, _write) - - async def _update_resources(self, goal, protocol_kwargs): - """更新资源状态""" - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - if protocol_kwargs[k] is not None: - try: - r = ResourceUpdate.Request() - r.resources = [ - convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) - ] - await self._resource_clients["resource_update"].call_async(r) - except Exception as e: - self.lab_logger().error(f"更新资源失败: {e}") diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index a96c4459..06fc1c27 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -1,7 +1,12 @@ -from typing import List, Tuple, Any +from typing import List, Tuple, Any, Dict, TYPE_CHECKING +from abc import ABC, abstractmethod from unilabos.utils.log import logger +if TYPE_CHECKING: + from unilabos.devices.workstation.workstation_base import WorkstationBase + from pylabrobot.resources import Resource as PLRResource + class DeviceNodeResourceTracker(object): @@ -37,10 +42,20 @@ class DeviceNodeResourceTracker(object): def figure_resource(self, query_resource, try_mode=False): if isinstance(query_resource, list): return [self.figure_resource(r, try_mode) for r in query_resource] - elif isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource: # 临时处理,要删除的,driver有太多类型错误标注 + elif ( + isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource + ): # 临时处理,要删除的,driver有太多类型错误标注 return [self.figure_resource(r, try_mode) for r in query_resource.values()] - res_id = query_resource.id if hasattr(query_resource, "id") else (query_resource.get("id") if isinstance(query_resource, dict) else None) - res_name = query_resource.name if hasattr(query_resource, "name") else (query_resource.get("name") if isinstance(query_resource, dict) else None) + res_id = ( + query_resource.id + if hasattr(query_resource, "id") + else (query_resource.get("id") if isinstance(query_resource, dict) else None) + ) + res_name = ( + query_resource.name + if hasattr(query_resource, "name") + else (query_resource.get("name") if isinstance(query_resource, dict) else None) + ) res_identifier = res_id if res_id else res_name identifier_key = "id" if res_id else "name" resource_cls_type = type(query_resource) @@ -54,7 +69,9 @@ class DeviceNodeResourceTracker(object): ) else: res_list.extend( - self.loop_find_resource(r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key)) + self.loop_find_resource( + r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key) + ) ) if not try_mode: assert len(res_list) > 0, f"没有找到资源 {query_resource},请检查资源是否存在" @@ -66,12 +83,16 @@ class DeviceNodeResourceTracker(object): self.resource2parent_resource[id(res_list[0][1])] = res_list[0][0] return res_list[0][1] - def loop_find_resource(self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None) -> List[Tuple[Any, Any]]: + def loop_find_resource( + self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None + ) -> List[Tuple[Any, Any]]: res_list = [] # print(resource, target_resource_cls_type, identifier_key, compare_value) children = getattr(resource, "children", []) for child in children: - res_list.extend(self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource)) + res_list.extend( + self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource) + ) if target_resource_cls_type == type(resource): if target_resource_cls_type == dict: if identifier_key in resource: diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 5e16989f..3c621408 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -6,7 +6,6 @@ """ import asyncio import inspect -import json import traceback from abc import abstractmethod from typing import Type, Any, Dict, Optional, TypeVar, Generic @@ -297,6 +296,11 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children + station_resource_dict = data["station_resource"] + from pylabrobot.resources import Deck, Resource + plrc = PyLabRobotCreator(Deck, self.children, self.resource_tracker) + station_resource = plrc.create_instance(station_resource_dict) + data["station_resource"] = station_resource self.device_instance = super(WorkstationNodeCreator, self).create_instance(data) self.post_create() return self.device_instance