mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 05:21:19 +00:00
* Cleanup registry to be easy-understanding (#76) * delete deprecated mock devices * rename categories * combine chromatographic devices * rename rviz simulation nodes * organic virtual devices * parse vessel_id * run registry completion before merge --------- Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com> * fix: workstation handlers and vessel_id parsing * fix: working dir error when input config path feat: report publish topic when error * modify default discovery_interval to 15s * feat: add trace log level * feat: 添加ChinWe设备控制类,支持串口通信和电机控制功能 (#79) * fix: drop_tips not using auto resource select * fix: discard_tips error * fix: discard_tips * fix: prcxi_res * add: prcxi res fix: startup slow * feat: workstation example * fix pumps and liquid_handler handle * feat: 优化protocol node节点运行日志 * fix all protocol_compilers and remove deprecated devices * feat: 新增use_remote_resource参数 * fix and remove redundant info * bugfixes on organic protocols * fix filter protocol * fix protocol node * 临时兼容错误的driver写法 * fix: prcxi import error * use call_async in all service to avoid deadlock * fix: figure_resource * Update recipe.yaml * add workstation template and battery example * feat: add sk & ak * update workstation base * Create workstation_architecture.md * refactor: workstation_base 重构为仅含业务逻辑,通信和子设备管理交给 ProtocolNode * refactor: ProtocolNode→WorkstationNode * Add:msgs.action (#83) * update: Workstation dev 将版本号从 0.10.3 更新为 0.10.4 (#84) * Add:msgs.action * update: 将版本号从 0.10.3 更新为 0.10.4 * simplify resource system * uncompleted refactor * example for use WorkstationBase * feat: websocket * feat: websocket test * feat: workstation example * feat: action status * fix: station自己的方法注册错误 * fix: 还原protocol node处理方法 * fix: build * fix: missing job_id key * ws test version 1 * ws test version 2 * ws protocol * 增加物料关系上传日志 * 增加物料关系上传日志 * 修正物料关系上传 * 修复工站的tracker实例追踪失效问题 * 增加handle检测,增加material edge关系上传 * 修复event loop错误 * 修复edge上报错误 * 修复async错误 * 更新schema的title字段 * 主机节点信息等支持自动刷新 * 注册表编辑器 * 修复status密集发送时,消息出错 * 增加addr参数 * fix: addr param * fix: addr param * 取消labid 和 强制config输入 * Add action definitions for LiquidHandlerSetGroup and LiquidHandlerTransferGroup - Created LiquidHandlerSetGroup.action with fields for group name, wells, and volumes. - Created LiquidHandlerTransferGroup.action with fields for source and target group names and unit volume. - Both actions include response fields for return information and success status. * Add LiquidHandlerSetGroup and LiquidHandlerTransferGroup actions to CMakeLists * Add set_group and transfer_group methods to PRCXI9300Handler and update liquid_handler.yaml * result_info改为字典类型 * 新增uat的地址替换 * runze multiple pump support (cherry picked from commit49354fcf39) * remove runze multiple software obtainer (cherry picked from commit8bcc92a394) * support multiple backbone (cherry picked from commit4771ff2347) * Update runze pump format * Correct runze multiple backbone * Update runze_multiple_backbone * Correct runze pump multiple receive method. * Correct runze pump multiple receive method. * 对于PRCXI9320的transfer_group,一对多和多对多 * 移除MQTT,更新launch文档,提供注册表示例文件,更新到0.10.5 * fix import error * fix dupe upload registry * refactor ws client * add server timeout * Fix: run-column with correct vessel id (#86) * fix run_column * Update run_column_protocol.py (cherry picked from commite5aa4d940a) * resource_update use resource_add * 新增版位推荐功能 * 重新规定了版位推荐的入参 * update registry with nested obj * fix protocol node log_message, added create_resource return value * fix protocol node log_message, added create_resource return value * try fix add protocol * fix resource_add * 修复移液站错误的aspirate注册表 * Feature/xprbalance-zhida (#80) * feat(devices): add Zhida GC/MS pretreatment automation workstation * feat(devices): add mettler_toledo xpr balance * balance * 重新补全zhida注册表 * PRCXI9320 json * PRCXI9320 json * PRCXI9320 json * fix resource download * remove class for resource * bump version to 0.10.6 * 更新所有注册表 * 修复protocolnode的兼容性 * 修复protocolnode的兼容性 * Update install md * Add Defaultlayout * 更新物料接口 * fix dict to tree/nested-dict converter * coin_cell_station draft * refactor: rename "station_resource" to "deck" * add standardized BIOYOND resources: bottle_carrier, bottle * refactor and add BIOYOND resources tests * add BIOYOND deck assignment and pass all tests * fix: update resource with correct structure; remove deprecated liquid_handler set_group action * feat: 将新威电池测试系统驱动与配置文件并入 workstation_dev_YB2 (#92) * feat: 新威电池测试系统驱动与注册文件 * feat: bring neware driver & battery.json into workstation_dev_YB2 * add bioyond studio draft * bioyond station with communication init and resource sync * fix bioyond station and registry * fix: update resource with correct structure; remove deprecated liquid_handler set_group action * frontend_docs * create/update resources with POST/PUT for big amount/ small amount data * create/update resources with POST/PUT for big amount/ small amount data * refactor: add itemized_carrier instead of carrier consists of ResourceHolder * create warehouse by factory func * update bioyond launch json * add child_size for itemized_carrier * fix bioyond resource io * Workstation templates: Resources and its CRUD, and workstation tasks (#95) * coin_cell_station draft * refactor: rename "station_resource" to "deck" * add standardized BIOYOND resources: bottle_carrier, bottle * refactor and add BIOYOND resources tests * add BIOYOND deck assignment and pass all tests * fix: update resource with correct structure; remove deprecated liquid_handler set_group action * feat: 将新威电池测试系统驱动与配置文件并入 workstation_dev_YB2 (#92) * feat: 新威电池测试系统驱动与注册文件 * feat: bring neware driver & battery.json into workstation_dev_YB2 * add bioyond studio draft * bioyond station with communication init and resource sync * fix bioyond station and registry * create/update resources with POST/PUT for big amount/ small amount data * refactor: add itemized_carrier instead of carrier consists of ResourceHolder * create warehouse by factory func * update bioyond launch json * add child_size for itemized_carrier * fix bioyond resource io --------- Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com> Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com> * 更新物料接口 * Workstation dev yb2 (#100) * Refactor and extend reaction station action messages * Refactor dispensing station tasks to enhance parameter clarity and add batch processing capabilities - Updated `create_90_10_vial_feeding_task` to include detailed parameters for 90%/10% vial feeding, improving clarity and usability. - Introduced `create_batch_90_10_vial_feeding_task` for batch processing of 90%/10% vial feeding tasks with JSON formatted input. - Added `create_batch_diamine_solution_task` for batch preparation of diamine solution, also utilizing JSON formatted input. - Refined `create_diamine_solution_task` to include additional parameters for better task configuration. - Enhanced schema descriptions and default values for improved user guidance. * 修复to_plr_resources * add update remove * 支持选择器注册表自动生成 支持转运物料 * 修复资源添加 * 修复transfer_resource_to_another生成 * 更新transfer_resource_to_another参数,支持spot入参 * 新增test_resource动作 * fix host_node error * fix host_node test_resource error * fix host_node test_resource error * 过滤本地动作 * 移动内部action以兼容host node * 修复同步任务报错不显示的bug * feat: 允许返回非本节点物料,后面可以通过decoration进行区分,就不进行warning了 * update todo * modify bioyond/plr converter, bioyond resource registry, and tests * pass the tests * update todo * add conda-pack-build.yml * add auto install script for conda-pack-build.yml (cherry picked from commit172599adcf) * update conda-pack-build.yml * update conda-pack-build.yml * update conda-pack-build.yml * update conda-pack-build.yml * update conda-pack-build.yml * Add version in __init__.py Update conda-pack-build.yml Add create_zip_archive.py * Update conda-pack-build.yml * Update conda-pack-build.yml (with mamba) * Update conda-pack-build.yml * Fix FileNotFoundError * Try fix 'charmap' codec can't encode characters in position 16-23: character maps to <undefined> * Fix unilabos msgs search error * Fix environment_check.py * Update recipe.yaml * Update registry. Update uuid loop figure method. Update install docs. * Fix nested conda pack * Fix one-key installation path error * Bump version to 0.10.7 * Workshop bj (#99) * Add LaiYu Liquid device integration and tests Introduce LaiYu Liquid device implementation, including backend, controllers, drivers, configuration, and resource files. Add hardware connection, tip pickup, and simplified test scripts, as well as experiment and registry configuration for LaiYu Liquid. Documentation and .gitignore for the device are also included. * feat(LaiYu_Liquid): 重构设备模块结构并添加硬件文档 refactor: 重新组织LaiYu_Liquid模块目录结构 docs: 添加SOPA移液器和步进电机控制指令文档 fix: 修正设备配置中的最大体积默认值 test: 新增工作台配置测试用例 chore: 删除过时的测试脚本和配置文件 * add * 重构: 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py 并更新所有导入引用 - 使用 git mv 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py - 更新所有相关文件中的导入引用 - 保持代码功能不变,仅改善命名一致性 - 测试确认所有导入正常工作 * 修复: 在 core/__init__.py 中添加 LaiYuLiquidBackend 导出 - 添加 LaiYuLiquidBackend 到导入列表 - 添加 LaiYuLiquidBackend 到 __all__ 导出列表 - 确保所有主要类都可以正确导入 * 修复大小写文件夹名字 * 电池装配工站二次开发教程(带目录)上传至dev (#94) * 电池装配工站二次开发教程 * Update intro.md * 物料教程 * 更新物料教程,json格式注释 * Update prcxi driver & fix transfer_liquid mix_times (#90) * Update prcxi driver & fix transfer_liquid mix_times * fix: correct mix_times type * Update liquid_handler registry * test: prcxi.py * Update registry from pr * fix ony-key script not exist * clean files --------- Co-authored-by: Junhan Chang <changjh@dp.tech> Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com> Co-authored-by: Guangxin Zhang <guangxin.zhang.bio@gmail.com> Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com> Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com> Co-authored-by: LccLink <1951855008@qq.com> Co-authored-by: lixinyu1011 <61094742+lixinyu1011@users.noreply.github.com> Co-authored-by: shiyubo0410 <shiyubo@dp.tech>
355 lines
12 KiB
Python
355 lines
12 KiB
Python
"""
|
||
工作站基类
|
||
Workstation Base Class - 简化版
|
||
|
||
基于PLR Deck的简化工作站架构
|
||
专注于核心物料系统和工作流管理
|
||
"""
|
||
|
||
import collections
|
||
import time
|
||
from typing import Dict, Any, List, Optional, Union
|
||
from abc import ABC, abstractmethod
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
from pylabrobot.resources import Deck, Plate, Resource as PLRResource
|
||
|
||
from pylabrobot.resources.coordinate import Coordinate
|
||
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
|
||
|
||
from unilabos.utils.log import logger
|
||
|
||
|
||
class WorkflowStatus(Enum):
|
||
"""工作流状态"""
|
||
|
||
IDLE = "idle"
|
||
INITIALIZING = "initializing"
|
||
RUNNING = "running"
|
||
PAUSED = "paused"
|
||
STOPPING = "stopping"
|
||
STOPPED = "stopped"
|
||
ERROR = "error"
|
||
COMPLETED = "completed"
|
||
|
||
|
||
@dataclass
|
||
class WorkflowInfo:
|
||
"""工作流信息"""
|
||
|
||
name: str
|
||
description: str
|
||
estimated_duration: float # 预估持续时间(秒)
|
||
required_materials: List[str] # 所需物料类型
|
||
output_product: str # 输出产品类型
|
||
parameters_schema: Dict[str, Any] # 参数架构
|
||
|
||
|
||
class WorkStationContainer(Plate):
|
||
"""
|
||
WorkStation 专用 Container 类,继承自 Plate和TipRack
|
||
注意这个物料必须通过plr_additional_res_reg.py注册到edge,才能正常序列化
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
name: str,
|
||
size_x: float,
|
||
size_y: float,
|
||
size_z: float,
|
||
category: str,
|
||
ordering: collections.OrderedDict,
|
||
model: Optional[str] = None,
|
||
):
|
||
"""
|
||
这里的初始化入参要和plr的保持一致
|
||
"""
|
||
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
|
||
self._unilabos_state = {} # 必须有此行,自己的类描述的是物料的
|
||
|
||
def load_state(self, state: Dict[str, Any]) -> None:
|
||
"""从给定的状态加载工作台信息。"""
|
||
super().load_state(state)
|
||
self._unilabos_state = state
|
||
|
||
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
|
||
data = super().serialize_state()
|
||
data.update(
|
||
self._unilabos_state
|
||
) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等)
|
||
return data
|
||
|
||
|
||
def get_workstation_plate_resource(name: str) -> PLRResource: # 要给定一个返回plr的方法
|
||
"""
|
||
用于获取一些模板,例如返回一个带有特定信息/子物料的 Plate,这里需要到注册表注册,例如unilabos/registry/resources/organic/workstation.yaml
|
||
可以直接运行该函数或者利用注册表补全机制,来检查是否资源出错
|
||
:param name: 资源名称
|
||
:return: Resource对象
|
||
"""
|
||
plate = WorkStationContainer(
|
||
name, size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict()
|
||
)
|
||
tip_rack = WorkStationContainer(
|
||
"tip_rack_inside_plate",
|
||
size_x=50,
|
||
size_y=50,
|
||
size_z=10,
|
||
category="tip_rack",
|
||
ordering=collections.OrderedDict(),
|
||
)
|
||
plate.assign_child_resource(tip_rack, Coordinate.zero())
|
||
return plate
|
||
|
||
|
||
class ResourceSynchronizer(ABC):
|
||
"""资源同步器基类
|
||
|
||
负责与外部物料系统的同步,并对 self.deck 做修改
|
||
"""
|
||
|
||
def __init__(self, workstation: "WorkstationBase"):
|
||
self.workstation = workstation
|
||
|
||
@abstractmethod
|
||
def sync_from_external(self) -> bool:
|
||
"""从外部系统同步物料到本地deck"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def sync_to_external(self, plr_resource: PLRResource) -> bool:
|
||
"""将本地物料同步到外部系统"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def handle_external_change(self, change_info: Dict[str, Any]) -> bool:
|
||
"""处理外部系统的变更通知"""
|
||
pass
|
||
|
||
|
||
class WorkstationBase(ABC):
|
||
"""工作站基类 - 简化版
|
||
|
||
核心功能:
|
||
1. 基于 PLR Deck 的物料系统,支持格式转换
|
||
2. 可选的资源同步器支持外部物料系统
|
||
3. 简化的工作流管理
|
||
"""
|
||
|
||
_ros_node: ROS2WorkstationNode
|
||
|
||
@property
|
||
def _children(self) -> Dict[str, Any]: # 不要删除这个下划线,不然会自动导入注册表,后面改成装饰器识别
|
||
return self._ros_node.children
|
||
|
||
async def update_resource_example(self):
|
||
return await self._ros_node.update_resource([get_workstation_plate_resource("test")])
|
||
|
||
def __init__(
|
||
self,
|
||
deck: Deck,
|
||
*args,
|
||
**kwargs, # 必须有kwargs
|
||
):
|
||
# PLR 物料系统
|
||
self.deck: Optional[Deck] = deck
|
||
self.plr_resources: Dict[str, PLRResource] = {}
|
||
|
||
self.resource_synchronizer = None # type: Optional[ResourceSynchronizer]
|
||
# 硬件接口
|
||
self.hardware_interface: Union[Any, str] = None
|
||
|
||
# 工作流状态
|
||
self.current_workflow_status = WorkflowStatus.IDLE
|
||
self.current_workflow_info = None
|
||
self.workflow_start_time = None
|
||
self.workflow_parameters = {}
|
||
|
||
# 支持的工作流(静态预定义)
|
||
self.supported_workflows: Dict[str, WorkflowInfo] = {}
|
||
|
||
def post_init(self, ros_node: ROS2WorkstationNode) -> None:
|
||
# 初始化物料系统
|
||
self._ros_node = ros_node
|
||
self._ros_node.update_resource([self.deck])
|
||
|
||
def _build_resource_mappings(self, deck: Deck):
|
||
"""递归构建资源映射"""
|
||
|
||
def add_resource_recursive(resource: PLRResource):
|
||
if hasattr(resource, "name"):
|
||
self.plr_resources[resource.name] = resource
|
||
|
||
if hasattr(resource, "children"):
|
||
for child in resource.children:
|
||
add_resource_recursive(child)
|
||
|
||
add_resource_recursive(deck)
|
||
|
||
# ============ 硬件接口管理 ============
|
||
|
||
def set_hardware_interface(self, hardware_interface: Union[Any, str]):
|
||
"""设置硬件接口"""
|
||
self.hardware_interface = hardware_interface
|
||
logger.info(f"工作站 {self._ros_node.device_id} 硬件接口设置: {type(hardware_interface).__name__}")
|
||
|
||
def set_workstation_node(self, workstation_node: "ROS2WorkstationNode"):
|
||
"""设置协议节点引用(用于代理模式)"""
|
||
self._ros_node = workstation_node
|
||
logger.info(f"工作站 {self._ros_node.device_id} 关联协议节点")
|
||
|
||
# ============ 设备操作接口 ============
|
||
|
||
def call_device_method(self, method: str, *args, **kwargs) -> Any:
|
||
"""调用设备方法的统一接口"""
|
||
# 1. 代理模式:通过协议节点转发
|
||
if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"):
|
||
if not self._ros_node:
|
||
raise RuntimeError("代理模式需要设置workstation_node")
|
||
|
||
device_id = self.hardware_interface[6:] # 移除 "proxy:" 前缀
|
||
return self._ros_node.call_device_method(device_id, method, *args, **kwargs)
|
||
|
||
# 2. 直接模式:直接调用硬件接口方法
|
||
elif self.hardware_interface and hasattr(self.hardware_interface, method):
|
||
return getattr(self.hardware_interface, method)(*args, **kwargs)
|
||
|
||
else:
|
||
raise AttributeError(f"硬件接口不支持方法: {method}")
|
||
|
||
def get_device_status(self) -> Dict[str, Any]:
|
||
"""获取设备状态"""
|
||
try:
|
||
return self.call_device_method("get_status")
|
||
except AttributeError:
|
||
# 如果设备不支持get_status方法,返回基础状态
|
||
return {
|
||
"status": "unknown",
|
||
"interface_type": type(self.hardware_interface).__name__,
|
||
"timestamp": time.time(),
|
||
}
|
||
|
||
def is_device_available(self) -> bool:
|
||
"""检查设备是否可用"""
|
||
try:
|
||
self.get_device_status()
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
# ============ 物料系统接口 ============
|
||
|
||
def get_deck(self) -> Deck:
|
||
"""获取主 Deck"""
|
||
return self.deck
|
||
|
||
def get_all_resources(self) -> Dict[str, PLRResource]:
|
||
"""获取所有 PLR 资源"""
|
||
return self.plr_resources.copy()
|
||
|
||
def find_resource_by_name(self, name: str) -> Optional[PLRResource]:
|
||
"""按名称查找资源"""
|
||
return self.plr_resources.get(name)
|
||
|
||
def find_resources_by_type(self, resource_type: type) -> List[PLRResource]:
|
||
"""按类型查找资源"""
|
||
return [res for res in self.plr_resources.values() if isinstance(res, resource_type)]
|
||
|
||
def sync_with_external_system(self) -> bool:
|
||
"""与外部物料系统同步"""
|
||
if not self.resource_synchronizer:
|
||
logger.info(f"工作站 {self._ros_node.device_id} 没有配置资源同步器")
|
||
return True
|
||
|
||
try:
|
||
success = self.resource_synchronizer.sync_from_external()
|
||
if success:
|
||
logger.info(f"工作站 {self._ros_node.device_id} 外部同步成功")
|
||
else:
|
||
logger.warning(f"工作站 {self._ros_node.device_id} 外部同步失败")
|
||
return success
|
||
except Exception as e:
|
||
logger.error(f"工作站 {self._ros_node.device_id} 外部同步异常: {e}")
|
||
return False
|
||
|
||
# ============ 简化的工作流控制 ============
|
||
|
||
def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool:
|
||
"""执行工作流"""
|
||
try:
|
||
# 设置工作流状态
|
||
self.current_workflow_status = WorkflowStatus.INITIALIZING
|
||
self.workflow_parameters = parameters
|
||
self.workflow_start_time = time.time()
|
||
|
||
# 委托给子类实现
|
||
success = self._execute_workflow_impl(workflow_name, parameters)
|
||
|
||
if success:
|
||
self.current_workflow_status = WorkflowStatus.RUNNING
|
||
logger.info(f"工作站 {self._ros_node.device_id} 工作流 {workflow_name} 启动成功")
|
||
else:
|
||
self.current_workflow_status = WorkflowStatus.ERROR
|
||
logger.error(f"工作站 {self._ros_node.device_id} 工作流 {workflow_name} 启动失败")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
self.current_workflow_status = WorkflowStatus.ERROR
|
||
logger.error(f"工作站 {self._ros_node.device_id} 执行工作流失败: {e}")
|
||
return False
|
||
|
||
def stop_workflow(self, emergency: bool = False) -> bool:
|
||
"""停止工作流"""
|
||
try:
|
||
if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]:
|
||
logger.warning(f"工作站 {self._ros_node.device_id} 没有正在运行的工作流")
|
||
return True
|
||
|
||
self.current_workflow_status = WorkflowStatus.STOPPING
|
||
|
||
# 委托给子类实现
|
||
success = self._stop_workflow_impl(emergency)
|
||
|
||
if success:
|
||
self.current_workflow_status = WorkflowStatus.STOPPED
|
||
logger.info(f"工作站 {self._ros_node.device_id} 工作流停止成功 (紧急: {emergency})")
|
||
else:
|
||
self.current_workflow_status = WorkflowStatus.ERROR
|
||
logger.error(f"工作站 {self._ros_node.device_id} 工作流停止失败")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
self.current_workflow_status = WorkflowStatus.ERROR
|
||
logger.error(f"工作站 {self._ros_node.device_id} 停止工作流失败: {e}")
|
||
return False
|
||
|
||
# ============ 状态属性 ============
|
||
|
||
@property
|
||
def workflow_status(self) -> WorkflowStatus:
|
||
"""获取当前工作流状态"""
|
||
return self.current_workflow_status
|
||
|
||
@property
|
||
def is_busy(self) -> bool:
|
||
"""检查工作站是否忙碌"""
|
||
return self.current_workflow_status in [
|
||
WorkflowStatus.INITIALIZING,
|
||
WorkflowStatus.RUNNING,
|
||
WorkflowStatus.STOPPING,
|
||
]
|
||
|
||
@property
|
||
def workflow_runtime(self) -> float:
|
||
"""获取工作流运行时间(秒)"""
|
||
if self.workflow_start_time is None:
|
||
return 0.0
|
||
return time.time() - self.workflow_start_time
|
||
|
||
|
||
class ProtocolNode(WorkstationBase):
|
||
def __init__(self, deck: Optional[PLRResource], *args, **kwargs):
|
||
super().__init__(deck, *args, **kwargs)
|