1 Commits

Author SHA1 Message Date
Junhan Chang
71c9a777ba add unilabos/workflow and entrypoint 2025-12-07 15:23:51 +08:00
38 changed files with 248 additions and 1178 deletions

View File

@@ -1,35 +0,0 @@
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
import pytest
from unilabos.workflow.convert_from_json import (
convert_from_json,
normalize_steps as _normalize_steps,
normalize_labware as _normalize_labware,
)
from unilabos.workflow.common import draw_protocol_graph_with_ports
@pytest.mark.parametrize(
"protocol_name",
[
"example_bio",
# "bioyond_materials_liquidhandling_1",
"example_prcxi",
],
)
def test_build_protocol_graph(protocol_name):
data_path = Path(__file__).with_name(f"{protocol_name}.json")
graph = convert_from_json(data_path, workstation_name="PRCXi")
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png")
draw_protocol_graph_with_ports(graph, str(output_path))
print(graph)

View File

@@ -20,7 +20,6 @@ if unilabos_dir not in sys.path:
from unilabos.utils.banner_print import print_status, print_unilab_banner from unilabos.utils.banner_print import print_status, print_unilab_banner
from unilabos.config.config import load_config, BasicConfig, HTTPConfig from unilabos.config.config import load_config, BasicConfig, HTTPConfig
def load_config_from_file(config_path): def load_config_from_file(config_path):
if config_path is None: if config_path is None:
config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None) config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None)
@@ -42,7 +41,7 @@ def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser):
for i, arg in enumerate(sys.argv): for i, arg in enumerate(sys.argv):
for option_string in option_strings: for option_string in option_strings:
if arg.startswith(option_string): if arg.startswith(option_string):
new_arg = arg[:2] + arg[2 : len(option_string)].replace("-", "_") + arg[len(option_string) :] new_arg = arg[:2] + arg[2:len(option_string)].replace("-", "_") + arg[len(option_string):]
sys.argv[i] = new_arg sys.argv[i] = new_arg
break break
@@ -156,54 +155,32 @@ def parse_args():
default=False, default=False,
help="Complete registry information", help="Complete registry information",
) )
# workflow upload subcommand
# label
workflow_parser = subparsers.add_parser( workflow_parser = subparsers.add_parser(
"workflow_upload", "workflow_upload",
aliases=["wf"],
help="Upload workflow from xdl/json/python files", help="Upload workflow from xdl/json/python files",
) )
workflow_parser.add_argument( workflow_parser.add_argument("-t", "--labeltype", default="singlepoint", type=str,
"-f", help="QM calculation type, support 'singlepoint', 'optimize' and 'dimer' currently")
"--workflow_file",
type=str,
required=True,
help="Path to the workflow file (JSON format)",
)
workflow_parser.add_argument(
"-n",
"--workflow_name",
type=str,
default=None,
help="Workflow name, if not provided will use the name from file or filename",
)
workflow_parser.add_argument(
"--tags",
type=str,
nargs="*",
default=[],
help="Tags for the workflow (space-separated)",
)
workflow_parser.add_argument(
"--published",
action="store_true",
default=False,
help="Whether to publish the workflow (default: False)",
)
return parser return parser
def main(): def main():
"""主函数""" """主函数"""
# 解析命令行参数 # 解析命令行参数
parser = parse_args() args = parse_args()
convert_argv_dashes_to_underscores(parser) convert_argv_dashes_to_underscores(args)
args = parser.parse_args() args_dict = vars(args.parse_args())
args_dict = vars(args)
# 显示启动横幅
print_unilab_banner(args_dict)
# 环境检查 - 检查并自动安装必需的包 (可选) # 环境检查 - 检查并自动安装必需的包 (可选)
if not args_dict.get("skip_env_check", False): if not args_dict.get("skip_env_check", False):
from unilabos.utils.environment_check import check_environment from unilabos.utils.environment_check import check_environment
print_status("正在进行环境依赖检查...", "info")
if not check_environment(auto_install=True): if not check_environment(auto_install=True):
print_status("环境检查失败,程序退出", "error") print_status("环境检查失败,程序退出", "error")
os._exit(1) os._exit(1)
@@ -256,18 +233,17 @@ def main():
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.") logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir) configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if args.addr != parser.get_default("addr"): if args_dict["addr"] == "test":
if args.addr == "test": print_status("使用测试环境地址", "info")
print_status("使用测试环境地址", "info") HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1"
HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1" elif args_dict["addr"] == "uat":
elif args.addr == "uat": print_status("使用uat环境地址", "info")
print_status("使用uat环境地址", "info") HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1"
HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1" elif args_dict["addr"] == "local":
elif args.addr == "local": print_status("使用本地环境地址", "info")
print_status("使用本地环境地址", "info") HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1" else:
else: HTTPConfig.remote_addr = args_dict.get("addr", "")
HTTPConfig.remote_addr = args.addr
# 设置BasicConfig参数 # 设置BasicConfig参数
if args_dict.get("ak", ""): if args_dict.get("ak", ""):
@@ -278,10 +254,18 @@ def main():
print_status("传入了sk参数优先采用传入参数", "info") print_status("传入了sk参数优先采用传入参数", "info")
BasicConfig.working_dir = working_dir BasicConfig.working_dir = working_dir
workflow_upload = args_dict.get("command") in ("workflow_upload", "wf") # 显示启动横幅
print_unilab_banner(args_dict)
#####################################
######## 启动设备接入端(主入口) ########
#####################################
launch(args_dict)
def launch(args_dict: Dict[str, Any]):
# 使用远程资源启动 # 使用远程资源启动
if not workflow_upload and args_dict["use_remote_resource"]: if args_dict["use_remote_resource"]:
print_status("使用远程资源启动", "info") print_status("使用远程资源启动", "info")
from unilabos.app.web import http_client from unilabos.app.web import http_client
@@ -317,36 +301,11 @@ def main():
from unilabos.resources.graphio import modify_to_backend_format from unilabos.resources.graphio import modify_to_backend_format
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict
# 显示启动横幅
print_unilab_banner(args_dict)
# 注册表 # 注册表
lab_registry = build_registry( lab_registry = build_registry(
args_dict["registry_path"], args_dict.get("complete_registry", False), BasicConfig.upload_registry args_dict["registry_path"], args_dict.get("complete_registry", False), args_dict["upload_registry"]
) )
if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
if BasicConfig.ak and BasicConfig.sk:
print_status("开始注册设备到服务端...", "info")
try:
register_devices_and_resources(lab_registry)
print_status("设备注册完成", "info")
except Exception as e:
print_status(f"设备注册失败: {e}", "error")
else:
print_status("未提供 ak 和 sk跳过设备注册", "info")
else:
print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning")
# 处理 workflow_upload 子命令
if workflow_upload:
from unilabos.workflow.wf_utils import handle_workflow_upload_command
handle_workflow_upload_command(args_dict)
print_status("工作流上传完成,程序退出", "info")
os._exit(0)
if not BasicConfig.ak or not BasicConfig.sk: if not BasicConfig.ak or not BasicConfig.sk:
print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning")
os._exit(1) os._exit(1)
@@ -423,6 +382,20 @@ def main():
args_dict["devices_config"] = resource_tree_set args_dict["devices_config"] = resource_tree_set
args_dict["graph"] = graph_res.physical_setup_graph args_dict["graph"] = graph_res.physical_setup_graph
if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
if BasicConfig.ak and BasicConfig.sk:
print_status("开始注册设备到服务端...", "info")
try:
register_devices_and_resources(lab_registry)
print_status("设备注册完成", "info")
except Exception as e:
print_status(f"设备注册失败: {e}", "error")
else:
print_status("未提供 ak 和 sk跳过设备注册", "info")
else:
print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning")
if args_dict["controllers"] is not None: if args_dict["controllers"] is not None:
args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8")) args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8"))
else: else:
@@ -437,7 +410,6 @@ def main():
comm_client = get_communication_client() comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]: if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client) args_dict["bridges"].append(comm_client)
def _exit(signum, frame): def _exit(signum, frame):
comm_client.stop() comm_client.stop()
sys.exit(0) sys.exit(0)
@@ -479,13 +451,16 @@ def main():
resource_visualization.start() resource_visualization.start()
except OSError as e: except OSError as e:
if "AMENT_PREFIX_PATH" in str(e): if "AMENT_PREFIX_PATH" in str(e):
print_status(f"ROS 2环境未正确设置跳过3D可视化启动。错误详情: {e}", "warning") print_status(
f"ROS 2环境未正确设置跳过3D可视化启动。错误详情: {e}",
"warning"
)
print_status( print_status(
"建议解决方案:\n" "建议解决方案:\n"
"1. 激活Conda环境: conda activate unilab\n" "1. 激活Conda环境: conda activate unilab\n"
"2. 或使用 --backend simple 参数\n" "2. 或使用 --backend simple 参数\n"
"3. 或使用 --visual disable 参数禁用可视化", "3. 或使用 --visual disable 参数禁用可视化",
"info", "info"
) )
else: else:
raise raise

View File

@@ -76,8 +76,7 @@ class HTTPClient:
Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid}
""" """
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f: with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f:
payload = {"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid} f.write(json.dumps({"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, indent=4))
f.write(json.dumps(payload, indent=4))
# 从序列化数据中提取所有节点的UUID保存旧UUID # 从序列化数据中提取所有节点的UUID保存旧UUID
old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} old_uuids = {n.res_content.uuid: n for n in resources.all_nodes}
if not self.initialized or first_add: if not self.initialized or first_add:
@@ -332,67 +331,6 @@ class HTTPClient:
logger.error(f"响应内容: {response.text}") logger.error(f"响应内容: {response.text}")
return None return None
def workflow_import(
self,
name: str,
workflow_uuid: str,
workflow_name: str,
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None,
published: bool = False,
) -> Dict[str, Any]:
"""
导入工作流到服务器
Args:
name: 工作流名称(顶层)
workflow_uuid: 工作流UUID
workflow_name: 工作流名称data内部
nodes: 工作流节点列表
edges: 工作流边列表
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
Returns:
Dict: API响应数据包含 code 和 data (uuid, name)
"""
# target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取
payload = {
"target_lab_uuid": "28c38bb0-63f6-4352-b0d8-b5b8eb1766d5",
"name": name,
"data": {
"workflow_uuid": workflow_uuid,
"workflow_name": workflow_name,
"nodes": nodes,
"edges": edges,
"tags": tags if tags is not None else [],
"published": published,
},
}
# 保存请求到文件
with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(payload, indent=4, ensure_ascii=False))
response = requests.post(
f"{self.remote_addr}/lab/workflow/owner/import",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
# 保存响应到文件
with open(os.path.join(BasicConfig.working_dir, "res_workflow_upload.json"), "w", encoding="utf-8") as f:
f.write(f"{response.status_code}" + "\n" + response.text)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}")
return res
else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例 # 创建默认客户端实例
http_client = HTTPClient() http_client = HTTPClient()

View File

@@ -438,7 +438,7 @@ class MessageProcessor:
self.connected = True self.connected = True
self.reconnect_count = 0 self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") logger.info(f"[MessageProcessor] Connected to {self.websocket_url}")
# 启动发送协程 # 启动发送协程
send_task = asyncio.create_task(self._send_handler()) send_task = asyncio.create_task(self._send_handler())
@@ -503,7 +503,7 @@ class MessageProcessor:
async def _send_handler(self): async def _send_handler(self):
"""处理发送队列中的消息""" """处理发送队列中的消息"""
logger.trace("[MessageProcessor] Send handler started") logger.debug("[MessageProcessor] Send handler started")
try: try:
while self.connected and self.websocket: while self.connected and self.websocket:
@@ -965,7 +965,7 @@ class QueueProcessor:
def _run(self): def _run(self):
"""运行队列处理主循环""" """运行队列处理主循环"""
logger.trace("[QueueProcessor] Queue processor started") logger.debug("[QueueProcessor] Queue processor started")
while self.is_running: while self.is_running:
try: try:
@@ -1175,6 +1175,7 @@ class WebSocketClient(BaseCommunicationClient):
else: else:
url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule"
logger.debug(f"[WebSocketClient] URL: {url}")
return url return url
def start(self) -> None: def start(self) -> None:
@@ -1187,11 +1188,13 @@ class WebSocketClient(BaseCommunicationClient):
logger.error("[WebSocketClient] WebSocket URL not configured") logger.error("[WebSocketClient] WebSocket URL not configured")
return return
logger.info(f"[WebSocketClient] Starting connection to {self.websocket_url}")
# 启动两个核心线程 # 启动两个核心线程
self.message_processor.start() self.message_processor.start()
self.queue_processor.start() self.queue_processor.start()
logger.trace("[WebSocketClient] All threads started") logger.info("[WebSocketClient] All threads started")
def stop(self) -> None: def stop(self) -> None:
"""停止WebSocket客户端""" """停止WebSocket客户端"""

View File

@@ -21,8 +21,7 @@ class BasicConfig:
startup_json_path = None # 填写绝对路径 startup_json_path = None # 填写绝对路径
disable_browser = False # 禁止浏览器自动打开 disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务 port = 8002 # 本地HTTP服务
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' log_level: Literal['TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = "DEBUG" # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@classmethod @classmethod
def auth_secret(cls): def auth_secret(cls):
@@ -42,7 +41,7 @@ class WSConfig:
# HTTP配置 # HTTP配置
class HTTPConfig: class HTTPConfig:
remote_addr = "https://uni-lab.bohrium.com/api/v1" remote_addr = "http://127.0.0.1:48197/api/v1"
# ROS配置 # ROS配置
@@ -66,14 +65,13 @@ def _update_config_from_module(module):
if not attr.startswith("_"): if not attr.startswith("_"):
setattr(obj, attr, getattr(getattr(module, name), attr)) setattr(obj, attr, getattr(getattr(module, name), attr))
def _update_config_from_env(): def _update_config_from_env():
prefix = "UNILABOS_" prefix = "UNILABOS_"
for env_key, env_value in os.environ.items(): for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix): if not env_key.startswith(prefix):
continue continue
try: try:
key_path = env_key[len(prefix) :] # Remove UNILAB_ prefix key_path = env_key[len(prefix):] # Remove UNILAB_ prefix
class_field = key_path.upper().split("_", 1) class_field = key_path.upper().split("_", 1)
if len(class_field) != 2: if len(class_field) != 2:
logger.warning(f"[ENV] 环境变量格式不正确:{env_key}") logger.warning(f"[ENV] 环境变量格式不正确:{env_key}")

View File

@@ -1,307 +0,0 @@
"""
LaiYu_Liquid 液体处理工作站集成模块
该模块提供了 LaiYu_Liquid 工作站与 UniLabOS 的完整集成,包括:
- 硬件后端和抽象接口
- 资源定义和管理
- 协议执行和液体传输
- 工作台配置和布局
主要组件:
- LaiYuLiquidBackend: 硬件后端实现
- LaiYuLiquid: 液体处理器抽象接口
- 各种资源类:枪头架、板、容器等
- 便捷创建函数和配置管理
使用示例:
from unilabos.devices.laiyu_liquid import (
LaiYuLiquid,
LaiYuLiquidBackend,
create_standard_deck,
create_tip_rack_1000ul
)
# 创建后端和液体处理器
backend = LaiYuLiquidBackend()
lh = LaiYuLiquid(backend=backend)
# 创建工作台
deck = create_standard_deck()
lh.deck = deck
# 设置和运行
await lh.setup()
"""
# 版本信息
__version__ = "1.0.0"
__author__ = "LaiYu_Liquid Integration Team"
__description__ = "LaiYu_Liquid 液体处理工作站 UniLabOS 集成模块"
# 驱动程序导入
from .drivers import (
XYZStepperController,
SOPAPipette,
MotorAxis,
MotorStatus,
SOPAConfig,
SOPAStatusCode,
StepperMotorDriver
)
# 控制器导入
from .controllers import (
XYZController,
PipetteController,
)
# 后端导入
from .backend.rviz_backend import (
LiquidHandlerRvizBackend,
)
# 资源类和创建函数导入
from .core.laiyu_liquid_res import (
LaiYuLiquidDeck,
LaiYuLiquidContainer,
LaiYuLiquidTipRack
)
# 主设备类和配置
from .core.laiyu_liquid_main import (
LaiYuLiquid,
LaiYuLiquidConfig,
LaiYuLiquidDeck,
LaiYuLiquidContainer,
LaiYuLiquidTipRack,
create_quick_setup
)
# 后端创建函数导入
from .backend import (
LaiYuLiquidBackend,
create_laiyu_backend,
)
# 导出所有公共接口
__all__ = [
# 版本信息
"__version__",
"__author__",
"__description__",
# 驱动程序
"SOPAPipette",
"SOPAConfig",
"StepperMotorDriver",
"XYZStepperController",
# 控制器
"PipetteController",
"XYZController",
# 后端
"LiquidHandlerRvizBackend",
# 资源创建函数
"create_tip_rack_1000ul",
"create_tip_rack_200ul",
"create_96_well_plate",
"create_deep_well_plate",
"create_8_tube_rack",
"create_standard_deck",
"create_waste_container",
"create_wash_container",
"create_reagent_container",
"load_deck_config",
# 后端创建函数
"create_laiyu_backend",
# 主要类
"LaiYuLiquid",
"LaiYuLiquidConfig",
"LaiYuLiquidBackend",
"LaiYuLiquidDeck",
# 工具函数
"get_version",
"get_supported_resources",
"create_quick_setup",
"validate_installation",
"print_module_info",
"setup_logging",
]
# 别名定义,为了向后兼容
LaiYuLiquidDevice = LaiYuLiquid # 主设备类别名
LaiYuLiquidController = XYZController # 控制器别名
LaiYuLiquidDriver = XYZStepperController # 驱动器别名
# 模块级别的便捷函数
def get_version() -> str:
"""
获取模块版本
Returns:
str: 版本号
"""
return __version__
def get_supported_resources() -> dict:
"""
获取支持的资源类型
Returns:
dict: 支持的资源类型字典
"""
return {
"tip_racks": {
"LaiYuLiquidTipRack": LaiYuLiquidTipRack,
},
"containers": {
"LaiYuLiquidContainer": LaiYuLiquidContainer,
},
"decks": {
"LaiYuLiquidDeck": LaiYuLiquidDeck,
},
"devices": {
"LaiYuLiquid": LaiYuLiquid,
}
}
def create_quick_setup() -> tuple:
"""
快速创建基本设置
Returns:
tuple: (backend, controllers, resources) 的元组
"""
# 创建后端
backend = LiquidHandlerRvizBackend()
# 创建控制器(使用默认端口进行演示)
pipette_controller = PipetteController(port="/dev/ttyUSB0", address=4)
xyz_controller = XYZController(port="/dev/ttyUSB1", auto_connect=False)
# 创建测试资源
tip_rack_1000 = create_tip_rack_1000ul("tip_rack_1000")
tip_rack_200 = create_tip_rack_200ul("tip_rack_200")
well_plate = create_96_well_plate("96_well_plate")
controllers = {
'pipette': pipette_controller,
'xyz': xyz_controller
}
resources = {
'tip_rack_1000': tip_rack_1000,
'tip_rack_200': tip_rack_200,
'well_plate': well_plate
}
return backend, controllers, resources
def validate_installation() -> bool:
"""
验证模块安装是否正确
Returns:
bool: 安装是否正确
"""
try:
# 检查核心类是否可以导入
from .core.laiyu_liquid_main import LaiYuLiquid, LaiYuLiquidConfig
from .backend import LaiYuLiquidBackend
from .controllers import XYZController, PipetteController
from .drivers import XYZStepperController, SOPAPipette
# 尝试创建基本对象
config = LaiYuLiquidConfig()
backend = create_laiyu_backend("validation_test")
print("模块安装验证成功")
return True
except Exception as e:
print(f"模块安装验证失败: {e}")
return False
def print_module_info():
"""打印模块信息"""
print(f"LaiYu_Liquid 集成模块")
print(f"版本: {__version__}")
print(f"作者: {__author__}")
print(f"描述: {__description__}")
print(f"")
print(f"支持的资源类型:")
resources = get_supported_resources()
for category, types in resources.items():
print(f" {category}:")
for type_name, type_class in types.items():
print(f" - {type_name}: {type_class.__name__}")
print(f"")
print(f"主要功能:")
print(f" - 硬件集成: LaiYuLiquidBackend")
print(f" - 抽象接口: LaiYuLiquid")
print(f" - 资源管理: 各种资源类和创建函数")
print(f" - 协议执行: transfer_liquid 和相关函数")
print(f" - 配置管理: deck.json 和加载函数")
# 模块初始化时的检查
def _check_dependencies():
"""检查依赖项"""
try:
import pylabrobot
import asyncio
import json
import logging
return True
except ImportError as e:
import logging
logging.warning(f"缺少依赖项 {e}")
return False
# 执行依赖检查
_dependencies_ok = _check_dependencies()
if not _dependencies_ok:
import logging
logging.warning("某些依赖项缺失,模块功能可能受限")
# 模块级别的日志配置
import logging
def setup_logging(level: str = "INFO"):
"""
设置模块日志
Args:
level: 日志级别 (DEBUG, INFO, WARNING, ERROR)
"""
logger = logging.getLogger("LaiYu_Liquid")
logger.setLevel(getattr(logging, level.upper()))
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# 默认日志设置
_logger = setup_logging()

View File

@@ -988,18 +988,6 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
dis_vols = [float(dis_vols)] dis_vols = [float(dis_vols)]
else: else:
dis_vols = [float(v) for v in dis_vols] dis_vols = [float(v) for v in dis_vols]
# 统一混合次数为标量,防止数组/列表与 int 比较时报错
if mix_times is not None and not isinstance(mix_times, (int, float)):
try:
mix_times = mix_times[0] if len(mix_times) > 0 else None
except Exception:
try:
mix_times = next(iter(mix_times))
except Exception:
pass
if mix_times is not None:
mix_times = int(mix_times)
# 识别传输模式 # 识别传输模式
num_sources = len(sources) num_sources = len(sources)

View File

@@ -5,7 +5,6 @@ import json
import os import os
import socket import socket
import time import time
import uuid
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling import ( from pylabrobot.liquid_handling import (
@@ -857,30 +856,7 @@ class PRCXI9300Api:
def _raw_request(self, payload: str) -> str: def _raw_request(self, payload: str) -> str:
if self.debug: if self.debug:
# 调试/仿真模式下直接返回可解析的模拟 JSON避免后续 json.loads 报错 return " "
try:
req = json.loads(payload)
method = req.get("MethodName")
except Exception:
method = None
data: Any = True
if method in {"AddSolution"}:
data = str(uuid.uuid4())
elif method in {"AddWorkTabletMatrix", "AddWorkTabletMatrix2"}:
data = {"Success": True, "Message": "debug mock"}
elif method in {"GetErrorCode"}:
data = ""
elif method in {"RemoveErrorCodet", "Reset", "Start", "LoadSolution", "Pause", "Resume", "Stop"}:
data = True
elif method in {"GetStepStateList", "GetStepStatus", "GetStepState"}:
data = []
elif method in {"GetLocation"}:
data = {"X": 0, "Y": 0, "Z": 0}
elif method in {"GetResetStatus"}:
data = False
return json.dumps({"Success": True, "Msg": "debug mock", "Data": data})
with contextlib.closing(socket.socket()) as sock: with contextlib.closing(socket.socket()) as sock:
sock.settimeout(self.timeout) sock.settimeout(self.timeout)
sock.connect((self.host, self.port)) sock.connect((self.host, self.port))

View File

@@ -9333,34 +9333,7 @@ liquid_handler.prcxi:
touch_tip: false touch_tip: false
use_channels: use_channels:
- 0 - 0
handles: handles: {}
input:
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources
label: sources
- data_key: liquid
data_source: executor
data_type: resource
handler_key: targets
label: targets
- data_key: liquid
data_source: executor
data_type: resource
handler_key: tip_rack
label: tip_rack
output:
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources_out
label: sources
- data_key: liquid
data_source: executor
data_type: resource
handler_key: targets_out
label: targets
placeholder_keys: placeholder_keys:
sources: unilabos_resources sources: unilabos_resources
targets: unilabos_resources targets: unilabos_resources

View File

@@ -222,7 +222,7 @@ class Registry:
abs_path = Path(path).absolute() abs_path = Path(path).absolute()
resource_path = abs_path / "resources" resource_path = abs_path / "resources"
files = list(resource_path.glob("*/*.yaml")) files = list(resource_path.glob("*/*.yaml"))
logger.trace(f"[UniLab Registry] load resources? {resource_path.exists()}, total: {len(files)}") logger.debug(f"[UniLab Registry] resources: {resource_path.exists()}, total: {len(files)}")
current_resource_number = len(self.resource_type_registry) + 1 current_resource_number = len(self.resource_type_registry) + 1
for i, file in enumerate(files): for i, file in enumerate(files):
with open(file, encoding="utf-8", mode="r") as f: with open(file, encoding="utf-8", mode="r") as f:

View File

@@ -42,7 +42,7 @@ def canonicalize_nodes_data(
Returns: Returns:
ResourceTreeSet: 标准化后的资源树集合 ResourceTreeSet: 标准化后的资源树集合
""" """
print_status(f"{len(nodes)} Resources loaded", "info") print_status(f"{len(nodes)} Resources loaded:", "info")
# 第一步基本预处理处理graphml的label字段 # 第一步基本预处理处理graphml的label字段
outer_host_node_id = None outer_host_node_id = None

View File

@@ -66,8 +66,8 @@ class ResourceDict(BaseModel):
klass: str = Field(alias="class", description="Resource class name") klass: str = Field(alias="class", description="Resource class name")
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
config: Dict[str, Any] = Field(description="Resource configuration") config: Dict[str, Any] = Field(description="Resource configuration")
data: Dict[str, Any] = Field(description="Resource data, eg: container liquid data") data: Dict[str, Any] = Field(description="Resource data")
extra: Dict[str, Any] = Field(description="Extra data, eg: slot index") extra: Dict[str, Any] = Field(description="Extra data")
@field_serializer("parent_uuid") @field_serializer("parent_uuid")
def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]): def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]):

View File

Before

Width:  |  Height:  |  Size: 148 KiB

After

Width:  |  Height:  |  Size: 148 KiB

View File

Before

Width:  |  Height:  |  Size: 140 KiB

After

Width:  |  Height:  |  Size: 140 KiB

View File

Before

Width:  |  Height:  |  Size: 117 KiB

After

Width:  |  Height:  |  Size: 117 KiB

View File

@@ -0,0 +1,94 @@
import json
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
import pytest
from unilabos.workflow.common import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
def _normalize_steps(data):
normalized = []
for step in data:
action = step.get("action") or step.get("operation")
if not action:
continue
raw_params = step.get("parameters") or step.get("action_args") or {}
params = dict(raw_params)
if "source" in raw_params and "sources" not in raw_params:
params["sources"] = raw_params["source"]
if "target" in raw_params and "targets" not in raw_params:
params["targets"] = raw_params["target"]
description = step.get("description") or step.get("purpose")
step_dict = {"action": action, "parameters": params}
if description:
step_dict["description"] = description
normalized.append(step_dict)
return normalized
def _normalize_labware(data):
labware = {}
for item in data:
reagent_name = item.get("reagent_name")
key = reagent_name or item.get("material_name") or item.get("name")
if not key:
continue
key = str(key)
idx = 1
original_key = key
while key in labware:
idx += 1
key = f"{original_key}_{idx}"
labware[key] = {
"slot": item.get("positions") or item.get("slot"),
"labware": item.get("material_name") or item.get("labware"),
"well": item.get("well", []),
"type": item.get("type", "reagent"),
"role": item.get("role", ""),
"name": key,
}
return labware
@pytest.mark.parametrize("protocol_name", [
"example_bio",
# "bioyond_materials_liquidhandling_1",
"example_prcxi",
])
def test_build_protocol_graph(protocol_name):
data_path = Path(__file__).with_name(f"{protocol_name}.json")
with data_path.open("r", encoding="utf-8") as fp:
d = json.load(fp)
if "workflow" in d and "reagent" in d:
protocol_steps = d["workflow"]
labware_info = d["reagent"]
elif "steps_info" in d and "labware_info" in d:
protocol_steps = _normalize_steps(d["steps_info"])
labware_info = _normalize_labware(d["labware_info"])
else:
raise ValueError("Unsupported protocol format")
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name="PRCXi",
)
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png")
draw_protocol_graph_with_ports(graph, str(output_path))
print(graph)

View File

@@ -10,7 +10,6 @@ Json = Dict[str, Any]
# ---------------- Graph ---------------- # ---------------- Graph ----------------
class WorkflowGraph: class WorkflowGraph:
"""简单的有向图实现:使用 params 单层参数inputs 内含连线;支持 node-link 导出""" """简单的有向图实现:使用 params 单层参数inputs 内含连线;支持 node-link 导出"""
@@ -22,31 +21,20 @@ class WorkflowGraph:
self.nodes[node_id] = attrs self.nodes[node_id] = attrs
def add_edge(self, source: str, target: str, **attrs): def add_edge(self, source: str, target: str, **attrs):
# 将 source_port/target_port 映射为服务端期望的 source_handle_key/target_handle_key
source_handle_key = attrs.pop("source_port", "") or attrs.pop("source_handle_key", "")
target_handle_key = attrs.pop("target_port", "") or attrs.pop("target_handle_key", "")
edge = { edge = {
"source": source, "source": source,
"target": target, "target": target,
"source_node_uuid": source, "source_node_uuid": source,
"target_node_uuid": target, "target_node_uuid": target,
"source_handle_key": source_handle_key,
"source_handle_io": attrs.pop("source_handle_io", "source"), "source_handle_io": attrs.pop("source_handle_io", "source"),
"target_handle_key": target_handle_key,
"target_handle_io": attrs.pop("target_handle_io", "target"), "target_handle_io": attrs.pop("target_handle_io", "target"),
**attrs, **attrs
} }
self.edges.append(edge) self.edges.append(edge)
def _materialize_wiring_into_inputs( def _materialize_wiring_into_inputs(self, obj: Any, inputs: Dict[str, Any],
self, variable_sources: Dict[str, Dict[str, Any]],
obj: Any, target_node_id: str, base_path: List[str]):
inputs: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]],
target_node_id: str,
base_path: List[str],
):
has_var = False has_var = False
def walk(node: Any, path: List[str]): def walk(node: Any, path: List[str]):
@@ -60,12 +48,9 @@ class WorkflowGraph:
if src: if src:
key = ".".join(path) # e.g. "params.foo.bar.0" key = ".".join(path) # e.g. "params.foo.bar.0"
inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")} inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")}
self.add_edge( self.add_edge(str(src["node_id"]), target_node_id,
str(src["node_id"]), source_handle_io=src.get("output_name", "result"),
target_node_id, target_handle_io=key)
source_handle_io=src.get("output_name", "result"),
target_handle_io=key,
)
return placeholder return placeholder
return {k: walk(v, path + [k]) for k, v in node.items()} return {k: walk(v, path + [k]) for k, v in node.items()}
if isinstance(node, list): if isinstance(node, list):
@@ -75,20 +60,18 @@ class WorkflowGraph:
replaced = walk(obj, base_path[:]) replaced = walk(obj, base_path[:])
return replaced, has_var return replaced, has_var
def add_workflow_node( def add_workflow_node(self,
self, node_id: int,
node_id: int, *,
*, device_key: Optional[str] = None, # 实例名,如 "ser"
device_key: Optional[str] = None, # 实例名,如 "ser" resource_name: Optional[str] = None, # registry key原 device_class
resource_name: Optional[str] = None, # registry key原 device_class module: Optional[str] = None,
module: Optional[str] = None, template_name: Optional[str] = None, # 动作/模板名(原 action_key
template_name: Optional[str] = None, # 动作/模板名(原 action_key params: Dict[str, Any],
params: Dict[str, Any], variable_sources: Dict[str, Dict[str, Any]],
variable_sources: Dict[str, Dict[str, Any]], add_ready_if_no_vars: bool = True,
add_ready_if_no_vars: bool = True, prev_node_id: Optional[int] = None,
prev_node_id: Optional[int] = None, **extra_attrs) -> None:
**extra_attrs,
) -> None:
"""添加工作流节点params 单层;自动变量连线与 ready 串联;支持附加属性""" """添加工作流节点params 单层;自动变量连线与 ready 串联;支持附加属性"""
node_id_str = str(node_id) node_id_str = str(node_id)
inputs: Dict[str, Any] = {} inputs: Dict[str, Any] = {}
@@ -104,9 +87,9 @@ class WorkflowGraph:
node_obj = { node_obj = {
"device_key": device_key, "device_key": device_key,
"resource_name": resource_name, # ✅ 新名字 "resource_name": resource_name, # ✅ 新名字
"module": module, "module": module,
"template_name": template_name, # ✅ 新名字 "template_name": template_name, # ✅ 新名字
"params": params, "params": params,
"inputs": inputs, "inputs": inputs,
} }
@@ -117,13 +100,13 @@ class WorkflowGraph:
def to_dict(self) -> List[Dict[str, Any]]: def to_dict(self) -> List[Dict[str, Any]]:
result = [] result = []
for node_id, attrs in self.nodes.items(): for node_id, attrs in self.nodes.items():
node = {"uuid": node_id} node = {"id": node_id}
params = dict(attrs.get("parameters", {}) or {}) params = dict(attrs.get("parameters", {}) or {})
flat = {k: v for k, v in attrs.items() if k != "parameters"} flat = {k: v for k, v in attrs.items() if k != "parameters"}
flat.update(params) flat.update(params)
node.update(flat) node.update(flat)
result.append(node) result.append(node)
return sorted(result, key=lambda n: int(n["uuid"]) if str(n["uuid"]).isdigit() else n["uuid"]) return sorted(result, key=lambda n: int(n["id"]) if str(n["id"]).isdigit() else n["id"])
# node-link 导出(含 edges # node-link 导出(含 edges
def to_node_link_dict(self) -> Dict[str, Any]: def to_node_link_dict(self) -> Dict[str, Any]:
@@ -132,27 +115,12 @@ class WorkflowGraph:
node_attrs = attrs.copy() node_attrs = attrs.copy()
params = node_attrs.pop("parameters", {}) or {} params = node_attrs.pop("parameters", {}) or {}
node_attrs.update(params) node_attrs.update(params)
nodes_list.append({"uuid": node_id, **node_attrs}) nodes_list.append({"id": node_id, **node_attrs})
return { return {"directed": True, "multigraph": False, "graph": {}, "nodes": nodes_list, "edges": self.edges, "links": self.edges}
"directed": True,
"multigraph": False,
"graph": {},
"nodes": nodes_list,
"edges": self.edges,
"links": self.edges,
}
def refactor_data( def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
data: List[Dict[str, Any]], """统一的数据重构函数,根据操作类型自动选择模板"""
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
"""统一的数据重构函数,根据操作类型自动选择模板
Args:
data: 原始步骤数据列表
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
refactored_data = [] refactored_data = []
# 定义操作映射,包含生物实验和有机化学的所有操作 # 定义操作映射,包含生物实验和有机化学的所有操作
@@ -189,67 +157,43 @@ def refactor_data(
times = step.get("times", step.get("parameters", {}).get("times", 1)) times = step.get("times", step.get("parameters", {}).get("times", 1))
sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) sub_steps = step.get("steps", step.get("parameters", {}).get("steps", []))
for i in range(int(times)): for i in range(int(times)):
sub_data = refactor_data(sub_steps, action_resource_mapping) sub_data = refactor_data(sub_steps)
refactored_data.extend(sub_data) refactored_data.extend(sub_data)
continue continue
# 获取模板名称 # 获取模板名称
template_name = OPERATION_MAPPING.get(operation) template = OPERATION_MAPPING.get(operation)
if not template_name: if not template:
# 自动推断模板类型 # 自动推断模板类型
if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]:
template_name = f"biomek-{operation}" template = f"biomek-{operation}"
else: else:
template_name = f"{operation}Protocol" template = f"{operation}Protocol"
# 获取 resource_name
resource_name = f"device.{operation.lower()}"
if action_resource_mapping:
resource_name = action_resource_mapping.get(operation, resource_name)
# 获取步骤编号,生成 name 字段
step_number = step.get("step_number")
name = f"Step {step_number}" if step_number is not None else None
# 创建步骤数据 # 创建步骤数据
step_data = { step_data = {
"template_name": template_name, "template": template,
"resource_name": resource_name,
"description": step.get("description", step.get("purpose", f"{operation} operation")), "description": step.get("description", step.get("purpose", f"{operation} operation")),
"lab_node_type": "Device", "lab_node_type": "Device",
"param": step.get("parameters", step.get("action_args", {})), "parameters": step.get("parameters", step.get("action_args", {})),
"footer": f"{template_name}-{resource_name}",
} }
if name:
step_data["name"] = name
refactored_data.append(step_data) refactored_data.append(step_data)
return refactored_data return refactored_data
def build_protocol_graph( def build_protocol_graph(
labware_info: List[Dict[str, Any]], labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> WorkflowGraph: ) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑 """统一的协议图构建函数,根据设备类型自动选择构建逻辑"""
Args:
labware_info: labware 信息字典
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
G = WorkflowGraph() G = WorkflowGraph()
resource_last_writer = {} resource_last_writer = {}
protocol_steps = refactor_data(protocol_steps, action_resource_mapping) protocol_steps = refactor_data(protocol_steps)
# 有机化学&移液站协议图构建 # 有机化学&移液站协议图构建
WORKSTATION_ID = workstation_name WORKSTATION_ID = workstation_name
# 为所有labware创建资源节点 # 为所有labware创建资源节点
res_index = 0
for labware_id, item in labware_info.items(): for labware_id, item in labware_info.items():
# item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}")
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
@@ -273,16 +217,13 @@ def build_protocol_graph(
liquid_type = [labware_id] liquid_type = [labware_id]
liquid_volume = [1e5] liquid_volume = [1e5]
res_index += 1
G.add_node( G.add_node(
node_id, node_id,
template_name="create_resource", template_name=f"create_resource",
resource_name="host_node", resource_name="host_node",
name=f"Res {res_index}",
description=description, description=description,
lab_node_type=lab_node_type, lab_node_type=lab_node_type,
footer="create_resource-host_node", params={
param={
"res_id": labware_id, "res_id": labware_id,
"device_id": WORKSTATION_ID, "device_id": WORKSTATION_ID,
"class_name": "container", "class_name": "container",
@@ -293,6 +234,7 @@ def build_protocol_graph(
"liquid_volume": liquid_volume, "liquid_volume": liquid_volume,
"slot_on_deck": "", "slot_on_deck": "",
}, },
role=item.get("role", ""),
) )
resource_last_writer[labware_id] = f"{node_id}:labware" resource_last_writer[labware_id] = f"{node_id}:labware"
@@ -309,7 +251,7 @@ def build_protocol_graph(
last_control_node_id = node_id last_control_node_id = node_id
# 物料流 # 物料流
params = step.get("param", {}) params = step.get("parameters", {})
input_resources_possible_names = [ input_resources_possible_names = [
"vessel", "vessel",
"to_vessel", "to_vessel",
@@ -357,7 +299,7 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
G = nx.DiGraph() G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items(): for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template_name", node_id[:8])) label = attrs.get("description", attrs.get("template", node_id[:8]))
G.add_node(node_id, label=label, **attrs) G.add_node(node_id, label=label, **attrs)
for edge in protocol_graph.edges: for edge in protocol_graph.edges:
@@ -389,13 +331,11 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
print(f" - Visualization saved to '{output_path}'") print(f" - Visualization saved to '{output_path}'")
COMPASS = {"n", "e", "s", "w", "ne", "nw", "se", "sw", "c"} COMPASS = {"n","e","s","w","ne","nw","se","sw","c"}
def _is_compass(port: str) -> bool: def _is_compass(port: str) -> bool:
return isinstance(port, str) and port.lower() in COMPASS return isinstance(port, str) and port.lower() in COMPASS
def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"):
""" """
使用 Graphviz 端口语法绘制协议工作流图。 使用 Graphviz 端口语法绘制协议工作流图。
@@ -410,22 +350,22 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 1) 先用 networkx 搭建有向图,保留端口属性 # 1) 先用 networkx 搭建有向图,保留端口属性
G = nx.DiGraph() G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items(): for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template_name", node_id[:8])) label = attrs.get("description", attrs.get("template", node_id[:8]))
# 保留一个干净的“中心标签”,用于放在 record 的中间槽 # 保留一个干净的“中心标签”,用于放在 record 的中间槽
G.add_node(node_id, _core_label=str(label), **{k: v for k, v in attrs.items() if k not in ("label",)}) G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)})
edges_data = [] edges_data = []
in_ports_by_node = {} # 收集命名输入端口 in_ports_by_node = {} # 收集命名输入端口
out_ports_by_node = {} # 收集命名输出端口 out_ports_by_node = {} # 收集命名输出端口
for edge in protocol_graph.edges: for edge in protocol_graph.edges:
u = edge["source"] u = edge["source"]
v = edge["target"] v = edge["target"]
sp = edge.get("source_handle_key") or edge.get("source_port") sp = edge.get("source_port")
tp = edge.get("target_handle_key") or edge.get("target_port") tp = edge.get("target_port")
# 记录到图里(保留原始端口信息) # 记录到图里(保留原始端口信息)
G.add_edge(u, v, source_handle_key=sp, target_handle_key=tp) G.add_edge(u, v, source_port=sp, target_port=tp)
edges_data.append((u, v, sp, tp)) edges_data.append((u, v, sp, tp))
# 如果不是 compass就按“命名端口”先归类等会儿给节点造 record # 如果不是 compass就按“命名端口”先归类等会儿给节点造 record
@@ -437,9 +377,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 2) 转为 AGraph使用 Graphviz 渲染 # 2) 转为 AGraph使用 Graphviz 渲染
A = to_agraph(G) A = to_agraph(G)
A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10")
A.node_attr.update( A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica")
shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica"
)
A.edge_attr.update(arrowsize="0.8", color="#666666") A.edge_attr.update(arrowsize="0.8", color="#666666")
# 3) 为需要命名端口的节点设置 record 形状与 label # 3) 为需要命名端口的节点设置 record 形状与 label
@@ -448,19 +386,18 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
node = A.get_node(n) node = A.get_node(n)
core = G.nodes[n].get("_core_label", n) core = G.nodes[n].get("_core_label", n)
in_ports = sorted(in_ports_by_node.get(n, [])) in_ports = sorted(in_ports_by_node.get(n, []))
out_ports = sorted(out_ports_by_node.get(n, [])) out_ports = sorted(out_ports_by_node.get(n, []))
# 如果该节点涉及命名端口,则用 record否则保留原 box # 如果该节点涉及命名端口,则用 record否则保留原 box
if in_ports or out_ports: if in_ports or out_ports:
def port_fields(ports): def port_fields(ports):
if not ports: if not ports:
return " " # 必须留一个空槽占位 return " " # 必须留一个空槽占位
# 每个端口一个小格子,<p> name # 每个端口一个小格子,<p> name
return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports) return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports)
left = port_fields(in_ports) left = port_fields(in_ports)
right = port_fields(out_ports) right = port_fields(out_ports)
# 三栏:左(入) | 中(节点名) | 右(出) # 三栏:左(入) | 中(节点名) | 右(出)
@@ -473,7 +410,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 4) 给边设置 headport / tailport # 4) 给边设置 headport / tailport
# - 若端口为 compass直接用 compasse.g., headport="e" # - 若端口为 compass直接用 compasse.g., headport="e"
# - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可) # - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可)
for u, v, sp, tp in edges_data: for (u, v, sp, tp) in edges_data:
e = A.get_edge(u, v) e = A.get_edge(u, v)
# Graphviz 属性tail 是源head 是目标 # Graphviz 属性tail 是源head 是目标
@@ -482,13 +419,13 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
e.attr["tailport"] = sp.lower() e.attr["tailport"] = sp.lower()
else: else:
# 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗 # 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗
e.attr["tailport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(sp)) e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp))
if tp: if tp:
if _is_compass(tp): if _is_compass(tp):
e.attr["headport"] = tp.lower() e.attr["headport"] = tp.lower()
else: else:
e.attr["headport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(tp)) e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp))
# 可选:若想让边更贴边缘,可设置 constraint/spline 等 # 可选:若想让边更贴边缘,可设置 constraint/spline 等
# e.attr["arrowhead"] = "vee" # e.attr["arrowhead"] = "vee"
@@ -496,14 +433,11 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 5) 输出 # 5) 输出
A.draw(output_path, prog="dot") A.draw(output_path, prog="dot")
print(f" - Port-aware workflow rendered to '{output_path}'") print(f" - Port-aware workflow rendered to '{output_path}'")
# ---------------- Registry Adapter ---------------- # ---------------- Registry Adapter ----------------
class RegistryAdapter: class RegistryAdapter:
"""根据 module 的类名(冒号右侧)反查 registry 的 resource_name原 device_class并抽取参数顺序""" """根据 module 的类名(冒号右侧)反查 registry 的 resource_name原 device_class并抽取参数顺序"""
def __init__(self, device_registry: Dict[str, Any]): def __init__(self, device_registry: Dict[str, Any]):
self.device_registry = device_registry or {} self.device_registry = device_registry or {}
self.module_class_to_resource = self._build_module_class_index() self.module_class_to_resource = self._build_module_class_index()
@@ -521,7 +455,8 @@ class RegistryAdapter:
def resolve_resource_by_classname(self, class_name: str) -> Optional[str]: def resolve_resource_by_classname(self, class_name: str) -> Optional[str]:
if not class_name: if not class_name:
return None return None
return self.module_class_to_resource.get(class_name) or self.module_class_to_resource.get(class_name.lower()) return (self.module_class_to_resource.get(class_name)
or self.module_class_to_resource.get(class_name.lower()))
def get_device_module(self, resource_name: Optional[str]) -> Optional[str]: def get_device_module(self, resource_name: Optional[str]) -> Optional[str]:
if not resource_name: if not resource_name:
@@ -531,7 +466,9 @@ class RegistryAdapter:
def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]: def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]:
if not resource_name: if not resource_name:
return {} return {}
return (self.device_registry.get(resource_name, {}).get("class", {}).get("action_value_mappings", {})) or {} return (self.device_registry.get(resource_name, {})
.get("class", {})
.get("action_value_mappings", {})) or {}
def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]: def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]:
return (self.get_actions(resource_name).get(template_name) or {}).get("schema") return (self.get_actions(resource_name).get(template_name) or {}).get("schema")

View File

@@ -1,356 +0,0 @@
"""
JSON 工作流转换模块
提供从多种 JSON 格式转换为统一工作流格式的功能。
支持的格式:
1. workflow/reagent 格式
2. steps_info/labware_info 格式
"""
import json
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from unilabos.workflow.common import WorkflowGraph, build_protocol_graph
from unilabos.registry.registry import lab_registry
def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]:
"""
从 registry 获取指定设备和动作的 handles 配置
Args:
resource_name: 设备资源名称,如 "liquid_handler.prcxi"
template_name: 动作模板名称,如 "transfer_liquid"
Returns:
包含 source 和 target handler_keys 的字典:
{"source": ["sources_out", "targets_out", ...], "target": ["sources", "targets", ...]}
"""
result = {"source": [], "target": []}
device_info = lab_registry.device_type_registry.get(resource_name, {})
if not device_info:
return result
action_mappings = device_info.get("class", {}).get("action_value_mappings", {})
action_config = action_mappings.get(template_name, {})
handles = action_config.get("handles", {})
if isinstance(handles, dict):
# 处理 input handles (作为 target)
for handle in handles.get("input", []):
handler_key = handle.get("handler_key", "")
if handler_key:
result["source"].append(handler_key)
# 处理 output handles (作为 source)
for handle in handles.get("output", []):
handler_key = handle.get("handler_key", "")
if handler_key:
result["target"].append(handler_key)
return result
def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
"""
校验工作流图中所有边的句柄配置是否正确
Args:
graph: 工作流图对象
Returns:
(is_valid, errors): 是否有效,错误信息列表
"""
errors = []
nodes = graph.nodes
for edge in graph.edges:
left_uuid = edge.get("source")
right_uuid = edge.get("target")
# target_handle_key是target, right的输入节点入节点
# source_handle_key是source, left的输出节点出节点
right_source_conn_key = edge.get("target_handle_key", "")
left_target_conn_key = edge.get("source_handle_key", "")
# 获取源节点和目标节点信息
left_node = nodes.get(left_uuid, {})
right_node = nodes.get(right_uuid, {})
left_res_name = left_node.get("resource_name", "")
left_template_name = left_node.get("template_name", "")
right_res_name = right_node.get("resource_name", "")
right_template_name = right_node.get("template_name", "")
# 获取源节点的 output handles
left_node_handles = get_action_handles(left_res_name, left_template_name)
target_valid_keys = left_node_handles.get("target", [])
target_valid_keys.append("ready")
# 获取目标节点的 input handles
right_node_handles = get_action_handles(right_res_name, right_template_name)
source_valid_keys = right_node_handles.get("source", [])
source_valid_keys.append("ready")
# 如果节点配置了 output handles则 source_port 必须有效
if not right_source_conn_key:
node_name = left_node.get("name", left_uuid[:8])
errors.append(f"源节点 '{node_name}' 的 source_handle_key 为空," f"应设置为: {source_valid_keys}")
elif right_source_conn_key not in source_valid_keys:
node_name = left_node.get("name", left_uuid[:8])
errors.append(
f"源节点 '{node_name}' 的 source 端点 '{right_source_conn_key}' 不存在," f"支持的端点: {source_valid_keys}"
)
# 如果节点配置了 input handles则 target_port 必须有效
if not left_target_conn_key:
node_name = right_node.get("name", right_uuid[:8])
errors.append(f"目标节点 '{node_name}' 的 target_handle_key 为空," f"应设置为: {target_valid_keys}")
elif left_target_conn_key not in target_valid_keys:
node_name = right_node.get("name", right_uuid[:8])
errors.append(
f"目标节点 '{node_name}' 的 target 端点 '{left_target_conn_key}' 不存在,"
f"支持的端点: {target_valid_keys}"
)
return len(errors) == 0, errors
# action 到 resource_name 的映射
ACTION_RESOURCE_MAPPING: Dict[str, str] = {
# 生物实验操作
"transfer_liquid": "liquid_handler.prcxi",
"transfer": "liquid_handler.prcxi",
"incubation": "incubator.prcxi",
"move_labware": "labware_mover.prcxi",
"oscillation": "shaker.prcxi",
# 有机化学操作
"HeatChillToTemp": "heatchill.chemputer",
"StopHeatChill": "heatchill.chemputer",
"StartHeatChill": "heatchill.chemputer",
"HeatChill": "heatchill.chemputer",
"Dissolve": "stirrer.chemputer",
"Transfer": "liquid_handler.chemputer",
"Evaporate": "rotavap.chemputer",
"Recrystallize": "reactor.chemputer",
"Filter": "filter.chemputer",
"Dry": "dryer.chemputer",
"Add": "liquid_handler.chemputer",
}
def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
将不同格式的步骤数据规范化为统一格式
支持的输入格式:
- action + parameters
- action + action_args
- operation + parameters
Args:
data: 原始步骤数据列表
Returns:
规范化后的步骤列表,格式为 [{"action": str, "parameters": dict, "description": str?, "step_number": int?}, ...]
"""
normalized = []
for idx, step in enumerate(data):
# 获取动作名称(支持 action 或 operation 字段)
action = step.get("action") or step.get("operation")
if not action:
continue
# 获取参数(支持 parameters 或 action_args 字段)
raw_params = step.get("parameters") or step.get("action_args") or {}
params = dict(raw_params)
# 规范化 source/target -> sources/targets
if "source" in raw_params and "sources" not in raw_params:
params["sources"] = raw_params["source"]
if "target" in raw_params and "targets" not in raw_params:
params["targets"] = raw_params["target"]
# 获取描述(支持 description 或 purpose 字段)
description = step.get("description") or step.get("purpose")
# 获取步骤编号(优先使用原始数据中的 step_number否则使用索引+1
step_number = step.get("step_number", idx + 1)
step_dict = {"action": action, "parameters": params, "step_number": step_number}
if description:
step_dict["description"] = description
normalized.append(step_dict)
return normalized
def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
将不同格式的 labware 数据规范化为统一的字典格式
支持的输入格式:
- reagent_name + material_name + positions
- name + labware + slot
Args:
data: 原始 labware 数据列表
Returns:
规范化后的 labware 字典,格式为 {name: {"slot": int, "labware": str, "well": list, "type": str, "role": str, "name": str}, ...}
"""
labware = {}
for item in data:
# 获取 key 名称(优先使用 reagent_name其次是 material_name 或 name
reagent_name = item.get("reagent_name")
key = reagent_name or item.get("material_name") or item.get("name")
if not key:
continue
key = str(key)
# 处理重复 key自动添加后缀
idx = 1
original_key = key
while key in labware:
idx += 1
key = f"{original_key}_{idx}"
labware[key] = {
"slot": item.get("positions") or item.get("slot"),
"labware": item.get("material_name") or item.get("labware"),
"well": item.get("well", []),
"type": item.get("type", "reagent"),
"role": item.get("role", ""),
"name": key,
}
return labware
def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
validate: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
支持的 JSON 格式:
1. {"workflow": [...], "reagent": {...}} - 直接格式
2. {"steps_info": [...], "labware_info": [...]} - 需要规范化的格式
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
Returns:
WorkflowGraph: 构建好的工作流图
Raises:
ValueError: 不支持的 JSON 格式 或 句柄校验失败
FileNotFoundError: 文件不存在
json.JSONDecodeError: JSON 解析失败
"""
# 处理输入数据
if isinstance(data, (str, PathLike)):
path = Path(data)
if path.exists():
with path.open("r", encoding="utf-8") as fp:
json_data = json.load(fp)
elif isinstance(data, str):
# 尝试作为 JSON 字符串解析
json_data = json.loads(data)
else:
raise FileNotFoundError(f"文件不存在: {data}")
elif isinstance(data, dict):
json_data = data
else:
raise TypeError(f"不支持的数据类型: {type(data)}")
# 根据格式解析数据
if "workflow" in json_data and "reagent" in json_data:
# 格式1: workflow/reagent已经是规范格式
protocol_steps = json_data["workflow"]
labware_info = json_data["reagent"]
elif "steps_info" in json_data and "labware_info" in json_data:
# 格式2: steps_info/labware_info需要规范化
protocol_steps = normalize_steps(json_data["steps_info"])
labware_info = normalize_labware(json_data["labware_info"])
elif "steps" in json_data and "labware" in json_data:
# 格式3: steps/labware另一种常见格式
protocol_steps = normalize_steps(json_data["steps"])
if isinstance(json_data["labware"], list):
labware_info = normalize_labware(json_data["labware"])
else:
labware_info = json_data["labware"]
else:
raise ValueError(
"不支持的 JSON 格式。支持的格式:\n"
"1. {'workflow': [...], 'reagent': {...}}\n"
"2. {'steps_info': [...], 'labware_info': [...]}\n"
"3. {'steps': [...], 'labware': [...]}"
)
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
)
# 校验句柄配置
if validate:
is_valid, errors = validate_workflow_handles(graph)
if not is_valid:
import warnings
for error in errors:
warnings.warn(f"句柄校验警告: {error}")
return graph
def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
) -> Dict[str, Any]:
"""
将 JSON 数据转换为 node-link 格式的字典
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
Returns:
Dict: node-link 格式的工作流数据
"""
graph = convert_from_json(data, workstation_name)
return graph.to_node_link_dict()
def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
) -> List[Dict[str, Any]]:
"""
将 JSON 数据转换为工作流列表格式
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
Returns:
List: 工作流节点列表
"""
graph = convert_from_json(data, workstation_name)
return graph.to_dict()
# 为了向后兼容,保留下划线前缀的别名
_normalize_steps = normalize_steps
_normalize_labware = normalize_labware

View File

@@ -0,0 +1,24 @@
import json
from os import PathLike
from unilabos.workflow.common import build_protocol_graph
def from_labwares_and_steps(data_path: PathLike):
with data_path.open("r", encoding="utf-8") as fp:
d = json.load(fp)
if "workflow" in d and "reagent" in d:
protocol_steps = d["workflow"]
labware_info = d["reagent"]
elif "steps_info" in d and "labware_info" in d:
protocol_steps = _normalize_steps(d["steps_info"])
labware_info = _normalize_labware(d["labware_info"])
else:
raise ValueError("Unsupported protocol format")
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name="PRCXi",
)

View File

@@ -1,138 +0,0 @@
"""
工作流工具模块
提供工作流上传等功能
"""
import json
import os
import uuid
from typing import Any, Dict, List, Optional
from unilabos.utils.banner_print import print_status
def _is_node_link_format(data: Dict[str, Any]) -> bool:
"""检查数据是否为 node-link 格式"""
return "nodes" in data and "edges" in data
def _convert_to_node_link(workflow_file: str, workflow_data: Dict[str, Any]) -> Dict[str, Any]:
"""
将非 node-link 格式的工作流数据转换为 node-link 格式
Args:
workflow_file: 工作流文件路径(用于日志)
workflow_data: 原始工作流数据
Returns:
node-link 格式的工作流数据
"""
from unilabos.workflow.convert_from_json import convert_json_to_node_link
print_status(f"检测到非 node-link 格式,正在转换...", "info")
node_link_data = convert_json_to_node_link(workflow_data)
print_status(f"转换完成", "success")
return node_link_data
def upload_workflow(
workflow_file: str,
workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None,
published: bool = False,
) -> Dict[str, Any]:
"""
上传工作流到服务器
支持的输入格式:
1. node-link 格式: {"nodes": [...], "edges": [...]}
2. workflow/reagent 格式: {"workflow": [...], "reagent": {...}}
3. steps_info/labware_info 格式: {"steps_info": [...], "labware_info": [...]}
4. steps/labware 格式: {"steps": [...], "labware": [...]}
Args:
workflow_file: 工作流文件路径JSON格式
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
Returns:
Dict: API响应数据
"""
# 延迟导入,避免在配置文件加载之前初始化 http_client
from unilabos.app.web import http_client
if not os.path.exists(workflow_file):
print_status(f"工作流文件不存在: {workflow_file}", "error")
return {"code": -1, "message": f"文件不存在: {workflow_file}"}
# 读取工作流文件
try:
with open(workflow_file, "r", encoding="utf-8") as f:
workflow_data = json.load(f)
except json.JSONDecodeError as e:
print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"}
# 自动检测并转换格式
if not _is_node_link_format(workflow_data):
try:
workflow_data = _convert_to_node_link(workflow_file, workflow_data)
except Exception as e:
print_status(f"工作流格式转换失败: {e}", "error")
return {"code": -1, "message": f"格式转换失败: {e}"}
# 提取工作流数据
nodes = workflow_data.get("nodes", [])
edges = workflow_data.get("edges", [])
workflow_uuid_val = workflow_data.get("workflow_uuid", str(uuid.uuid4()))
wf_name_from_file = workflow_data.get("workflow_name", os.path.basename(workflow_file).replace(".json", ""))
# 确定工作流名称
final_name = workflow_name or wf_name_from_file
print_status(f"正在上传工作流: {final_name}", "info")
print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传
result = http_client.workflow_import(
name=final_name,
workflow_uuid=workflow_uuid_val,
workflow_name=final_name,
nodes=nodes,
edges=edges,
tags=tags,
published=published,
)
if result.get("code") == 0:
data = result.get("data", {})
print_status("工作流上传成功!", "success")
print_status(f" - UUID: {data.get('uuid', 'N/A')}", "info")
print_status(f" - 名称: {data.get('name', 'N/A')}", "info")
else:
print_status(f"工作流上传失败: {result.get('message', '未知错误')}", "error")
return result
def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
"""
处理 workflow_upload 子命令
Args:
args_dict: 命令行参数字典
"""
workflow_file = args_dict.get("workflow_file")
workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", [])
published = args_dict.get("published", False)
if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published)
else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")