feat: workstation example

This commit is contained in:
Xuwznln
2025-08-29 02:47:20 +08:00
parent ce5bab3af1
commit 19027350fb
11 changed files with 2533 additions and 827 deletions

View File

@@ -1,7 +1,12 @@
from typing import List, Tuple, Any
from typing import List, Tuple, Any, Dict, TYPE_CHECKING
from abc import ABC, abstractmethod
from unilabos.utils.log import logger
if TYPE_CHECKING:
from unilabos.devices.workstation.workstation_base import WorkstationBase
from pylabrobot.resources import Resource as PLRResource
class DeviceNodeResourceTracker(object):
@@ -37,10 +42,20 @@ class DeviceNodeResourceTracker(object):
def figure_resource(self, query_resource, try_mode=False):
if isinstance(query_resource, list):
return [self.figure_resource(r, try_mode) for r in query_resource]
elif isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource: # 临时处理要删除的driver有太多类型错误标注
elif (
isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource
): # 临时处理要删除的driver有太多类型错误标注
return [self.figure_resource(r, try_mode) for r in query_resource.values()]
res_id = query_resource.id if hasattr(query_resource, "id") else (query_resource.get("id") if isinstance(query_resource, dict) else None)
res_name = query_resource.name if hasattr(query_resource, "name") else (query_resource.get("name") if isinstance(query_resource, dict) else None)
res_id = (
query_resource.id
if hasattr(query_resource, "id")
else (query_resource.get("id") if isinstance(query_resource, dict) else None)
)
res_name = (
query_resource.name
if hasattr(query_resource, "name")
else (query_resource.get("name") if isinstance(query_resource, dict) else None)
)
res_identifier = res_id if res_id else res_name
identifier_key = "id" if res_id else "name"
resource_cls_type = type(query_resource)
@@ -54,7 +69,9 @@ class DeviceNodeResourceTracker(object):
)
else:
res_list.extend(
self.loop_find_resource(r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key))
self.loop_find_resource(
r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key)
)
)
if not try_mode:
assert len(res_list) > 0, f"没有找到资源 {query_resource},请检查资源是否存在"
@@ -66,12 +83,16 @@ class DeviceNodeResourceTracker(object):
self.resource2parent_resource[id(res_list[0][1])] = res_list[0][0]
return res_list[0][1]
def loop_find_resource(self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None) -> List[Tuple[Any, Any]]:
def loop_find_resource(
self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None
) -> List[Tuple[Any, Any]]:
res_list = []
# print(resource, target_resource_cls_type, identifier_key, compare_value)
children = getattr(resource, "children", [])
for child in children:
res_list.extend(self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource))
res_list.extend(
self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource)
)
if target_resource_cls_type == type(resource):
if target_resource_cls_type == dict:
if identifier_key in resource: