mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-06 15:05:13 +00:00
* Create app/main API * create example device * create ROS backend and example device SDK Wrapper * Add ROS host and host starting from app.py * Add gripper device and mock implementation * add "status_types" & "action_types" to ROS device decorator * add ActionServer debug example * [bugfix] complete mock gripper example * ROS Backend Host for Device action calling and Resource management * add conda/mamba ENV file * add host_node communication with app/main.py * add action message value mappings and converters * Update ilabos.yaml * Update issue templates * example devices.json and resources.json * Fix Device wrapper to use async property and actions (#7) * Fix Device wrapper to use async property and actions * Resolve #1 : support async get methods and actions. Give new examples. * add both sync/async GRBL controller SDK * 2 call device actions from appmainpy api to ros hostpy (#8) * feature: add job * fix:node start * feature:add get job status * fix:get device * clean * Resolve #5 device connection diagram and workflow compilation support (#9) * add syringe pump device and its compilation using device connection diagram * add RunzeSyringePump real device with ROS2 example * Prototype machine with 1 pump and 1 CNC * add ROS2ProtocolNode and related functions * add ilabos_msgs (to use PumpTransfer action) * add example device connection graph * refactor protocol_node code into separate file * add ROS2SerialNode * add SerialCommand srv in ilabos_msgs * add pump_protocol example, and fix bugs * [fix] serial service: avoid async service deadlock by directly call serial `send_command` * use SendCmd instead of SingleJointPosition for valve control * initialize device connection graph when server starts * Fix #5: async workflow execution (#16) * add rclpyx and protocol example for async-native workflow * use async in ROS2ProtocolNode, and host initialization * add examples of "ros-async" protocol implementation, and `run_in_event_loop` for using native async functions * use "ros-async" in protocols and device nodes * fix pump_protocol: push to 0.0 μL * Envs, docs, and conda recipes (#19) * update ENV: use python 3.11 and deprecate ros-humble-gazebo-ros * add ilabos-msgs conda recipe * Update ilabos.yaml * fix recipe and env yaml * Add sphinx docs * add aichemeco * add bioyong * add bioyong * Support XDL devices & protocols (#20) * [Feature] support multiple protocols in a single ProtocolNode * add Junjie's code * Support "Clean" protocol * Update Grignard_flow_batchreact_single_pumpvalve.json * test_grignard_add * add stir device node and example * Update device_node.py add print_publish flag to control the node's log output * NH4Cl_add * add "HeaterStirrer" device and "HeatChill" action * add wait time after each pump action for equilibration * fix stir * add Separate protocol * Refactor Separator device and Stir action * add rotavap_node * fix stir * add chiller node * Move rinsings into PumpTransfer * Fix SeparateProtocol under refactored Separator device and Stir action * Supports automatically add new protocol action_types * fix PumpTransfer protocol because of rinsing * Add Rotavap and Chiller devices * fix SeparateProtocol * add EvaporateProtocol * add rotavap devices config * fix HeaterStirrer and SeparatorController IO * Fix automatically add new protocol action_types * Add HeaterStirrer and SeparatorController device config * fix pump protocols * Fix Evaporate action * Update evaporate_protocol.py * add temp_sensor node and add function remap * update docs --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: q434343 <554662886@qq.com> * fix aichemeco * [Bugfix] fix Windows conda packaging * add file upload api * update dependencies: force to use 3.11 and remove conflict in WIN64 and OSX64 * update dependencies: force to use 3.11 and remove conflict in WIN64 and OSX64 * Create aichemeco_simple.py * fix * update * add aichemeco file * MQTT [1/2]: action start (#25) * add mq * fix * clean * add class * fix excel * update bioyong * add api * fix --------- Co-authored-by: caok@dp.tech <xiaoyeqiannian@163.com> * motor & grasp * Add Grasp motor support and enhance EleGripper class - Introduced a new motor configuration for Grasp in sjtu.json. - Updated EleGripper class to inherit from UniversalDriver and added status property. - Implemented move_and_rotate method for coordinated movement. - Adjusted threading logic in EleGripper initialization. - Registered Grasp motor in ROS2 device node configuration. This commit enhances the motor control capabilities by integrating the Grasp motor and improving the existing EleGripper functionality. * fix read data lenth * Update Grasp.py * MQTT (2/2): publish Device Status, Action Feedback & Results (#27) * Add bridges in HostNode for device_status publishing * Add "bridges" selection (fastapi & mqtt) when app start * add MQ feedback & result publisher, and fix message converter * fix UUID converting between ROS and MQ * lint api model.py * Continuous controllers: PID, MPC, custom controllers etc. (#23) * add controller config & wrapper * add controller setup at app.main * control loop example * fix com port * add agv , ur_arm and raman * MQTT (3/4): Unified Resources and Sync when starting the server (#28) * update http upload api * generate uuid when init device * example resource json * fix * add new example full-content json (device, resource, graph) * fix full-content json and related reading code * fix * add json_schema when initialize resources * fix * update schema * refactor heaterstirrer.dalong * fix * fix refactor heaterstirrer.dalong * refactor syringepump.runze: use ml instead of μL * Update ilabos/ros/host.py Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * Distributed initialization with self-organizing network (#29) * add distributed launching option "--without_host" * fix --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> * Refactor Workstation: Add resource service and tracking (#30) * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * Create HT_hiwo.json * add children in resources * bugfixes * fix rpc * add Revvity winprep --------- Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> Co-authored-by: 3218923350 <105201755+3218923350@users.noreply.github.com> * Distributed launch (2/2): distributed resource create (#32) * add resource_add request to host for slave mode * add AGV * fix protocol resources * optimize host callbacks * bugfixes * add revvity registry --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> * Refactor Driver Files Structure (#33) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> * add: NMR LH and RU device control (#34) * Add Registry for device drivers and Support GraphML (#35) * read chemputer graphml * read graphml in app/main * add devices in ros/devices * add schema for devices * read registry directory and initialize when entry from main * Delete devices.py * Update add_protocol.md * delete unecessary files * feat: 2278 devices registry yaml (#36) * read chemputer graphml * read graphml in app/main * add devices in ros/devices * add schema for devices * read registry directory and initialize when entry from main * Delete devices.py * add: NMR LH and RU device control * fix: modify jiageng devices registry --------- Co-authored-by: Junhan Chang <changjh@pku.edu.cn> * Device/Resource Registry and GraphML support (#37) * add resource type conversion to PLR * add resource registry and test * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml --------- Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> * Uni-Lab Doc v0.2 (#39) * add Uni-Lab docs * change doc name * Dev (#41) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev * add HPLC registry and json * 升级 ros2-distro-mutex 依赖版本至 0.6 * 修改 ros2-distro-mutex 依赖版本为通配符匹配 * 更新 ros-humble-ilabos-msgs 依赖为 robostack-humble 命名空间 * add resource type conversion to PLR * add resource registry and test * feat: 更新oss上传 * fix device id * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * use robostack-staging * rclpy version test * lower rclpy * ensure 0.6* env * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml * Feature/device node later init (#40) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> * Dev (#45) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev * add HPLC registry and json * 升级 ros2-distro-mutex 依赖版本至 0.6 * 修改 ros2-distro-mutex 依赖版本为通配符匹配 * 更新 ros-humble-ilabos-msgs 依赖为 robostack-humble 命名空间 * add resource type conversion to PLR * add resource registry and test * feat: 更新oss上传 * fix device id * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * use robostack-staging * rclpy version test * lower rclpy * ensure 0.6* env * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml * Feature/device node later init (#40) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * Feature/device node later init (#42) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * Feature/device node later init (#43) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * plr * action * plr reg fix * Feature/device node later init (#44) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * plr * fix convert error fix async logic error added async error print * new device test * test resource add * test resource add * test resource add * test resource add * local env setup * node type fix temp fix root_node error fix convert res from type error * resource tracker * fix bug from qhh * fix bug from qhh * fix bug from qhh * fix bug from qhh * refactor MQTT client logging and connection handling; update group ID in config * driver_params allow empty * allow other init param * fix driver param and enhance type hint * refactor MQConfig to use double quotes for string literals * fix wrong function calling * fix wrong function calling * fix log for mac * fix networkx compatibility * add mqtt loggers * add action to jsonschema converter * random client id * type converter & registry * correct conversion * fix action publish only from discovered devices * add "Bio" tag for action doc generation * 改进module提示 * Fix doc * mqtt不连接也可用 性价样例提示 * add docs * 更新plr test案例 * Update intro.md * 更新有机案例 * skip --------- Co-authored-by: Harvey Que <Q-Query@outlook.com> Co-authored-by: Junhan Chang <1700011741@pku.edu.cn> --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> Co-authored-by: Harvey Que <Q-Query@outlook.com> Co-authored-by: Junhan Chang <1700011741@pku.edu.cn> * Canonicalize before Open Source (#46) * big big refactor try01 * refactor 02 --------- Co-authored-by: ck <xiaoyeqiannian@163.com> Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: q434343 <554662886@qq.com> Co-authored-by: Xuwznln <xuwznln@gmail.com> Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> Co-authored-by: 3218923350 <105201755+3218923350@users.noreply.github.com> Co-authored-by: Xuwznln <1023025701@qq.com> Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: Jiawei <91898272+jiawei723@users.noreply.github.com> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> Co-authored-by: Harvey Que <Q-Query@outlook.com>
478 lines
18 KiB
Python
478 lines
18 KiB
Python
import importlib
|
||
import json
|
||
from typing import Union
|
||
import numpy as np
|
||
import networkx as nx
|
||
|
||
try:
|
||
from pylabrobot.resources.resource import Resource as ResourcePLR
|
||
except ImportError:
|
||
pass
|
||
|
||
|
||
physical_setup_graph: nx.Graph = None
|
||
|
||
|
||
def canonicalize_nodes_data(data: dict, parent_relation: dict = {}) -> dict:
|
||
for node in data.get("nodes", []):
|
||
if node.get("label") is not None:
|
||
id = node.pop("label")
|
||
node["id"] = node["name"] = id
|
||
if "id" not in node:
|
||
node["id"] = node.get("name", "NaN")
|
||
if "name" not in node:
|
||
node["name"] = node["id"]
|
||
if node.get("position") is None:
|
||
node["position"] = {
|
||
"x": node.pop("x", 0.0),
|
||
"y": node.pop("y", 0.0),
|
||
"z": node.pop("z", 0.0),
|
||
}
|
||
if node.get("config") is None:
|
||
node["config"] = {}
|
||
node["data"] = {}
|
||
for k in list(node.keys()):
|
||
if k not in [
|
||
"id",
|
||
"name",
|
||
"class",
|
||
"type",
|
||
"position",
|
||
"children",
|
||
"parent",
|
||
"config",
|
||
"data",
|
||
]:
|
||
if k in ["chemical", "current_volume"]:
|
||
if node["data"].get("liquids") is None:
|
||
node["data"]["liquids"] = [{}]
|
||
if k == "chemical":
|
||
node["data"]["liquids"][0]["liquid_name"] = node.pop(k)
|
||
elif k == "current_volume":
|
||
node["data"]["liquids"][0]["liquid_volume"] = node.pop(k)
|
||
elif k == "max_volume":
|
||
node["data"]["max_volume"] = node.pop(k)
|
||
elif k == "url":
|
||
node.pop(k)
|
||
else:
|
||
node["config"][k] = node.pop(k)
|
||
if "class" not in node:
|
||
node["class"] = None
|
||
if "type" not in node:
|
||
node["type"] = (
|
||
"container"
|
||
if node["class"] is None
|
||
else "device" if node["class"] not in ["container", "plate"] else node["class"]
|
||
)
|
||
if "children" not in node:
|
||
node["children"] = []
|
||
|
||
id2idx = {node_data["id"]: idx for idx, node_data in enumerate(data["nodes"])}
|
||
for parent, children in parent_relation.items():
|
||
data["nodes"][id2idx[parent]]["children"] = children
|
||
for child in children:
|
||
data["nodes"][id2idx[child]]["parent"] = parent
|
||
return data
|
||
|
||
|
||
def canonicalize_links_ports(data: dict) -> dict:
|
||
# 第一遍处理:将字符串类型的port转换为字典格式
|
||
for link in data.get("links", []):
|
||
port = link.get("port")
|
||
if isinstance(port, int):
|
||
port = str(port)
|
||
if isinstance(port, str):
|
||
port_str = port.strip()
|
||
if port_str.startswith("(") and port_str.endswith(")"):
|
||
# 处理格式为 "(A,B)" 的情况
|
||
content = port_str[1:-1].strip()
|
||
parts = [p.strip() for p in content.split(",", 1)]
|
||
source_port = parts[0]
|
||
dest_port = parts[1] if len(parts) > 1 else None
|
||
else:
|
||
# 处理格式为 "A" 的情况
|
||
source_port = port_str
|
||
dest_port = None
|
||
link["port"] = {link["source"]: source_port, link["target"]: dest_port}
|
||
elif not isinstance(port, dict):
|
||
# 若port既非字符串也非字典,初始化为空结构
|
||
link["port"] = {link["source"]: None, link["target"]: None}
|
||
|
||
# 构建边字典,键为(source节点, target节点),值为对应的port信息
|
||
edges = {(link["source"], link["target"]): link["port"] for link in data.get("links", [])}
|
||
|
||
# 第二遍处理:填充反向边的dest信息
|
||
delete_reverses = []
|
||
for i, link in enumerate(data.get("links", [])):
|
||
s, t = link["source"], link["target"]
|
||
current_port = link["port"]
|
||
if current_port.get(t) is None:
|
||
reverse_key = (t, s)
|
||
reverse_port = edges.get(reverse_key)
|
||
if reverse_port:
|
||
reverse_source = reverse_port.get(s)
|
||
if reverse_source is not None:
|
||
# 设置当前边的dest为反向边的source
|
||
current_port[t] = reverse_source
|
||
delete_reverses.append(i)
|
||
else:
|
||
# 若不存在反向边,初始化为空结构
|
||
current_port[t] = current_port[s]
|
||
# 删除已被使用反向端口信息的反向边
|
||
data["links"] = [link for i, link in enumerate(data.get("links", [])) if i not in delete_reverses]
|
||
|
||
return data
|
||
|
||
|
||
def handle_communications(G: nx.Graph):
|
||
available_communication_types = ["serial", "io_device", "plc", "io"]
|
||
for e, edata in G.edges.items():
|
||
if edata.get("type", "physical") != "communication":
|
||
continue
|
||
if G.nodes[e[0]].get("class") in available_communication_types:
|
||
device_comm, device = e[0], e[1]
|
||
elif G.nodes[e[1]].get("class") in available_communication_types:
|
||
device_comm, device = e[1], e[0]
|
||
else:
|
||
continue
|
||
|
||
if G.nodes[device_comm].get("class") == "serial":
|
||
G.nodes[device]["config"]["port"] = device_comm
|
||
elif G.nodes[device_comm].get("class") == "io_device":
|
||
print(f'!!! Modify {device}\'s io_device_port to {edata["port"][device_comm]}')
|
||
G.nodes[device]["config"]["io_device_port"] = int(edata["port"][device_comm])
|
||
|
||
|
||
def read_node_link_json(json_file):
|
||
global physical_setup_graph
|
||
|
||
data = json.load(open(json_file, encoding="utf-8"))
|
||
data = canonicalize_nodes_data(data)
|
||
data = canonicalize_links_ports(data)
|
||
|
||
physical_setup_graph = nx.node_link_graph(data, multigraph=False) # edges="links" 3.6 warning
|
||
handle_communications(physical_setup_graph)
|
||
return physical_setup_graph
|
||
|
||
|
||
def read_graphml(graphml_file):
|
||
global physical_setup_graph
|
||
|
||
G = nx.read_graphml(graphml_file)
|
||
mapping = {}
|
||
parent_relation = {}
|
||
for node in G.nodes():
|
||
label = G.nodes[node].pop("label", G.nodes[node].get("id", G.nodes[node].get("name", "NaN")))
|
||
mapping[node] = label
|
||
if "::" in node:
|
||
parent = mapping[node.split("::")[0]]
|
||
if parent not in parent_relation:
|
||
parent_relation[parent] = []
|
||
parent_relation[parent].append(label)
|
||
|
||
G2 = nx.relabel_nodes(G, mapping)
|
||
data = nx.node_link_data(G2)
|
||
data = canonicalize_nodes_data(data, parent_relation=parent_relation)
|
||
data = canonicalize_links_ports(data)
|
||
|
||
physical_setup_graph = nx.node_link_graph(data, edges="links", multigraph=False) # edges="links" 3.6 warning
|
||
handle_communications(physical_setup_graph)
|
||
return physical_setup_graph
|
||
|
||
|
||
def dict_from_graph(graph: nx.Graph) -> dict:
|
||
nodes_copy = {node_id: {"id": node_id, **node} for node_id, node in graph.nodes(data=True)}
|
||
return nodes_copy
|
||
|
||
|
||
def dict_to_tree(nodes: dict, devices_only: bool = False) -> list[dict]:
|
||
# 将节点转换为字典,以便通过 ID 快速查找
|
||
nodes_list = [node for node in nodes.values() if node.get("type") == "device" or not devices_only]
|
||
|
||
# 初始化每个节点的 children 为包含节点字典的列表
|
||
for node in nodes_list:
|
||
node["children"] = [nodes[child_id] for child_id in node.get("children", [])]
|
||
|
||
# 找到根节点并返回
|
||
root_nodes = [
|
||
node for node in nodes_list if len(nodes_list) == 1 or node.get("parent", node.get("parent_name")) in [None, "", "None", np.nan]
|
||
]
|
||
|
||
# 如果存在多个根节点,返回所有根节点
|
||
return root_nodes
|
||
|
||
|
||
def dict_to_nested_dict(nodes: dict, devices_only: bool = False) -> dict:
|
||
# 将节点转换为字典,以便通过 ID 快速查找
|
||
nodes_list = [node for node in nodes.values() if node.get("type") == "device" or not devices_only]
|
||
|
||
# 初始化每个节点的 children 为包含节点字典的列表
|
||
for node in nodes_list:
|
||
node["children"] = {
|
||
child_id: nodes[child_id]
|
||
for child_id in node.get("children", [])
|
||
if nodes[child_id].get("type") == "device" or not devices_only
|
||
}
|
||
if len(node["children"]) > 0 and node["type"].lower() == "device" and devices_only:
|
||
node["config"]["children"] = node["children"]
|
||
|
||
# 找到根节点并返回
|
||
root_nodes = {
|
||
node["id"]: node
|
||
for node in nodes_list
|
||
if node.get("parent", node.get("parent_name")) in [None, "", "None", np.nan]
|
||
}
|
||
|
||
# 如果存在多个根节点,返回所有根节点
|
||
return root_nodes
|
||
|
||
|
||
def list_to_nested_dict(nodes: list[dict]) -> dict:
|
||
nodes_dict = {node["id"]: node for node in nodes}
|
||
return dict_to_nested_dict(nodes_dict)
|
||
|
||
|
||
def tree_to_list(tree: list[dict]) -> list[dict]:
|
||
def _tree_to_list(tree: list[dict], result: list[dict]):
|
||
for node_ in tree:
|
||
node = node_.copy()
|
||
result.append(node)
|
||
if node.get("children"):
|
||
_tree_to_list(node["children"], result)
|
||
node["children"] = [n["id"] for n in node["children"]]
|
||
|
||
result = []
|
||
_tree_to_list(tree, result)
|
||
return result
|
||
|
||
|
||
def nested_dict_to_list(nested_dict: dict) -> list[dict]: # FIXME 是tree?
|
||
"""
|
||
将嵌套字典转换为扁平列表
|
||
|
||
嵌套字典的层次结构将通过children属性表示
|
||
|
||
Args:
|
||
nested_dict: 嵌套的字典结构
|
||
|
||
Returns:
|
||
扁平化的字典列表
|
||
"""
|
||
result = []
|
||
|
||
# 如果输入本身是一个节点,先添加它
|
||
if "id" in nested_dict:
|
||
node = nested_dict.copy()
|
||
# 暂存子节点
|
||
children_dict = node.get("children", {})
|
||
# 如果children是字典,将其转换为键列表
|
||
if isinstance(children_dict, dict):
|
||
node["children"] = list(children_dict.keys())
|
||
elif not isinstance(children_dict, list):
|
||
node["children"] = []
|
||
result.append(node)
|
||
|
||
# 处理子节点字典
|
||
if isinstance(children_dict, dict):
|
||
for child_id, child_data in children_dict.items():
|
||
if isinstance(child_data, dict):
|
||
# 为子节点添加ID(如果不存在)
|
||
if "id" not in child_data:
|
||
child_data["id"] = child_id
|
||
# 递归处理子节点
|
||
result.extend(nested_dict_to_list(child_data))
|
||
|
||
# 处理children字段
|
||
elif "children" in nested_dict:
|
||
children_dict = nested_dict.get("children", {})
|
||
if isinstance(children_dict, dict):
|
||
for child_id, child_data in children_dict.items():
|
||
if isinstance(child_data, dict):
|
||
# 为子节点添加ID(如果不存在)
|
||
if "id" not in child_data:
|
||
child_data["id"] = child_id
|
||
# 递归处理子节点
|
||
result.extend(nested_dict_to_list(child_data))
|
||
|
||
return result
|
||
|
||
|
||
def convert_resources_to_type(
|
||
resources_list: list[dict], resource_type: type, *, plr_model: bool = False
|
||
) -> Union[list[dict], dict, None, "ResourcePLR"]:
|
||
"""
|
||
Convert resources to a given type (PyLabRobot or NestedDict) from flattened list of dictionaries.
|
||
|
||
Args:
|
||
resources: List of resources in the flattened dictionary format.
|
||
resource_type: Type of the resources to convert to.
|
||
plr_model: 是否有plr_model类型
|
||
|
||
Returns:
|
||
List of resources in the given type.
|
||
"""
|
||
if resource_type == dict:
|
||
return list_to_nested_dict(resources_list)
|
||
elif isinstance(resource_type, type) and issubclass(resource_type, ResourcePLR):
|
||
if isinstance(resources_list, dict):
|
||
return resource_ulab_to_plr(resources_list, plr_model)
|
||
resources_tree = dict_to_tree({r["id"]: r for r in resources_list})
|
||
return resource_ulab_to_plr(resources_tree[0], plr_model)
|
||
elif isinstance(resource_type, list) and all(issubclass(t, ResourcePLR) for t in resource_type):
|
||
resources_tree = dict_to_tree({r["id"]: r for r in resources_list})
|
||
return [resource_ulab_to_plr(r, plr_model) for r in resources_tree]
|
||
else:
|
||
return None
|
||
|
||
|
||
def convert_resources_from_type(resources_list, resource_type: type) -> Union[list[dict], dict, None, "ResourcePLR"]:
|
||
"""
|
||
Convert resources from a given type (PyLabRobot or NestedDict) to flattened list of dictionaries.
|
||
|
||
Args:
|
||
resources_list: List of resources in the given type.
|
||
resource_type: Type of the resources to convert from.
|
||
|
||
Returns:
|
||
List of resources in the flattened dictionary format.
|
||
"""
|
||
if resource_type == dict:
|
||
return nested_dict_to_list(resources_list)
|
||
elif isinstance(resource_type, type) and issubclass(resource_type, ResourcePLR):
|
||
resources_tree = [resource_plr_to_ulab(resources_list)]
|
||
return tree_to_list(resources_tree)
|
||
elif isinstance(resource_type, list) and all(issubclass(t, ResourcePLR) for t in resource_type):
|
||
resources_tree = [resource_plr_to_ulab(r) for r in resources_list]
|
||
return tree_to_list(resources_tree)
|
||
else:
|
||
return None
|
||
|
||
|
||
def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR":
|
||
"""
|
||
Resource有model字段,但是Deck下没有,这个plr由外面判断传入
|
||
"""
|
||
if ResourcePLR is None:
|
||
raise ImportError("pylabrobot not found")
|
||
|
||
all_states = {resource["id"]: resource["data"]}
|
||
|
||
def resource_ulab_to_plr_inner(resource: dict):
|
||
all_states[resource["name"]] = resource["data"]
|
||
d = {
|
||
"name": resource["name"],
|
||
"type": resource["type"],
|
||
"size_x": resource["config"].get("size_x", 0),
|
||
"size_y": resource["config"].get("size_y", 0),
|
||
"size_z": resource["config"].get("size_z", 0),
|
||
"location": {**resource["position"], "type": "Coordinate"},
|
||
"rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}, # Resource如果没有rotation,是plr版本太低
|
||
"category": resource["type"],
|
||
"model": resource["config"].get("model", None), # resource中deck没有model
|
||
"children": (
|
||
[resource_ulab_to_plr_inner(child) for child in resource["children"]]
|
||
if isinstance(resource["children"], list)
|
||
else [resource_ulab_to_plr_inner(child) for child_id, child in resource["children"].items()]
|
||
),
|
||
"parent_name": resource["parent"] if resource["parent"] is not None else None,
|
||
**resource["config"],
|
||
}
|
||
if not plr_model:
|
||
d.pop("model")
|
||
return d
|
||
|
||
d = resource_ulab_to_plr_inner(resource)
|
||
"""无法通过Resource进行反序列化,例如TipSpot必须内部序列化好,直接用TipSpot序列化会多参数,导致出错"""
|
||
from pylabrobot.utils.object_parsing import find_subclass
|
||
resource_plr = find_subclass(d["type"], ResourcePLR).deserialize(d, allow_marshal=True)
|
||
resource_plr.load_all_state(all_states)
|
||
return resource_plr
|
||
|
||
|
||
def resource_plr_to_ulab(resource_plr: "ResourcePLR"):
|
||
def resource_plr_to_ulab_inner(d: dict, all_states: dict) -> dict:
|
||
r = {
|
||
"id": d["name"],
|
||
"name": d["name"],
|
||
"sample_id": None,
|
||
"children": [resource_plr_to_ulab_inner(child, all_states) for child in d["children"]],
|
||
"parent": d["parent_name"] if d["parent_name"] else None,
|
||
"type": "device", # FIXME plr自带的type是python class name
|
||
"class": d.get("class", ""),
|
||
"position": (
|
||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||
if d["location"]
|
||
else {"x": 0, "y": 0, "z": 0}
|
||
),
|
||
"config": {k: v for k, v in d.items() if k not in ["name", "children", "parent_name", "location"]},
|
||
"data": all_states[d["name"]],
|
||
}
|
||
return r
|
||
|
||
d = resource_plr.serialize()
|
||
all_states = resource_plr.serialize_all_state()
|
||
r = resource_plr_to_ulab_inner(d, all_states)
|
||
return r
|
||
|
||
|
||
def initialize_resource(resource_config: dict, lab_registry: dict) -> list[dict]:
|
||
"""Initializes a resource based on its configuration.
|
||
|
||
If the config is detailed, then do nothing;
|
||
If it is a string, then import the appropriate class and create an instance of it.
|
||
|
||
Args:
|
||
resource_config (dict): The configuration dictionary for the resource, which includes the class type and other parameters.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
resource_class_config = resource_config.get("class", None)
|
||
if resource_class_config is None:
|
||
return [resource_config]
|
||
elif type(resource_class_config) == str:
|
||
# Allow special resource class names to be used
|
||
if resource_class_config not in lab_registry.resource_type_registry:
|
||
return [resource_config]
|
||
# If the resource class is a string, look up the class in the
|
||
# resource_type_registry and import it
|
||
resource_class_config = resource_config["class"] = lab_registry.resource_type_registry[resource_class_config][
|
||
"class"
|
||
]
|
||
if type(resource_class_config) == dict:
|
||
module = importlib.import_module(resource_class_config["module"].split(":")[0])
|
||
mclass = resource_class_config["module"].split(":")[1]
|
||
RESOURCE = getattr(module, mclass)
|
||
|
||
if resource_class_config["type"] == "pylabrobot":
|
||
resource_plr = RESOURCE(name=resource_config["name"])
|
||
r = resource_plr_to_ulab(resource_plr=resource_plr)
|
||
if resource_config.get("position") is not None:
|
||
r["position"] = resource_config["position"]
|
||
r = tree_to_list([r])
|
||
elif isinstance(RESOURCE, dict):
|
||
r = [RESOURCE.copy()]
|
||
|
||
return r
|
||
|
||
|
||
def initialize_resources(resources_config) -> list[dict]:
|
||
"""Initializes a list of resources based on their configuration.
|
||
|
||
If the config is detailed, then do nothing;
|
||
If it is a string, then import the appropriate class and create an instance of it.
|
||
|
||
Args:
|
||
resources_config (list[dict]): The configuration dictionary for the resources, which includes the class type and other parameters.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
from unilabos.registry.registry import lab_registry
|
||
|
||
resources = []
|
||
for resource_config in resources_config:
|
||
resources.extend(initialize_resource(resource_config, lab_registry))
|
||
return resources
|