Update workstation.

modify workstation_architecture docs

bioyond_HR (#133)

* feat: Enhance Bioyond synchronization and resource management

- Implemented synchronization for all material types (consumables, samples, reagents) from Bioyond, logging detailed information for each type.
- Improved error handling and logging during synchronization processes.
- Added functionality to save Bioyond material IDs in UniLab resources for future updates.
- Enhanced the `sync_to_external` method to handle material movements correctly, including querying and creating materials in Bioyond.
- Updated warehouse configurations to support new storage types and improved layout for better resource management.
- Introduced new resource types such as reactors and tip boxes, with detailed specifications.
- Modified warehouse factory to support column offsets for naming conventions (e.g., A05-D08).
- Improved resource tracking by merging extra attributes instead of overwriting them.
- Added a new method for updating resources in Bioyond, ensuring better synchronization of resource changes.

* feat: 添加TipBox和Reactor的配置到bottles.yaml

* fix: 修复液体投料方法中的volume参数处理逻辑

修复solid_feeding_vials方法中的volume参数处理逻辑,优化solvents参数的使用条件

更新液体投料方法,支持通过溶剂信息自动计算体积,添加solvents参数并更新文档描述

Add batch creation methods for vial and solution tasks

添加批量创建90%10%小瓶投料任务和二胺溶液配置任务的功能,更新相关参数和默认值
This commit is contained in:
ZiWei
2025-10-27 23:55:32 +08:00
committed by Xuwznln
parent 42b78ab4c1
commit 9bd72b48e1
24 changed files with 1765 additions and 1450 deletions

View File

@@ -233,7 +233,7 @@ class BioyondV1RPC(BaseRequest):
return response.get("data", {})
def material_outbound(self, material_id: str, location_name: str, quantity: int) -> dict:
"""指定库位出库物料"""
"""指定库位出库物料(通过库位名称)"""
location_id = LOCATION_MAPPING.get(location_name, location_name)
params = {
@@ -251,7 +251,36 @@ class BioyondV1RPC(BaseRequest):
})
if not response or response['code'] != 1:
return {}
return None
return response
def material_outbound_by_id(self, material_id: str, location_id: str, quantity: int) -> dict:
"""指定库位出库物料直接使用location_id
Args:
material_id: 物料ID
location_id: 库位ID不是库位名称是UUID
quantity: 数量
Returns:
dict: API响应失败返回None
"""
params = {
"materialId": material_id,
"locationId": location_id,
"quantity": quantity
}
response = self.post(
url=f'{self.host}/api/lims/storage/outbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return None
return response
# ==================== 工作流查询相关接口 ====================
@@ -703,10 +732,10 @@ class BioyondV1RPC(BaseRequest):
"""预加载材料列表到缓存中"""
try:
print("正在加载材料列表缓存...")
# 加载所有类型的材料:耗材(0)、样品(1)、试剂(2)
material_types = [1, 2]
for type_mode in material_types:
print(f"正在加载类型 {type_mode} 的材料...")
stock_query = f'{{"typeMode": {type_mode}, "includeDetail": true}}'
@@ -723,7 +752,7 @@ class BioyondV1RPC(BaseRequest):
material_id = material.get("id")
if material_name and material_id:
self.material_cache[material_name] = material_id
# 处理样品板等容器中的detail材料
detail_materials = material.get("detail", [])
for detail_material in detail_materials:

View File

@@ -7,7 +7,7 @@ from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstati
class BioyondDispensingStation(BioyondWorkstation):
def __init__(
self,
self,
config,
# 桌子
deck,
@@ -77,7 +77,7 @@ class BioyondDispensingStation(BioyondWorkstation):
- hold_m_name: 库位名称,如"C01"用于查找对应的holdMId
返回: 任务创建结果
异常:
- BioyondException: 各种错误情况下的统一异常
"""
@@ -85,7 +85,7 @@ class BioyondDispensingStation(BioyondWorkstation):
# 1. 参数验证
if not hold_m_name:
raise BioyondException("hold_m_name 是必填参数")
# 检查90%物料参数的完整性
# 90%_1物料如果有物料名称或目标重量就必须有全部参数
if percent_90_1_assign_material_name or percent_90_1_target_weigh:
@@ -93,21 +93,21 @@ class BioyondDispensingStation(BioyondWorkstation):
raise BioyondException("90%_1物料如果提供了目标重量必须同时提供物料名称")
if not percent_90_1_target_weigh:
raise BioyondException("90%_1物料如果提供了物料名称必须同时提供目标重量")
# 90%_2物料如果有物料名称或目标重量就必须有全部参数
if percent_90_2_assign_material_name or percent_90_2_target_weigh:
if not percent_90_2_assign_material_name:
raise BioyondException("90%_2物料如果提供了目标重量必须同时提供物料名称")
if not percent_90_2_target_weigh:
raise BioyondException("90%_2物料如果提供了物料名称必须同时提供目标重量")
# 90%_3物料如果有物料名称或目标重量就必须有全部参数
if percent_90_3_assign_material_name or percent_90_3_target_weigh:
if not percent_90_3_assign_material_name:
raise BioyondException("90%_3物料如果提供了目标重量必须同时提供物料名称")
if not percent_90_3_target_weigh:
raise BioyondException("90%_3物料如果提供了物料名称必须同时提供目标重量")
# 检查10%物料参数的完整性
# 10%_1物料如果有物料名称、目标重量、体积或液体物料名称中的任何一个就必须有全部参数
if any([percent_10_1_assign_material_name, percent_10_1_target_weigh, percent_10_1_volume, percent_10_1_liquid_material_name]):
@@ -119,7 +119,7 @@ class BioyondDispensingStation(BioyondWorkstation):
raise BioyondException("10%_1物料如果提供了其他参数必须同时提供液体体积")
if not percent_10_1_liquid_material_name:
raise BioyondException("10%_1物料如果提供了其他参数必须同时提供液体物料名称")
# 10%_2物料如果有物料名称、目标重量、体积或液体物料名称中的任何一个就必须有全部参数
if any([percent_10_2_assign_material_name, percent_10_2_target_weigh, percent_10_2_volume, percent_10_2_liquid_material_name]):
if not percent_10_2_assign_material_name:
@@ -130,7 +130,7 @@ class BioyondDispensingStation(BioyondWorkstation):
raise BioyondException("10%_2物料如果提供了其他参数必须同时提供液体体积")
if not percent_10_2_liquid_material_name:
raise BioyondException("10%_2物料如果提供了其他参数必须同时提供液体物料名称")
# 10%_3物料如果有物料名称、目标重量、体积或液体物料名称中的任何一个就必须有全部参数
if any([percent_10_3_assign_material_name, percent_10_3_target_weigh, percent_10_3_volume, percent_10_3_liquid_material_name]):
if not percent_10_3_assign_material_name:
@@ -141,7 +141,7 @@ class BioyondDispensingStation(BioyondWorkstation):
raise BioyondException("10%_3物料如果提供了其他参数必须同时提供液体体积")
if not percent_10_3_liquid_material_name:
raise BioyondException("10%_3物料如果提供了其他参数必须同时提供液体物料名称")
# 2. 生成任务编码和设置默认值
order_code = "task_vial_" + str(int(datetime.now().timestamp()))
if order_name is None:
@@ -152,7 +152,7 @@ class BioyondDispensingStation(BioyondWorkstation):
temperature = "40"
if delay_time is None:
delay_time = "600"
# 3. 工作流ID
workflow_id = "3a19310d-16b9-9d81-b109-0748e953694b"
@@ -160,22 +160,22 @@ class BioyondDispensingStation(BioyondWorkstation):
material_info = self.hardware_interface.material_id_query(workflow_id)
if not material_info:
raise BioyondException(f"无法查询工作流 {workflow_id} 的物料信息")
# 获取locations列表
locations = material_info.get("locations", []) if isinstance(material_info, dict) else []
if not locations:
raise BioyondException(f"工作流 {workflow_id} 没有找到库位信息")
# 查找指定名称的库位
hold_mid = None
for location in locations:
if location.get("holdMName") == hold_m_name:
hold_mid = location.get("holdMId")
break
if not hold_mid:
raise BioyondException(f"未找到库位名称为 {hold_m_name} 的库位,请检查名称是否正确")
extend_properties = f"{{\"{ hold_mid }\": {{}}}}"
self.hardware_interface._logger.info(f"找到库位 {hold_m_name} 对应的holdMId: {hold_mid}")
@@ -271,7 +271,7 @@ class BioyondDispensingStation(BioyondWorkstation):
result = self.hardware_interface.create_order(json_str)
self.hardware_interface._logger.info(f"创建90%10%小瓶投料任务结果: {result}")
return json.dumps({"suc": True})
except BioyondException:
# 重新抛出BioyondException
raise
@@ -307,7 +307,7 @@ class BioyondDispensingStation(BioyondWorkstation):
- hold_m_name: 库位名称,如"ODA-1"用于查找对应的holdMId
返回: 任务创建结果
异常:
- BioyondException: 各种错误情况下的统一异常
"""
@@ -321,8 +321,8 @@ class BioyondDispensingStation(BioyondWorkstation):
raise BioyondException("volume 是必填参数")
if not hold_m_name:
raise BioyondException("hold_m_name 是必填参数")
# 2. 生成任务编码和设置默认值
order_code = "task_oda_" + str(int(datetime.now().timestamp()))
if order_name is None:
@@ -333,30 +333,30 @@ class BioyondDispensingStation(BioyondWorkstation):
temperature = "20"
if delay_time is None:
delay_time = "600"
# 3. 工作流ID - 二胺溶液配置工作流
workflow_id = "3a15d4a1-3bbe-76f9-a458-292896a338f5"
# 4. 查询工作流对应的holdMID
material_info = self.hardware_interface.material_id_query(workflow_id)
if not material_info:
raise BioyondException(f"无法查询工作流 {workflow_id} 的物料信息")
# 获取locations列表
locations = material_info.get("locations", []) if isinstance(material_info, dict) else []
if not locations:
raise BioyondException(f"工作流 {workflow_id} 没有找到库位信息")
# 查找指定名称的库位
hold_mid = None
for location in locations:
if location.get("holdMName") == hold_m_name:
hold_mid = location.get("holdMId")
break
if not hold_mid:
raise BioyondException(f"未找到库位名称为 {hold_m_name} 的库位,请检查名称是否正确")
extend_properties = f"{{\"{ hold_mid }\": {{}}}}"
self.hardware_interface._logger.info(f"找到库位 {hold_m_name} 对应的holdMId: {hold_mid}")
@@ -397,9 +397,9 @@ class BioyondDispensingStation(BioyondWorkstation):
# 7. 调用create_order方法创建任务
result = self.hardware_interface.create_order(json_str)
self.hardware_interface._logger.info(f"创建二胺溶液配置任务结果: {result}")
return json.dumps({"suc": True})
except BioyondException:
# 重新抛出BioyondException
raise
@@ -409,17 +409,278 @@ class BioyondDispensingStation(BioyondWorkstation):
self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg)
# 批量创建二胺溶液配置任务
def batch_create_diamine_solution_tasks(self,
solutions,
liquid_material_name: str = "NMP",
speed: str = None,
temperature: str = None,
delay_time: str = None) -> str:
"""
批量创建二胺溶液配置任务
参数说明:
- solutions: 溶液列表数组或JSON字符串格式如下:
[
{
"name": "MDA",
"order": 0,
"solid_mass": 5.0,
"solvent_volume": 20,
...
},
...
]
- liquid_material_name: 液体物料名称,默认为"NMP"
- speed: 搅拌速度如果为None则使用默认值400
- temperature: 温度如果为None则使用默认值20
- delay_time: 延迟时间如果为None则使用默认值600
返回: JSON字符串格式的任务创建结果
异常:
- BioyondException: 各种错误情况下的统一异常
"""
try:
# 参数类型转换:如果是字符串则解析为列表
if isinstance(solutions, str):
try:
solutions = json.loads(solutions)
except json.JSONDecodeError as e:
raise BioyondException(f"solutions JSON解析失败: {str(e)}")
# 参数验证
if not isinstance(solutions, list):
raise BioyondException("solutions 必须是列表类型或有效的JSON数组字符串")
if not solutions:
raise BioyondException("solutions 列表不能为空")
# 批量创建任务
results = []
success_count = 0
failed_count = 0
for idx, solution in enumerate(solutions):
try:
# 提取参数
name = solution.get("name")
solid_mass = solution.get("solid_mass")
solvent_volume = solution.get("solvent_volume")
order = solution.get("order")
if not all([name, solid_mass is not None, solvent_volume is not None]):
self.hardware_interface._logger.warning(
f"跳过第 {idx + 1} 个溶液:缺少必要参数"
)
results.append({
"index": idx + 1,
"name": name,
"success": False,
"error": "缺少必要参数"
})
failed_count += 1
continue
# 生成库位名称(直接使用物料名称)
# 如果需要其他命名规则,可以在这里调整
hold_m_name = name
# 调用单个任务创建方法
result = self.create_diamine_solution_task(
order_name=f"二胺溶液配置-{name}",
material_name=name,
target_weigh=str(solid_mass),
volume=str(solvent_volume),
liquid_material_name=liquid_material_name,
speed=speed,
temperature=temperature,
delay_time=delay_time,
hold_m_name=hold_m_name
)
results.append({
"index": idx + 1,
"name": name,
"success": True,
"hold_m_name": hold_m_name
})
success_count += 1
self.hardware_interface._logger.info(
f"成功创建二胺溶液配置任务: {name}"
)
except BioyondException as e:
results.append({
"index": idx + 1,
"name": solution.get("name", "unknown"),
"success": False,
"error": str(e)
})
failed_count += 1
self.hardware_interface._logger.error(
f"创建第 {idx + 1} 个任务失败: {str(e)}"
)
except Exception as e:
results.append({
"index": idx + 1,
"name": solution.get("name", "unknown"),
"success": False,
"error": f"未知错误: {str(e)}"
})
failed_count += 1
self.hardware_interface._logger.error(
f"创建第 {idx + 1} 个任务时发生未知错误: {str(e)}"
)
# 返回汇总结果
summary = {
"total": len(solutions),
"success": success_count,
"failed": failed_count,
"details": results
}
self.hardware_interface._logger.info(
f"批量创建二胺溶液配置任务完成: 总数={len(solutions)}, "
f"成功={success_count}, 失败={failed_count}"
)
# 返回JSON字符串格式
return json.dumps(summary, ensure_ascii=False)
except BioyondException:
raise
except Exception as e:
error_msg = f"批量创建二胺溶液配置任务时发生未预期的错误: {str(e)}"
self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg)
# 批量创建90%10%小瓶投料任务
def batch_create_90_10_vial_feeding_tasks(self,
titration,
hold_m_name: str = None,
speed: str = None,
temperature: str = None,
delay_time: str = None,
liquid_material_name: str = "NMP") -> str:
"""
批量创建90%10%小瓶投料任务仅创建1个任务但包含所有90%和10%物料)
参数说明:
- titration: 滴定信息的字典或JSON字符串格式如下:
{
"name": "BTDA",
"main_portion": 1.9152351915461294, # 主称固体质量(g) -> 90%物料
"titration_portion": 0.05923407808905555, # 滴定固体质量(g) -> 10%物料固体
"titration_solvent": 3.050555021586361 # 滴定溶液体积(mL) -> 10%物料液体
}
- hold_m_name: 库位名称,如"C01"。必填参数
- speed: 搅拌速度如果为None则使用默认值400
- temperature: 温度如果为None则使用默认值40
- delay_time: 延迟时间如果为None则使用默认值600
- liquid_material_name: 10%物料的液体物料名称,默认为"NMP"
返回: JSON字符串格式的任务创建结果
异常:
- BioyondException: 各种错误情况下的统一异常
"""
try:
# 参数类型转换:如果是字符串则解析为字典
if isinstance(titration, str):
try:
titration = json.loads(titration)
except json.JSONDecodeError as e:
raise BioyondException(f"titration参数JSON解析失败: {str(e)}")
# 参数验证
if not isinstance(titration, dict):
raise BioyondException("titration 必须是字典类型或有效的JSON字符串")
if not hold_m_name:
raise BioyondException("hold_m_name 是必填参数")
if not titration:
raise BioyondException("titration 参数不能为空")
# 提取滴定数据
name = titration.get("name")
main_portion = titration.get("main_portion") # 主称固体质量
titration_portion = titration.get("titration_portion") # 滴定固体质量
titration_solvent = titration.get("titration_solvent") # 滴定溶液体积
if not all([name, main_portion is not None, titration_portion is not None, titration_solvent is not None]):
raise BioyondException("titration 数据缺少必要参数")
# 将main_portion平均分成3份作为90%物料3个小瓶
portion_90 = main_portion / 3
# 调用单个任务创建方法
result = self.create_90_10_vial_feeding_task(
order_name=f"90%10%小瓶投料-{name}",
speed=speed,
temperature=temperature,
delay_time=delay_time,
# 90%物料 - 主称固体平均分成3份
percent_90_1_assign_material_name=name,
percent_90_1_target_weigh=str(round(portion_90, 6)),
percent_90_2_assign_material_name=name,
percent_90_2_target_weigh=str(round(portion_90, 6)),
percent_90_3_assign_material_name=name,
percent_90_3_target_weigh=str(round(portion_90, 6)),
# 10%物料 - 滴定固体 + 滴定溶剂只使用第1个10%小瓶)
percent_10_1_assign_material_name=name,
percent_10_1_target_weigh=str(round(titration_portion, 6)),
percent_10_1_volume=str(round(titration_solvent, 6)),
percent_10_1_liquid_material_name=liquid_material_name,
hold_m_name=hold_m_name
)
summary = {
"success": True,
"hold_m_name": hold_m_name,
"material_name": name,
"90_vials": {
"count": 3,
"weight_per_vial": round(portion_90, 6),
"total_weight": round(main_portion, 6)
},
"10_vials": {
"count": 1,
"solid_weight": round(titration_portion, 6),
"liquid_volume": round(titration_solvent, 6)
}
}
self.hardware_interface._logger.info(
f"成功创建90%10%小瓶投料任务: {hold_m_name}, "
f"90%物料={portion_90:.6f}g×3, 10%物料={titration_portion:.6f}g+{titration_solvent:.6f}mL"
)
# 返回JSON字符串格式
return json.dumps(summary, ensure_ascii=False)
except BioyondException:
raise
except Exception as e:
error_msg = f"批量创建90%10%小瓶投料任务时发生未预期的错误: {str(e)}"
self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg)
if __name__ == "__main__":
bioyond = BioyondDispensingStation(config={
"api_key": "DE9BDDA0",
"api_host": "http://192.168.1.200:44388"
})
# ============ 原有示例代码 ============
# 示例1使用material_id_query查询工作流对应的holdMID
workflow_id_1 = "3a15d4a1-3bbe-76f9-a458-292896a338f5" # 二胺溶液配置工作流ID
workflow_id_2 = "3a19310d-16b9-9d81-b109-0748e953694b" # 90%10%小瓶投料工作流ID
#示例2创建二胺溶液配置任务 - ODA指定库位名称
# bioyond.create_diamine_solution_task(
# order_code="task_oda_" + str(int(datetime.now().timestamp())),
@@ -433,7 +694,7 @@ if __name__ == "__main__":
# delay_time="600",
# hold_m_name="烧杯ODA"
# )
# bioyond.create_diamine_solution_task(
# order_code="task_pda_" + str(int(datetime.now().timestamp())),
# order_name="二胺溶液配置-PDA",
@@ -446,7 +707,7 @@ if __name__ == "__main__":
# delay_time="600",
# hold_m_name="烧杯PDA-2"
# )
# bioyond.create_diamine_solution_task(
# order_code="task_mpda_" + str(int(datetime.now().timestamp())),
# order_name="二胺溶液配置-MPDA",
@@ -462,8 +723,8 @@ if __name__ == "__main__":
bioyond.material_id_query("3a19310d-16b9-9d81-b109-0748e953694b")
bioyond.material_id_query("3a15d4a1-3bbe-76f9-a458-292896a338f5")
#示例4创建90%10%小瓶投料任务
# vial_result = bioyond.create_90_10_vial_feeding_task(
# order_code="task_vial_" + str(int(datetime.now().timestamp())),
@@ -487,7 +748,7 @@ if __name__ == "__main__":
# delay_time="1200",
# hold_m_name="8.4分装板-1"
# )
# vial_result = bioyond.create_90_10_vial_feeding_task(
# order_code="task_vial_" + str(int(datetime.now().timestamp())),
# order_name="90%10%小瓶投料-2",
@@ -510,7 +771,7 @@ if __name__ == "__main__":
# delay_time="1200",
# hold_m_name="8.4分装板-2"
# )
#启动调度器
#bioyond.scheduler_start()
@@ -529,7 +790,7 @@ if __name__ == "__main__":
material_data_yp = {
"typeId": "3a14196e-b7a0-a5da-1931-35f3000281e9",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "8.4样品板",
"unit": "",
"quantity": 1,
@@ -540,7 +801,7 @@ if __name__ == "__main__":
"name": "BTDA-1",
"quantity": 20,
"x": 1,
"y": 1,
"y": 1,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -585,7 +846,7 @@ if __name__ == "__main__":
material_data_yp = {
"typeId": "3a14196e-b7a0-a5da-1931-35f3000281e9",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "8.7样品板",
"unit": "",
"quantity": 1,
@@ -596,7 +857,7 @@ if __name__ == "__main__":
"name": "mianfen",
"quantity": 13,
"x": 1,
"y": 1,
"y": 1,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -620,7 +881,7 @@ if __name__ == "__main__":
material_data_fzb_1 = {
"typeId": "3a14196e-5dfe-6e21-0c79-fe2036d052c4",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "8.7分装板",
"unit": "",
"quantity": 1,
@@ -631,7 +892,7 @@ if __name__ == "__main__":
"name": "10%小瓶1",
"quantity": 1,
"x": 1,
"y": 1,
"y": 1,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -642,7 +903,7 @@ if __name__ == "__main__":
"name": "10%小瓶2",
"quantity": 1,
"x": 1,
"y": 2,
"y": 2,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -653,7 +914,7 @@ if __name__ == "__main__":
"name": "10%小瓶3",
"quantity": 1,
"x": 1,
"y": 3,
"y": 3,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -697,7 +958,7 @@ if __name__ == "__main__":
material_data_fzb_2 = {
"typeId": "3a14196e-5dfe-6e21-0c79-fe2036d052c4",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "8.4分装板-2",
"unit": "",
"quantity": 1,
@@ -708,7 +969,7 @@ if __name__ == "__main__":
"name": "10%小瓶1",
"quantity": 1,
"x": 1,
"y": 1,
"y": 1,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -719,7 +980,7 @@ if __name__ == "__main__":
"name": "10%小瓶2",
"quantity": 1,
"x": 1,
"y": 2,
"y": 2,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -730,7 +991,7 @@ if __name__ == "__main__":
"name": "10%小瓶3",
"quantity": 1,
"x": 1,
"y": 3,
"y": 3,
#"unit": "单位"
"molecular": 1,
"Parameters":"{\"molecular\": 1}"
@@ -775,7 +1036,7 @@ if __name__ == "__main__":
material_data_sb_oda = {
"typeId": "3a14196b-24f2-ca49-9081-0cab8021bf1a",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "mianfen1",
"unit": "",
"quantity": 1,
@@ -785,7 +1046,7 @@ if __name__ == "__main__":
material_data_sb_pda_2 = {
"typeId": "3a14196b-24f2-ca49-9081-0cab8021bf1a",
#"code": "物料编码001",
#"barCode": "物料条码001",
#"barCode": "物料条码001",
"name": "mianfen2",
"unit": "",
"quantity": 1,
@@ -795,7 +1056,7 @@ if __name__ == "__main__":
# material_data_sb_mpda = {
# "typeId": "3a14196b-24f2-ca49-9081-0cab8021bf1a",
# #"code": "物料编码001",
# #"barCode": "物料条码001",
# #"barCode": "物料条码001",
# "name": "烧杯MPDA",
# "unit": "个",
# "quantity": 1,

View File

@@ -208,7 +208,8 @@ class BioyondReactionStation(BioyondWorkstation):
def liquid_feeding_solvents(
self,
assign_material_name: str,
volume: str,
volume: str = None,
solvents = None,
titration_type: str = "1",
time: str = "360",
torque_variation: int = 2,
@@ -218,12 +219,41 @@ class BioyondReactionStation(BioyondWorkstation):
Args:
assign_material_name: 物料名称
volume: 分液量(μL)
volume: 分液量(μL),直接指定体积(可选,如果提供solvents则自动计算)
solvents: 溶剂信息的字典或JSON字符串(可选),格式如下:
{
"additional_solvent": 33.55092503597727, # 溶剂体积(mL)
"total_liquid_volume": 48.00916988195499
}
如果提供solvents,则从中提取additional_solvent并转换为μL
titration_type: 是否滴定(1=否, 2=是)
time: 观察时间(分钟)
torque_variation: 是否观察(int类型, 1=否, 2=是)
temperature: 温度设定(°C)
"""
# 处理 volume 参数:优先使用直接传入的 volume,否则从 solvents 中提取
if not volume and solvents is not None:
# 参数类型转换:如果是字符串则解析为字典
if isinstance(solvents, str):
try:
solvents = json.loads(solvents)
except json.JSONDecodeError as e:
raise ValueError(f"solvents参数JSON解析失败: {str(e)}")
# 参数验证
if not isinstance(solvents, dict):
raise ValueError("solvents 必须是字典类型或有效的JSON字符串")
# 提取 additional_solvent 值
additional_solvent = solvents.get("additional_solvent")
if additional_solvent is None:
raise ValueError("solvents 中没有找到 additional_solvent 字段")
# 转换为微升(μL) - 从毫升(mL)转换
volume = str(float(additional_solvent) * 1000)
elif volume is None:
raise ValueError("必须提供 volume 或 solvents 参数之一")
self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding_solvents"}')
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
if material_id is None:

View File

@@ -85,8 +85,90 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
def sync_to_external(self, resource: Any) -> bool:
"""将本地物料数据变更同步到Bioyond系统"""
try:
if self.bioyond_api_client is None:
logger.error("Bioyond API客户端未初始化")
# ✅ 跳过仓库类型的资源 - 仓库是容器,不是物料
resource_category = getattr(resource, "category", None)
if resource_category == "warehouse":
logger.debug(f"[同步→Bioyond] 跳过仓库类型资源: {resource.name} (仓库是容器,不需要同步为物料)")
return True
logger.info(f"[同步→Bioyond] 收到物料变更: {resource.name}")
# 获取物料的 Bioyond ID
extra_info = getattr(resource, "unilabos_extra", {})
material_bioyond_id = extra_info.get("material_bioyond_id")
# ⭐ 如果没有 Bioyond ID尝试从 Bioyond 系统中按名称查询
if not material_bioyond_id:
logger.warning(f"[同步→Bioyond] 物料 {resource.name} 没有 Bioyond ID尝试按名称查询...")
try:
# 查询所有类型的物料0=耗材, 1=样品, 2=试剂
import json
all_materials = []
for type_mode in [0, 1, 2]:
query_params = json.dumps({
"typeMode": type_mode,
"filter": "", # 空字符串表示查询所有
"includeDetail": True
})
materials = self.bioyond_api_client.stock_material(query_params)
if materials:
all_materials.extend(materials)
logger.info(f"[同步→Bioyond] 查询到 {len(all_materials)} 个物料")
# 按名称匹配
for mat in all_materials:
if mat.get("name") == resource.name:
material_bioyond_id = mat.get("id")
mat_type = mat.get("typeName", "未知")
logger.info(f"✅ 找到物料 {resource.name} ({mat_type}) 的 Bioyond ID: {material_bioyond_id[:8]}...")
# 保存 ID 到资源对象
extra_info["material_bioyond_id"] = material_bioyond_id
setattr(resource, "unilabos_extra", extra_info)
break
if not material_bioyond_id:
logger.warning(f"⚠️ 在 Bioyond 系统中未找到名为 {resource.name} 的物料")
logger.info(f"[同步→Bioyond] 这是一个新物料,将创建并入库到 Bioyond 系统")
# 不返回,继续执行后续的创建+入库流程
except Exception as e:
logger.error(f"查询 Bioyond 物料失败: {e}")
import traceback
traceback.print_exc()
return False
# 检查是否有位置更新请求
update_site = extra_info.get("update_resource_site")
if not update_site:
logger.debug(f"[同步→Bioyond] 无位置更新请求")
return True
# ===== 物料移动/创建流程 =====
if material_bioyond_id:
logger.info(f"[同步→Bioyond] 🔄 开始移动物料 {resource.name}{update_site}")
else:
logger.info(f"[同步→Bioyond] 开始创建新物料 {resource.name} 并入库到 {update_site}") # 第1步获取仓库配置
from .config import WAREHOUSE_MAPPING
warehouse_mapping = WAREHOUSE_MAPPING
# 确定目标仓库名称(通过遍历所有仓库的库位配置)
parent_name = None
target_location_uuid = None
for warehouse_name, warehouse_info in warehouse_mapping.items():
site_uuids = warehouse_info.get("site_uuids", {})
if update_site in site_uuids:
parent_name = warehouse_name
target_location_uuid = site_uuids[update_site]
logger.info(f"[同步] 目标仓库: {parent_name}/{update_site}")
logger.info(f"[同步] 目标库位UUID: {target_location_uuid[:8]}...")
break
if not parent_name or not target_location_uuid:
logger.error(f"❌ 库位 {update_site} 没有在 WAREHOUSE_MAPPING 中配置")
logger.debug(f"可用仓库: {list(warehouse_mapping.keys())}")
return False
bioyond_material = resource_plr_to_bioyond(
@@ -171,11 +253,22 @@ class BioyondWorkstation(WorkstationBase):
def post_init(self, ros_node: ROS2WorkstationNode):
self._ros_node = ros_node
# ⭐ 上传 deck包括所有 warehouses 及其中的物料)
# 注意:如果有从 Bioyond 同步的物料,它们已经被放置到 warehouse 中了
# 所以只需要上传 deck物料会作为 warehouse 的 children 一起上传
logger.info("正在上传 deck包括 warehouses 和物料)到云端...")
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
"resources": [self.deck]
})
# 清理临时变量(物料已经在 deck 的 warehouse children 中,不需要单独上传)
if hasattr(self, "_synced_resources"):
logger.info(f"{len(self._synced_resources)} 个从Bioyond同步的物料已包含在 deck 中")
self._synced_resources = []
def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot):
time.sleep(3)
ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{
"plr_resources": resource,
"target_device_id": mount_device_id,
@@ -246,7 +339,7 @@ class BioyondWorkstation(WorkstationBase):
}
# ==================== 工作流合并与参数设置 API ====================
def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
# 检查是否为JSON格式的字符串
actual_workflow_name = web_workflow_name
@@ -257,7 +350,7 @@ class BioyondWorkstation(WorkstationBase):
print(f"解析JSON格式工作流名称: {web_workflow_name} -> {actual_workflow_name}")
except json.JSONDecodeError:
print(f"JSON解析失败使用原始字符串: {web_workflow_name}")
workflow_id = self._get_workflow(actual_workflow_name)
if workflow_id:
self.workflow_sequence.append(workflow_id)
@@ -322,7 +415,7 @@ class BioyondWorkstation(WorkstationBase):
# ============ 工作站状态管理 ============
def get_station_info(self) -> Dict[str, Any]:
"""获取工作站基础信息
Returns:
Dict[str, Any]: 工作站基础信息包括设备ID、状态等
"""
@@ -450,8 +543,8 @@ class BioyondWorkstation(WorkstationBase):
# 转换为UniLab格式
unilab_resources = resource_bioyond_to_plr(
bioyond_data,
type_mapping=self.bioyond_config["material_type_mappings"],
bioyond_data,
type_mapping=self.bioyond_config["material_type_mappings"],
deck=self.deck
)

View File

@@ -1,583 +0,0 @@
"""
工作站物料管理基类
Workstation Material Management Base Class
基于PyLabRobot的物料管理系统
"""
from typing import Dict, Any, List, Optional, Union, Type
from abc import ABC, abstractmethod
import json
from pylabrobot.resources import (
Resource as PLRResource,
Container,
Deck,
Coordinate as PLRCoordinate,
)
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
from unilabos.utils.log import logger
from unilabos.resources.graphio import resource_plr_to_ulab, resource_ulab_to_plr
class MaterialManagementBase(ABC):
"""物料管理基类
定义工作站物料管理的标准接口:
1. 物料初始化 - 根据配置创建物料资源
2. 物料追踪 - 实时跟踪物料位置和状态
3. 物料查找 - 按类型、位置、状态查找物料
4. 物料转换 - PyLabRobot与UniLab资源格式转换
"""
def __init__(
self,
device_id: str,
deck_config: Dict[str, Any],
resource_tracker: DeviceNodeResourceTracker,
children_config: Dict[str, Dict[str, Any]] = None
):
self.device_id = device_id
self.deck_config = deck_config
self.resource_tracker = resource_tracker
self.children_config = children_config or {}
# 创建主台面
self.plr_deck = self._create_deck()
# 扩展ResourceTracker
self._extend_resource_tracker()
# 注册deck到resource tracker
self.resource_tracker.add_resource(self.plr_deck)
# 初始化子资源
self.plr_resources = {}
self._initialize_materials()
def _create_deck(self) -> Deck:
"""创建主台面"""
return Deck(
name=f"{self.device_id}_deck",
size_x=self.deck_config.get("size_x", 1000.0),
size_y=self.deck_config.get("size_y", 1000.0),
size_z=self.deck_config.get("size_z", 500.0),
origin=PLRCoordinate(0, 0, 0)
)
def _extend_resource_tracker(self):
"""扩展ResourceTracker以支持PyLabRobot特定功能"""
def find_by_type(resource_type):
"""按类型查找资源"""
return self._find_resources_by_type_recursive(self.plr_deck, resource_type)
def find_by_category(category: str):
"""按类别查找资源"""
found = []
for resource in self._get_all_resources():
if hasattr(resource, 'category') and resource.category == category:
found.append(resource)
return found
def find_by_name_pattern(pattern: str):
"""按名称模式查找资源"""
import re
found = []
for resource in self._get_all_resources():
if re.search(pattern, resource.name):
found.append(resource)
return found
# 动态添加方法到resource_tracker
self.resource_tracker.find_by_type = find_by_type
self.resource_tracker.find_by_category = find_by_category
self.resource_tracker.find_by_name_pattern = find_by_name_pattern
def _find_resources_by_type_recursive(self, resource, target_type):
"""递归查找指定类型的资源"""
found = []
if isinstance(resource, target_type):
found.append(resource)
# 递归查找子资源
children = getattr(resource, "children", [])
for child in children:
found.extend(self._find_resources_by_type_recursive(child, target_type))
return found
def _get_all_resources(self) -> List[PLRResource]:
"""获取所有资源"""
all_resources = []
def collect_resources(resource):
all_resources.append(resource)
children = getattr(resource, "children", [])
for child in children:
collect_resources(child)
collect_resources(self.plr_deck)
return all_resources
def _initialize_materials(self):
"""初始化物料"""
try:
# 确定创建顺序,确保父资源先于子资源创建
creation_order = self._determine_creation_order()
# 按顺序创建资源
for resource_id in creation_order:
config = self.children_config[resource_id]
self._create_plr_resource(resource_id, config)
logger.info(f"物料管理系统初始化完成,共创建 {len(self.plr_resources)} 个资源")
except Exception as e:
logger.error(f"物料初始化失败: {e}")
def _determine_creation_order(self) -> List[str]:
"""确定资源创建顺序"""
order = []
visited = set()
def visit(resource_id: str):
if resource_id in visited:
return
visited.add(resource_id)
config = self.children_config.get(resource_id, {})
parent_id = config.get("parent")
# 如果有父资源,先访问父资源
if parent_id and parent_id in self.children_config:
visit(parent_id)
order.append(resource_id)
for resource_id in self.children_config:
visit(resource_id)
return order
def _create_plr_resource(self, resource_id: str, config: Dict[str, Any]):
"""创建PyLabRobot资源"""
try:
resource_type = config.get("type", "unknown")
data = config.get("data", {})
location_config = config.get("location", {})
# 创建位置坐标
location = PLRCoordinate(
x=location_config.get("x", 0.0),
y=location_config.get("y", 0.0),
z=location_config.get("z", 0.0)
)
# 根据类型创建资源
resource = self._create_resource_by_type(resource_id, resource_type, config, data, location)
if resource:
# 设置父子关系
parent_id = config.get("parent")
if parent_id and parent_id in self.plr_resources:
parent_resource = self.plr_resources[parent_id]
parent_resource.assign_child_resource(resource, location)
else:
# 直接放在deck上
self.plr_deck.assign_child_resource(resource, location)
# 保存资源引用
self.plr_resources[resource_id] = resource
# 注册到resource tracker
self.resource_tracker.add_resource(resource)
logger.debug(f"创建资源成功: {resource_id} ({resource_type})")
except Exception as e:
logger.error(f"创建资源失败 {resource_id}: {e}")
@abstractmethod
def _create_resource_by_type(
self,
resource_id: str,
resource_type: str,
config: Dict[str, Any],
data: Dict[str, Any],
location: PLRCoordinate
) -> Optional[PLRResource]:
"""根据类型创建资源 - 子类必须实现"""
pass
# ============ 物料查找接口 ============
def find_materials_by_type(self, material_type: str) -> List[PLRResource]:
"""按材料类型查找物料"""
return self.resource_tracker.find_by_category(material_type)
def find_material_by_id(self, resource_id: str) -> Optional[PLRResource]:
"""按ID查找物料"""
return self.plr_resources.get(resource_id)
def find_available_positions(self, position_type: str) -> List[PLRResource]:
"""查找可用位置"""
positions = self.resource_tracker.find_by_category(position_type)
available = []
for pos in positions:
if hasattr(pos, 'is_available') and pos.is_available():
available.append(pos)
elif hasattr(pos, 'children') and len(pos.children) == 0:
available.append(pos)
return available
def get_material_inventory(self) -> Dict[str, int]:
"""获取物料库存统计"""
inventory = {}
for resource in self._get_all_resources():
if hasattr(resource, 'category'):
category = resource.category
inventory[category] = inventory.get(category, 0) + 1
return inventory
# ============ 物料状态更新接口 ============
def update_material_location(self, material_id: str, new_location: PLRCoordinate) -> bool:
"""更新物料位置"""
try:
material = self.find_material_by_id(material_id)
if material:
material.location = new_location
return True
return False
except Exception as e:
logger.error(f"更新物料位置失败: {e}")
return False
def move_material(self, material_id: str, target_container_id: str) -> bool:
"""移动物料到目标容器"""
try:
material = self.find_material_by_id(material_id)
target = self.find_material_by_id(target_container_id)
if material and target:
# 从原位置移除
if material.parent:
material.parent.unassign_child_resource(material)
# 添加到新位置
target.assign_child_resource(material)
return True
return False
except Exception as e:
logger.error(f"移动物料失败: {e}")
return False
# ============ 资源转换接口 ============
def convert_to_unilab_format(self, plr_resource: PLRResource) -> Dict[str, Any]:
"""将PyLabRobot资源转换为UniLab格式"""
return resource_plr_to_ulab(plr_resource)
def convert_from_unilab_format(self, unilab_resource: Dict[str, Any]) -> PLRResource:
"""将UniLab格式转换为PyLabRobot资源"""
return resource_ulab_to_plr(unilab_resource)
def get_deck_state(self) -> Dict[str, Any]:
"""获取Deck状态"""
try:
return {
"deck_info": {
"name": self.plr_deck.name,
"size": {
"x": self.plr_deck.size_x,
"y": self.plr_deck.size_y,
"z": self.plr_deck.size_z
},
"children_count": len(self.plr_deck.children)
},
"resources": {
resource_id: self.convert_to_unilab_format(resource)
for resource_id, resource in self.plr_resources.items()
},
"inventory": self.get_material_inventory()
}
except Exception as e:
logger.error(f"获取Deck状态失败: {e}")
return {"error": str(e)}
# ============ 数据持久化接口 ============
def save_state_to_file(self, file_path: str) -> bool:
"""保存状态到文件"""
try:
state = self.get_deck_state()
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=2, ensure_ascii=False)
logger.info(f"状态已保存到: {file_path}")
return True
except Exception as e:
logger.error(f"保存状态失败: {e}")
return False
def load_state_from_file(self, file_path: str) -> bool:
"""从文件加载状态"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
state = json.load(f)
# 重新创建资源
self._recreate_resources_from_state(state)
logger.info(f"状态已从文件加载: {file_path}")
return True
except Exception as e:
logger.error(f"加载状态失败: {e}")
return False
def _recreate_resources_from_state(self, state: Dict[str, Any]):
"""从状态重新创建资源"""
# 清除现有资源
self.plr_resources.clear()
self.plr_deck.children.clear()
# 从状态重新创建
resources_data = state.get("resources", {})
for resource_id, resource_data in resources_data.items():
try:
plr_resource = self.convert_from_unilab_format(resource_data)
self.plr_resources[resource_id] = plr_resource
self.plr_deck.assign_child_resource(plr_resource)
except Exception as e:
logger.error(f"重新创建资源失败 {resource_id}: {e}")
class CoinCellMaterialManagement(MaterialManagementBase):
"""纽扣电池物料管理类
从 button_battery_station 抽取的物料管理功能
"""
def _create_resource_by_type(
self,
resource_id: str,
resource_type: str,
config: Dict[str, Any],
data: Dict[str, Any],
location: PLRCoordinate
) -> Optional[PLRResource]:
"""根据类型创建纽扣电池相关资源"""
# 导入纽扣电池资源类
from unilabos.device_comms.button_battery_station import (
MaterialPlate, PlateSlot, ClipMagazine, BatteryPressSlot,
TipBox64, WasteTipBox, BottleRack, Battery, ElectrodeSheet
)
try:
if resource_type == "material_plate":
return self._create_material_plate(resource_id, config, data, location)
elif resource_type == "plate_slot":
return self._create_plate_slot(resource_id, config, data, location)
elif resource_type == "clip_magazine":
return self._create_clip_magazine(resource_id, config, data, location)
elif resource_type == "battery_press_slot":
return self._create_battery_press_slot(resource_id, config, data, location)
elif resource_type == "tip_box":
return self._create_tip_box(resource_id, config, data, location)
elif resource_type == "waste_tip_box":
return self._create_waste_tip_box(resource_id, config, data, location)
elif resource_type == "bottle_rack":
return self._create_bottle_rack(resource_id, config, data, location)
elif resource_type == "battery":
return self._create_battery(resource_id, config, data, location)
else:
logger.warning(f"未知的资源类型: {resource_type}")
return None
except Exception as e:
logger.error(f"创建资源失败 {resource_id} ({resource_type}): {e}")
return None
def _create_material_plate(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建料板"""
from unilabos.device_comms.button_battery_station import MaterialPlate, ElectrodeSheet
plate = MaterialPlate(
name=resource_id,
size_x=config.get("size_x", 80.0),
size_y=config.get("size_y", 80.0),
size_z=config.get("size_z", 10.0),
hole_diameter=config.get("hole_diameter", 15.0),
hole_depth=config.get("hole_depth", 8.0),
hole_spacing_x=config.get("hole_spacing_x", 20.0),
hole_spacing_y=config.get("hole_spacing_y", 20.0),
number=data.get("number", "")
)
plate.location = location
# 如果有预填充的极片数据,创建极片
electrode_sheets = data.get("electrode_sheets", [])
for i, sheet_data in enumerate(electrode_sheets):
if i < len(plate.children): # 确保不超过洞位数量
hole = plate.children[i]
sheet = ElectrodeSheet(
name=f"{resource_id}_sheet_{i}",
diameter=sheet_data.get("diameter", 14.0),
thickness=sheet_data.get("thickness", 0.1),
mass=sheet_data.get("mass", 0.01),
material_type=sheet_data.get("material_type", "cathode"),
info=sheet_data.get("info", "")
)
hole.place_electrode_sheet(sheet)
return plate
def _create_plate_slot(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建板槽位"""
from unilabos.device_comms.button_battery_station import PlateSlot
slot = PlateSlot(
name=resource_id,
max_plates=config.get("max_plates", 8)
)
slot.location = location
return slot
def _create_clip_magazine(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建子弹夹"""
from unilabos.device_comms.button_battery_station import ClipMagazine
magazine = ClipMagazine(
name=resource_id,
size_x=config.get("size_x", 150.0),
size_y=config.get("size_y", 100.0),
size_z=config.get("size_z", 50.0),
hole_diameter=config.get("hole_diameter", 15.0),
hole_depth=config.get("hole_depth", 40.0),
hole_spacing=config.get("hole_spacing", 25.0),
max_sheets_per_hole=config.get("max_sheets_per_hole", 100)
)
magazine.location = location
return magazine
def _create_battery_press_slot(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建电池压制槽"""
from unilabos.device_comms.button_battery_station import BatteryPressSlot
slot = BatteryPressSlot(
name=resource_id,
diameter=config.get("diameter", 20.0),
depth=config.get("depth", 15.0)
)
slot.location = location
return slot
def _create_tip_box(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建枪头盒"""
from unilabos.device_comms.button_battery_station import TipBox64
tip_box = TipBox64(
name=resource_id,
size_x=config.get("size_x", 127.8),
size_y=config.get("size_y", 85.5),
size_z=config.get("size_z", 60.0),
with_tips=data.get("with_tips", True)
)
tip_box.location = location
return tip_box
def _create_waste_tip_box(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建废枪头盒"""
from unilabos.device_comms.button_battery_station import WasteTipBox
waste_box = WasteTipBox(
name=resource_id,
size_x=config.get("size_x", 127.8),
size_y=config.get("size_y", 85.5),
size_z=config.get("size_z", 60.0),
max_tips=config.get("max_tips", 100)
)
waste_box.location = location
return waste_box
def _create_bottle_rack(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建瓶架"""
from unilabos.device_comms.button_battery_station import BottleRack
rack = BottleRack(
name=resource_id,
size_x=config.get("size_x", 210.0),
size_y=config.get("size_y", 140.0),
size_z=config.get("size_z", 100.0),
bottle_diameter=config.get("bottle_diameter", 30.0),
bottle_height=config.get("bottle_height", 100.0),
position_spacing=config.get("position_spacing", 35.0)
)
rack.location = location
return rack
def _create_battery(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate):
"""创建电池"""
from unilabos.device_comms.button_battery_station import Battery
battery = Battery(
name=resource_id,
diameter=config.get("diameter", 20.0),
height=config.get("height", 3.2),
max_volume=config.get("max_volume", 100.0),
barcode=data.get("barcode", "")
)
battery.location = location
return battery
# ============ 纽扣电池特定查找方法 ============
def find_material_plates(self):
"""查找所有料板"""
from unilabos.device_comms.button_battery_station import MaterialPlate
return self.resource_tracker.find_by_type(MaterialPlate)
def find_batteries(self):
"""查找所有电池"""
from unilabos.device_comms.button_battery_station import Battery
return self.resource_tracker.find_by_type(Battery)
def find_electrode_sheets(self):
"""查找所有极片"""
found = []
plates = self.find_material_plates()
for plate in plates:
for hole in plate.children:
if hasattr(hole, 'has_electrode_sheet') and hole.has_electrode_sheet():
found.append(hole._electrode_sheet)
return found
def find_plate_slots(self):
"""查找所有板槽位"""
from unilabos.device_comms.button_battery_station import PlateSlot
return self.resource_tracker.find_by_type(PlateSlot)
def find_clip_magazines(self):
"""查找所有子弹夹"""
from unilabos.device_comms.button_battery_station import ClipMagazine
return self.resource_tracker.find_by_type(ClipMagazine)
def find_press_slots(self):
"""查找所有压制槽"""
from unilabos.device_comms.button_battery_station import BatteryPressSlot
return self.resource_tracker.find_by_type(BatteryPressSlot)