mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-09 00:15:10 +00:00
Compare commits
15 Commits
4e92a26057
...
88ae56806c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88ae56806c | ||
|
|
95dd8beb81 | ||
|
|
4ab3fadbec | ||
|
|
229888f834 | ||
|
|
b443b39ebf | ||
|
|
0434bbc15b | ||
|
|
5791b81954 | ||
|
|
bd51c74fab | ||
|
|
ba81cbddf8 | ||
|
|
c2895bb197 | ||
|
|
0423f4f452 | ||
|
|
98bdb4e7e4 | ||
|
|
9d2c93807d | ||
|
|
2d26c3fac6 | ||
|
|
f5753afb7c |
68
test/resources/test_resourcetreeset.py
Normal file
68
test/resources/test_resourcetreeset.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import pytest
|
||||
import json
|
||||
import os
|
||||
|
||||
from pylabrobot.resources import Resource as ResourcePLR
|
||||
from unilabos.resources.graphio import resource_bioyond_to_plr
|
||||
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet
|
||||
from unilabos.registry.registry import lab_registry
|
||||
|
||||
from unilabos.resources.bioyond.decks import BIOYOND_PolymerReactionStation_Deck
|
||||
|
||||
lab_registry.setup()
|
||||
|
||||
|
||||
type_mapping = {
|
||||
"烧杯": ("BIOYOND_PolymerStation_1FlaskCarrier", "3a14196b-24f2-ca49-9081-0cab8021bf1a"),
|
||||
"试剂瓶": ("BIOYOND_PolymerStation_1BottleCarrier", ""),
|
||||
"样品板": ("BIOYOND_PolymerStation_6StockCarrier", "3a14196e-b7a0-a5da-1931-35f3000281e9"),
|
||||
"分装板": ("BIOYOND_PolymerStation_6VialCarrier", "3a14196e-5dfe-6e21-0c79-fe2036d052c4"),
|
||||
"样品瓶": ("BIOYOND_PolymerStation_Solid_Stock", "3a14196a-cf7d-8aea-48d8-b9662c7dba94"),
|
||||
"90%分装小瓶": ("BIOYOND_PolymerStation_Solid_Vial", "3a14196c-cdcf-088d-dc7d-5cf38f0ad9ea"),
|
||||
"10%分装小瓶": ("BIOYOND_PolymerStation_Liquid_Vial", "3a14196c-76be-2279-4e22-7310d69aed68"),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_reaction() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_reaction.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_liquidhandling_1() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_liquidhandling_1.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bioyond_materials_liquidhandling_2() -> list[dict]:
|
||||
print("加载 BioYond 物料数据...")
|
||||
print(os.getcwd())
|
||||
with open("bioyond_materials_liquidhandling_2.json", "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
print(f"加载了 {len(data)} 条物料数据")
|
||||
return data
|
||||
|
||||
|
||||
@pytest.mark.parametrize("materials_fixture", [
|
||||
"bioyond_materials_reaction",
|
||||
"bioyond_materials_liquidhandling_1",
|
||||
])
|
||||
def test_resourcetreeset_from_plr(materials_fixture, request) -> list[dict]:
|
||||
materials = request.getfixturevalue(materials_fixture)
|
||||
deck = BIOYOND_PolymerReactionStation_Deck("test_deck")
|
||||
output = resource_bioyond_to_plr(materials, type_mapping=type_mapping, deck=deck)
|
||||
print(deck.summary())
|
||||
|
||||
r = ResourceTreeSet.from_plr_resources([deck])
|
||||
print(r.dump())
|
||||
# json.dump(deck.serialize(), open("test.json", "w", encoding="utf-8"), indent=4)
|
||||
@@ -12,13 +12,13 @@ from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG
|
||||
|
||||
class BioyondReactionStation(BioyondWorkstation):
|
||||
"""Bioyond反应站类
|
||||
|
||||
|
||||
继承自BioyondWorkstation,提供反应站特定的业务方法
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, config: dict = None, deck=None, protocol_type=None, **kwargs):
|
||||
"""初始化反应站
|
||||
|
||||
|
||||
Args:
|
||||
config: 配置字典,应包含workflow_mappings等配置
|
||||
deck: Deck对象
|
||||
@@ -27,13 +27,13 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
"""
|
||||
if deck is None and config:
|
||||
deck = config.get('deck')
|
||||
|
||||
|
||||
print(f"BioyondReactionStation初始化 - config包含workflow_mappings: {'workflow_mappings' in (config or {})}")
|
||||
if config and 'workflow_mappings' in config:
|
||||
print(f"workflow_mappings内容: {config['workflow_mappings']}")
|
||||
|
||||
|
||||
super().__init__(bioyond_config=config, deck=deck)
|
||||
|
||||
|
||||
print(f"BioyondReactionStation初始化完成 - workflow_mappings: {self.workflow_mappings}")
|
||||
print(f"workflow_mappings长度: {len(self.workflow_mappings)}")
|
||||
|
||||
@@ -49,21 +49,21 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def reactor_taken_in(
|
||||
self,
|
||||
assign_material_name: str,
|
||||
self,
|
||||
assign_material_name: str,
|
||||
cutoff: str = "900000",
|
||||
temperature: float = -10.00
|
||||
):
|
||||
"""反应器放入
|
||||
|
||||
|
||||
Args:
|
||||
assign_material_name: 物料名称(不能为空)
|
||||
cutoff: 截止值/通量配置(需为有效数字字符串,默认 "900000")
|
||||
temperature: 温度上限(°C,范围:-50.00 至 100.00)
|
||||
|
||||
|
||||
Returns:
|
||||
str: JSON 字符串,格式为 {"suc": True}
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: 若物料名称无效或 cutoff 格式错误
|
||||
"""
|
||||
@@ -73,7 +73,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
float(cutoff)
|
||||
except ValueError:
|
||||
raise ValueError("cutoff 必须是有效的数字字符串")
|
||||
|
||||
|
||||
self.append_to_workflow_sequence('{"web_workflow_name": "reactor_taken_in"}')
|
||||
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
|
||||
if material_id is None:
|
||||
@@ -103,15 +103,15 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def solid_feeding_vials(
|
||||
self,
|
||||
material_id: str,
|
||||
time: str = "0",
|
||||
self,
|
||||
material_id: str,
|
||||
time: str = "0",
|
||||
torque_variation: int = 1,
|
||||
assign_material_name: str = None,
|
||||
assign_material_name: str = None,
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""固体进料小瓶
|
||||
|
||||
|
||||
Args:
|
||||
material_id: 粉末类型ID
|
||||
time: 观察时间(分钟)
|
||||
@@ -127,7 +127,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
feeding_step_id = WORKFLOW_STEP_IDS["solid_feeding_vials"]["feeding"]
|
||||
observe_step_id = WORKFLOW_STEP_IDS["solid_feeding_vials"]["observe"]
|
||||
|
||||
|
||||
solid_feeding_vials_params = {
|
||||
"param_values": {
|
||||
feeding_step_id: {
|
||||
@@ -152,16 +152,16 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def liquid_feeding_vials_non_titration(
|
||||
self,
|
||||
self,
|
||||
volume_formula: str,
|
||||
assign_material_name: str,
|
||||
titration_type: str = "1",
|
||||
time: str = "0",
|
||||
torque_variation: int = 1,
|
||||
torque_variation: int = 1,
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""液体进料小瓶(非滴定)
|
||||
|
||||
|
||||
Args:
|
||||
volume_formula: 分液公式(μL)
|
||||
assign_material_name: 物料名称
|
||||
@@ -180,7 +180,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_vials_non_titration"]["liquid"]
|
||||
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_vials_non_titration"]["observe"]
|
||||
|
||||
|
||||
params = {
|
||||
"param_values": {
|
||||
liquid_step_id: {
|
||||
@@ -206,16 +206,16 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def liquid_feeding_solvents(
|
||||
self,
|
||||
assign_material_name: str,
|
||||
volume: str,
|
||||
self,
|
||||
assign_material_name: str,
|
||||
volume: str,
|
||||
titration_type: str = "1",
|
||||
time: str = "360",
|
||||
torque_variation: int = 2,
|
||||
time: str = "360",
|
||||
torque_variation: int = 2,
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""液体进料-溶剂
|
||||
|
||||
|
||||
Args:
|
||||
assign_material_name: 物料名称
|
||||
volume: 分液量(μL)
|
||||
@@ -231,10 +231,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
if isinstance(temperature, str):
|
||||
temperature = float(temperature)
|
||||
|
||||
|
||||
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_solvents"]["liquid"]
|
||||
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_solvents"]["observe"]
|
||||
|
||||
|
||||
params = {
|
||||
"param_values": {
|
||||
liquid_step_id: {
|
||||
@@ -260,16 +260,16 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def liquid_feeding_titration(
|
||||
self,
|
||||
volume_formula: str,
|
||||
assign_material_name: str,
|
||||
self,
|
||||
volume_formula: str,
|
||||
assign_material_name: str,
|
||||
titration_type: str = "1",
|
||||
time: str = "90",
|
||||
time: str = "90",
|
||||
torque_variation: int = 2,
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""液体进料(滴定)
|
||||
|
||||
|
||||
Args:
|
||||
volume_formula: 分液公式(μL)
|
||||
assign_material_name: 物料名称
|
||||
@@ -314,16 +314,16 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
def liquid_feeding_beaker(
|
||||
self,
|
||||
volume: str = "35000",
|
||||
self,
|
||||
volume: str = "35000",
|
||||
assign_material_name: str = "BAPP",
|
||||
time: str = "0",
|
||||
torque_variation: int = 1,
|
||||
time: str = "0",
|
||||
torque_variation: int = 1,
|
||||
titration_type: str = "1",
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""液体进料烧杯
|
||||
|
||||
|
||||
Args:
|
||||
volume: 分液量(μL)
|
||||
assign_material_name: 物料名称(试剂瓶位)
|
||||
@@ -366,7 +366,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
print(f"成功添加液体进料烧杯参数: volume={volume}μL, material={assign_material_name}->ID:{material_id}")
|
||||
print(f"当前队列长度: {len(self.pending_task_params)}")
|
||||
return json.dumps({"suc": True})
|
||||
|
||||
|
||||
def drip_back(
|
||||
self,
|
||||
assign_material_name: str,
|
||||
@@ -377,7 +377,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
temperature: float = 25.00
|
||||
):
|
||||
"""滴回去
|
||||
|
||||
|
||||
Args:
|
||||
assign_material_name: 物料名称(液体种类)
|
||||
volume: 分液量(μL)
|
||||
@@ -425,7 +425,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
def get_workflow_sequence(self) -> List[str]:
|
||||
"""获取当前工作流执行顺序
|
||||
|
||||
|
||||
Returns:
|
||||
工作流名称列表
|
||||
"""
|
||||
@@ -439,10 +439,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
def workflow_step_query(self, workflow_id: str) -> dict:
|
||||
"""查询工作流步骤参数
|
||||
|
||||
|
||||
Args:
|
||||
workflow_id: 工作流ID
|
||||
|
||||
|
||||
Returns:
|
||||
工作流步骤参数字典
|
||||
"""
|
||||
@@ -450,10 +450,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
def create_order(self, json_str: str) -> dict:
|
||||
"""创建订单
|
||||
|
||||
|
||||
Args:
|
||||
json_str: 订单参数的JSON字符串
|
||||
|
||||
|
||||
Returns:
|
||||
创建结果
|
||||
"""
|
||||
@@ -463,10 +463,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
def process_web_workflows(self, web_workflow_json: str) -> List[Dict[str, str]]:
|
||||
"""处理网页工作流列表
|
||||
|
||||
|
||||
Args:
|
||||
web_workflow_json: JSON 格式的网页工作流列表
|
||||
|
||||
|
||||
Returns:
|
||||
List[Dict[str, str]]: 包含工作流 ID 和名称的字典列表
|
||||
"""
|
||||
@@ -492,11 +492,11 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
|
||||
"""
|
||||
一站式处理工作流程:解析网页工作流列表,合并工作流(带参数),然后发布任务
|
||||
|
||||
|
||||
Args:
|
||||
workflow_name: 合并后的工作流名称
|
||||
task_name: 任务名称
|
||||
|
||||
|
||||
Returns:
|
||||
任务创建结果
|
||||
"""
|
||||
@@ -504,32 +504,32 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"📋 处理网页工作流列表: {web_workflow_list}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
|
||||
web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
|
||||
workflows_result = self.process_web_workflows(web_workflow_json)
|
||||
|
||||
|
||||
if not workflows_result:
|
||||
return self._create_error_result("处理网页工作流列表失败", "process_web_workflows")
|
||||
|
||||
|
||||
print(f"workflows_result 类型: {type(workflows_result)}")
|
||||
print(f"workflows_result 内容: {workflows_result}")
|
||||
|
||||
|
||||
workflows_with_params = self._build_workflows_with_parameters(workflows_result)
|
||||
|
||||
|
||||
merge_data = {
|
||||
"name": workflow_name,
|
||||
"workflows": workflows_with_params
|
||||
}
|
||||
|
||||
|
||||
# print(f"\n🔄 合并工作流(带参数),名称: {workflow_name}")
|
||||
merged_workflow = self.merge_workflow_with_parameters(json.dumps(merge_data))
|
||||
|
||||
|
||||
if not merged_workflow:
|
||||
return self._create_error_result("合并工作流失败", "merge_workflow_with_parameters")
|
||||
|
||||
|
||||
workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
|
||||
# print(f"\n📤 使用工作流创建任务: {workflow_name} (ID: {workflow_id})")
|
||||
|
||||
|
||||
order_params = [{
|
||||
"orderCode": f"task_{self.hardware_interface.get_current_time_iso8601()}",
|
||||
"orderName": task_name,
|
||||
@@ -537,16 +537,16 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
"borderNumber": 1,
|
||||
"paramValues": {}
|
||||
}]
|
||||
|
||||
|
||||
result = self.create_order(json.dumps(order_params))
|
||||
|
||||
|
||||
if not result:
|
||||
return self._create_error_result("创建任务失败", "create_order")
|
||||
|
||||
|
||||
# 清空工作流序列和参数,防止下次执行时累积重复
|
||||
self.pending_task_params = []
|
||||
self.clear_workflows()
|
||||
|
||||
self.clear_workflows() # 清空工作流序列,避免重复累积
|
||||
|
||||
# print(f"\n✅ 任务创建成功: {result}")
|
||||
# print(f"\n✅ 任务创建成功")
|
||||
print(f"{'='*60}\n")
|
||||
@@ -555,10 +555,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
def _build_workflows_with_parameters(self, workflows_result: list) -> list:
|
||||
"""
|
||||
构建带参数的工作流列表
|
||||
|
||||
|
||||
Args:
|
||||
workflows_result: 处理后的工作流列表(应为包含 id 和 name 的字典列表)
|
||||
|
||||
|
||||
Returns:
|
||||
符合新接口格式的工作流参数结构
|
||||
"""
|
||||
@@ -577,19 +577,19 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
continue
|
||||
workflow_name = workflow_info.get("name", "")
|
||||
# print(f"\n🔧 处理工作流 [{idx}]: {workflow_name} (ID: {workflow_id})")
|
||||
|
||||
|
||||
if idx >= len(self.pending_task_params):
|
||||
# print(f" ⚠️ 无对应参数,跳过")
|
||||
workflows_with_params.append({"id": workflow_id})
|
||||
continue
|
||||
|
||||
|
||||
param_data = self.pending_task_params[idx]
|
||||
param_values = param_data.get("param_values", {})
|
||||
if not param_values:
|
||||
# print(f" ⚠️ 参数为空,跳过")
|
||||
workflows_with_params.append({"id": workflow_id})
|
||||
continue
|
||||
|
||||
|
||||
step_parameters = {}
|
||||
for step_id, actions_dict in param_values.items():
|
||||
# print(f" 📍 步骤ID: {step_id}")
|
||||
@@ -645,25 +645,25 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
def merge_workflow_with_parameters(self, json_str: str) -> dict:
|
||||
"""
|
||||
调用新接口:合并工作流并传递参数
|
||||
|
||||
|
||||
Args:
|
||||
json_str: JSON格式的字符串,包含:
|
||||
- name: 工作流名称
|
||||
- workflows: [{"id": "工作流ID", "stepParameters": {...}}]
|
||||
|
||||
|
||||
Returns:
|
||||
合并后的工作流信息
|
||||
"""
|
||||
try:
|
||||
data = json.loads(json_str)
|
||||
|
||||
|
||||
# 在工作流名称后面添加时间戳,避免重复
|
||||
if "name" in data and data["name"]:
|
||||
timestamp = self.hardware_interface.get_current_time_iso8601().replace(":", "-").replace(".", "-")
|
||||
original_name = data["name"]
|
||||
data["name"] = f"{original_name}_{timestamp}"
|
||||
print(f"🕒 工作流名称已添加时间戳: {original_name} -> {data['name']}")
|
||||
|
||||
|
||||
request_data = {
|
||||
"apiKey": API_CONFIG["api_key"],
|
||||
"requestTime": self.hardware_interface.get_current_time_iso8601(),
|
||||
@@ -672,37 +672,37 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
print(f"\n📤 发送合并请求:")
|
||||
print(f" 工作流名称: {data.get('name')}")
|
||||
print(f" 子工作流数量: {len(data.get('workflows', []))}")
|
||||
|
||||
|
||||
# 打印完整的POST请求内容
|
||||
print(f"\n🔍 POST请求详细内容:")
|
||||
print(f" URL: {self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters")
|
||||
print(f" Headers: {{'Content-Type': 'application/json'}}")
|
||||
print(f" Request Data:")
|
||||
print(f" {json.dumps(request_data, indent=4, ensure_ascii=False)}")
|
||||
#
|
||||
#
|
||||
response = requests.post(
|
||||
f"{self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters",
|
||||
json=request_data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=30
|
||||
)
|
||||
|
||||
|
||||
# # 打印响应详细内容
|
||||
# print(f"\n📥 POST响应详细内容:")
|
||||
# print(f" 状态码: {response.status_code}")
|
||||
# print(f" 响应头: {dict(response.headers)}")
|
||||
# print(f" 响应体: {response.text}")
|
||||
# #
|
||||
# #
|
||||
try:
|
||||
result = response.json()
|
||||
# #
|
||||
# #
|
||||
# print(f"\n📋 解析后的响应JSON:")
|
||||
# print(f" {json.dumps(result, indent=4, ensure_ascii=False)}")
|
||||
# #
|
||||
# #
|
||||
except json.JSONDecodeError:
|
||||
print(f"❌ 服务器返回非 JSON 格式响应: {response.text}")
|
||||
return None
|
||||
|
||||
|
||||
if result.get("code") == 1:
|
||||
print(f"✅ 工作流合并成功(带参数)")
|
||||
return result.get("data", {})
|
||||
@@ -710,7 +710,7 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
error_msg = result.get('message', '未知错误')
|
||||
print(f"❌ 工作流合并失败: {error_msg}")
|
||||
return None
|
||||
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
print(f"❌ 合并工作流请求超时")
|
||||
return None
|
||||
@@ -726,10 +726,10 @@ class BioyondReactionStation(BioyondWorkstation):
|
||||
|
||||
def _validate_and_refresh_workflow_if_needed(self, workflow_name: str) -> bool:
|
||||
"""验证工作流ID是否有效,如果无效则重新合并
|
||||
|
||||
|
||||
Args:
|
||||
workflow_name: 工作流名称
|
||||
|
||||
|
||||
Returns:
|
||||
bool: 验证或刷新是否成功
|
||||
"""
|
||||
|
||||
@@ -668,7 +668,7 @@ __all__ = [
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 简单测试HTTP服务
|
||||
class DummyWorkstation:
|
||||
class BioyondWorkstation:
|
||||
device_id = "WS-001"
|
||||
|
||||
def process_step_finish_report(self, report_request):
|
||||
|
||||
@@ -643,15 +643,15 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
|
||||
for detail in material["detail"]:
|
||||
number = (
|
||||
(detail.get("z", 0) - 1) * plr_material.num_items_x * plr_material.num_items_y
|
||||
+ (detail.get("x", 0) - 1) * plr_material.num_items_x
|
||||
+ (detail.get("y", 0) - 1)
|
||||
+ (detail.get("y", 0) - 1) * plr_material.num_items_y
|
||||
+ (detail.get("x", 0) - 1)
|
||||
)
|
||||
bottle = plr_material[number]
|
||||
if detail["name"] in type_mapping:
|
||||
if detail["typeName"] in type_mapping:
|
||||
# plr_material.unassign_child_resource(bottle)
|
||||
plr_material.sites[number] = None
|
||||
plr_material[number] = initialize_resource(
|
||||
{"name": f'{detail["name"]}_{number}', "class": type_mapping[detail["name"]][0]}, resource_type=ResourcePLR
|
||||
{"name": f'{detail["name"]}_{number}', "class": type_mapping[detail["typeName"]][0]}, resource_type=ResourcePLR
|
||||
)
|
||||
else:
|
||||
bottle.tracker.liquids = [
|
||||
|
||||
@@ -74,6 +74,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
num_items_x: int = 0,
|
||||
num_items_y: int = 0,
|
||||
num_items_z: int = 0,
|
||||
layout: str = "x-y",
|
||||
sites: Optional[Dict[Union[int, str], Optional[ResourcePLR]]] = None,
|
||||
category: Optional[str] = "carrier",
|
||||
model: Optional[str] = None,
|
||||
@@ -88,6 +89,8 @@ class ItemizedCarrier(ResourcePLR):
|
||||
)
|
||||
self.num_items = len(sites)
|
||||
self.num_items_x, self.num_items_y, self.num_items_z = num_items_x, num_items_y, num_items_z
|
||||
self.layout = "z-y" if self.num_items_z > 1 and self.num_items_x == 1 else "x-z" if self.num_items_z > 1 and self.num_items_y == 1 else "x-y"
|
||||
|
||||
if isinstance(sites, dict):
|
||||
sites = sites or {}
|
||||
self.sites: List[Optional[ResourcePLR]] = list(sites.values())
|
||||
@@ -150,7 +153,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
def assign_resource_to_site(self, resource: ResourcePLR, spot: int):
|
||||
if self.sites[spot] is not None and not isinstance(self.sites[spot], ResourceHolder):
|
||||
raise ValueError(f"spot {spot} already has a resource, {resource}")
|
||||
self.assign_child_resource(resource, location=self.child_locations.get(str(spot)), spot=spot)
|
||||
self.assign_child_resource(resource, location=self.child_locations.get(list(self._ordering.keys())[spot]), spot=spot)
|
||||
|
||||
def unassign_child_resource(self, resource: ResourcePLR):
|
||||
found = False
|
||||
@@ -403,6 +406,7 @@ class ItemizedCarrier(ResourcePLR):
|
||||
"num_items_x": self.num_items_x,
|
||||
"num_items_y": self.num_items_y,
|
||||
"num_items_z": self.num_items_z,
|
||||
"layout": self.layout,
|
||||
"sites": [{
|
||||
"label": str(identifier),
|
||||
"visible": True if self[identifier] is not None else False,
|
||||
|
||||
@@ -5,10 +5,13 @@ from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_reso
|
||||
from unilabos.resources.itemized_carrier import ItemizedCarrier, ResourcePLR
|
||||
|
||||
|
||||
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
|
||||
def warehouse_factory(
|
||||
name: str,
|
||||
num_items_x: int = 4,
|
||||
num_items_y: int = 1,
|
||||
num_items_x: int = 1,
|
||||
num_items_y: int = 4,
|
||||
num_items_z: int = 4,
|
||||
dx: float = 137.0,
|
||||
dy: float = 96.0,
|
||||
@@ -33,13 +36,16 @@ def warehouse_factory(
|
||||
locations.append(Coordinate(x, y, z))
|
||||
if removed_positions:
|
||||
locations = [loc for i, loc in enumerate(locations) if i not in removed_positions]
|
||||
sites = create_homogeneous_resources(
|
||||
_sites = create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=locations,
|
||||
resource_size_x=127.0,
|
||||
resource_size_y=86.0,
|
||||
name_prefix=name,
|
||||
)
|
||||
len_x, len_y = (num_items_x, num_items_y) if num_items_z == 1 else (num_items_y, num_items_z) if num_items_x == 1 else (num_items_x, num_items_z)
|
||||
keys = [f"{LETTERS[j]}{i + 1}" for i in range(len_x) for j in range(len_y)]
|
||||
sites = {i: site for i, site in zip(keys, _sites.values())}
|
||||
|
||||
return WareHouse(
|
||||
name=name,
|
||||
|
||||
@@ -32,7 +32,7 @@ class ResourceDictPositionObject(BaseModel):
|
||||
class ResourceDictPosition(BaseModel):
|
||||
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
|
||||
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
|
||||
layout: Literal["2d"] = Field(description="Resource layout", default="2d")
|
||||
layout: Literal["2d", "x-y", "z-y", "x-z"] = Field(description="Resource layout", default="x-y")
|
||||
position: ResourceDictPositionObject = Field(
|
||||
description="Resource position", default_factory=ResourceDictPositionObject
|
||||
)
|
||||
@@ -42,6 +42,7 @@ class ResourceDictPosition(BaseModel):
|
||||
rotation: ResourceDictPositionObject = Field(
|
||||
description="Resource rotation", default_factory=ResourceDictPositionObject
|
||||
)
|
||||
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(description="Cross section type", default="rectangle")
|
||||
|
||||
|
||||
# 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化
|
||||
@@ -58,6 +59,7 @@ class ResourceDict(BaseModel):
|
||||
type: Literal["device"] | str = Field(description="Resource type")
|
||||
klass: str = Field(alias="class", description="Resource class name")
|
||||
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
|
||||
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
|
||||
config: Dict[str, Any] = Field(description="Resource configuration")
|
||||
data: Dict[str, Any] = Field(description="Resource data")
|
||||
|
||||
@@ -136,6 +138,8 @@ class ResourceDictInstance(object):
|
||||
content["config"] = {}
|
||||
if not content.get("data"):
|
||||
content["data"] = {}
|
||||
if "pose" not in content:
|
||||
content["pose"] = content.get("position", {})
|
||||
return ResourceDictInstance(ResourceDict.model_validate(content))
|
||||
|
||||
def get_nested_dict(self) -> Dict[str, Any]:
|
||||
@@ -330,6 +334,21 @@ class ResourceTreeSet(object):
|
||||
) -> ResourceDictInstance:
|
||||
current_uuid = uuids.pop(0)
|
||||
|
||||
raw_pos = (
|
||||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||||
if d["location"]
|
||||
else {"x": 0, "y": 0, "z": 0}
|
||||
)
|
||||
pos = {
|
||||
"size": {"width": d["size_x"], "height": d["size_y"], "depth": d["size_z"]},
|
||||
"scale": {"x": 1.0, "y": 1.0, "z": 1.0},
|
||||
"layout": d.get("layout", "x-y"),
|
||||
"position": raw_pos,
|
||||
"position3d": raw_pos,
|
||||
"rotation": d["rotation"],
|
||||
"cross_section_type": d.get("cross_section_type", "rectangle"),
|
||||
}
|
||||
|
||||
# 先构建当前节点的字典(不包含children)
|
||||
r_dict = {
|
||||
"id": d["name"],
|
||||
@@ -338,12 +357,10 @@ class ResourceTreeSet(object):
|
||||
"parent": parent_resource, # 直接传入 ResourceDict 对象
|
||||
"type": replace_plr_type(d.get("category", "")),
|
||||
"class": d.get("class", ""),
|
||||
"position": (
|
||||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||||
if d["location"]
|
||||
else {"x": 0, "y": 0, "z": 0}
|
||||
),
|
||||
"config": {k: v for k, v in d.items() if k not in ["name", "children", "parent_name", "location"]},
|
||||
"position": pos,
|
||||
"pose": pos,
|
||||
"config": {k: v for k, v in d.items() if k not in
|
||||
["name", "children", "parent_name", "location", "rotation", "size_x", "size_y", "size_z", "cross_section_type", "bottom_type"]},
|
||||
"data": states[d["name"]],
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user