Files
Uni-Lab-OS/unilabos/ros/nodes/resource_tracker.py
Junhan Chang 0bfb52df00 Squash merge from dev
Update recipe.yaml

fix: figure_resource

use call_async in all service to avoid deadlock

fix: prcxi import error

临时兼容错误的driver写法

fix protocol node

fix filter protocol

bugfixes on organic protocols

fix and remove redundant info

feat: 新增use_remote_resource参数

fix all protocol_compilers and remove deprecated devices

feat: 优化protocol node节点运行日志

fix pumps and liquid_handler handle

feat: workstation example

add: prcxi res
fix: startup slow

fix: prcxi_res

fix: discard_tips

fix: discard_tips error

fix: drop_tips not using auto resource select

feat: 添加ChinWe设备控制类,支持串口通信和电机控制功能 (#79)

feat: add trace log level

modify default discovery_interval to 15s

fix: working dir error when input config path
feat: report publish topic when error

fix: workstation handlers and vessel_id parsing

Cleanup registry to be easy-understanding (#76)

* delete deprecated mock devices

* rename categories

* combine chromatographic devices

* rename rviz simulation nodes

* organic virtual devices

* parse vessel_id

* run registry completion before merge

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
2025-09-10 21:41:50 +08:00

93 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Tuple, Any
from unilabos.utils.log import logger
class DeviceNodeResourceTracker(object):
def __init__(self):
self.resources = []
self.resource2parent_resource = {}
pass
def prefix_path(self, resource):
resource_prefix_path = "/"
resource_parent = getattr(resource, "parent", None)
while resource_parent is not None:
resource_prefix_path = f"/{resource_parent.name}" + resource_prefix_path
resource_parent = resource_parent.parent
return resource_prefix_path
def parent_resource(self, resource):
if id(resource) in self.resource2parent_resource:
return self.resource2parent_resource[id(resource)]
else:
return resource
def add_resource(self, resource):
for r in self.resources:
if id(r) == id(resource):
return
self.resources.append(resource)
def clear_resource(self):
self.resources = []
def figure_resource(self, query_resource, try_mode=False):
if isinstance(query_resource, list):
return [self.figure_resource(r, try_mode) for r in query_resource]
elif isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource: # 临时处理要删除的driver有太多类型错误标注
return [self.figure_resource(r, try_mode) for r in query_resource.values()]
res_id = query_resource.id if hasattr(query_resource, "id") else (query_resource.get("id") if isinstance(query_resource, dict) else None)
res_name = query_resource.name if hasattr(query_resource, "name") else (query_resource.get("name") if isinstance(query_resource, dict) else None)
res_identifier = res_id if res_id else res_name
identifier_key = "id" if res_id else "name"
resource_cls_type = type(query_resource)
if res_identifier is None:
logger.warning(f"resource {query_resource} 没有id或name暂不能对应figure")
res_list = []
for r in self.resources:
if isinstance(query_resource, dict):
res_list.extend(
self.loop_find_resource(r, resource_cls_type, identifier_key, query_resource[identifier_key])
)
else:
res_list.extend(
self.loop_find_resource(r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key))
)
if not try_mode:
assert len(res_list) > 0, f"没有找到资源 {query_resource},请检查资源是否存在"
assert len(res_list) == 1, f"{query_resource} 找到多个资源,请检查资源是否唯一: {res_list}"
else:
return [i[1] for i in res_list]
# 后续加入其他对比方式
self.resource2parent_resource[id(query_resource)] = res_list[0][0]
self.resource2parent_resource[id(res_list[0][1])] = res_list[0][0]
return res_list[0][1]
def loop_find_resource(self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None) -> List[Tuple[Any, Any]]:
res_list = []
# print(resource, target_resource_cls_type, identifier_key, compare_value)
children = getattr(resource, "children", [])
for child in children:
res_list.extend(self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource))
if target_resource_cls_type == type(resource):
if target_resource_cls_type == dict:
if identifier_key in resource:
if resource[identifier_key] == compare_value:
res_list.append((parent_res, resource))
elif hasattr(resource, identifier_key):
if getattr(resource, identifier_key) == compare_value:
res_list.append((parent_res, resource))
return res_list
def filter_find_list(self, res_list, compare_std_dict):
new_list = []
for res in res_list:
for k, v in compare_std_dict.items():
if hasattr(res, k):
if getattr(res, k) == v:
new_list.append(res)
return new_list