1 Commits

Author SHA1 Message Date
Junhan Chang
71c9a777ba add unilabos/workflow and entrypoint 2025-12-07 15:23:51 +08:00
52 changed files with 318 additions and 2331 deletions

View File

@@ -39,9 +39,7 @@ Uni-Lab-OS recommends using `mamba` for environment management. Choose the appro
```bash ```bash
# Create new environment # Create new environment
mamba create -n unilab python=3.11.11 mamba create -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
mamba activate unilab
mamba install -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
``` ```
## Install Dev Uni-Lab-OS ## Install Dev Uni-Lab-OS

View File

@@ -41,9 +41,7 @@ Uni-Lab-OS 建议使用 `mamba` 管理环境。根据您的操作系统选择适
```bash ```bash
# 创建新环境 # 创建新环境
mamba create -n unilab python=3.11.11 mamba create -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
mamba activate unilab
mamba install -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
``` ```
2. 安装开发版 Uni-Lab-OS: 2. 安装开发版 Uni-Lab-OS:

View File

@@ -317,6 +317,45 @@ unilab --help
如果所有命令都正常输出,说明开发环境配置成功! 如果所有命令都正常输出,说明开发环境配置成功!
### 开发工具推荐
#### IDE
- **PyCharm Professional**: 强大的 Python IDE支持远程调试
- **VS Code**: 轻量级,配合 Python 扩展使用
- **Vim/Emacs**: 适合终端开发
#### 推荐的 VS Code 扩展
- Python
- Pylance
- ROS
- URDF
- YAML
#### 调试工具
```bash
# 安装调试工具
pip install ipdb pytest pytest-cov -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
# 代码质量检查
pip install black flake8 mypy -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```
### 设置 pre-commit 钩子(可选)
```bash
# 安装 pre-commit
pip install pre-commit -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
# 设置钩子
pre-commit install
# 手动运行检查
pre-commit run --all-files
```
--- ---
## 验证安装 ## 验证安装

View File

@@ -1,35 +0,0 @@
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
import pytest
from unilabos.workflow.convert_from_json import (
convert_from_json,
normalize_steps as _normalize_steps,
normalize_labware as _normalize_labware,
)
from unilabos.workflow.common import draw_protocol_graph_with_ports
@pytest.mark.parametrize(
"protocol_name",
[
"example_bio",
# "bioyond_materials_liquidhandling_1",
"example_prcxi",
],
)
def test_build_protocol_graph(protocol_name):
data_path = Path(__file__).with_name(f"{protocol_name}.json")
graph = convert_from_json(data_path, workstation_name="PRCXi")
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png")
draw_protocol_graph_with_ports(graph, str(output_path))
print(graph)

View File

@@ -20,7 +20,6 @@ if unilabos_dir not in sys.path:
from unilabos.utils.banner_print import print_status, print_unilab_banner from unilabos.utils.banner_print import print_status, print_unilab_banner
from unilabos.config.config import load_config, BasicConfig, HTTPConfig from unilabos.config.config import load_config, BasicConfig, HTTPConfig
def load_config_from_file(config_path): def load_config_from_file(config_path):
if config_path is None: if config_path is None:
config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None) config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None)
@@ -42,7 +41,7 @@ def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser):
for i, arg in enumerate(sys.argv): for i, arg in enumerate(sys.argv):
for option_string in option_strings: for option_string in option_strings:
if arg.startswith(option_string): if arg.startswith(option_string):
new_arg = arg[:2] + arg[2 : len(option_string)].replace("-", "_") + arg[len(option_string) :] new_arg = arg[:2] + arg[2:len(option_string)].replace("-", "_") + arg[len(option_string):]
sys.argv[i] = new_arg sys.argv[i] = new_arg
break break
@@ -156,54 +155,32 @@ def parse_args():
default=False, default=False,
help="Complete registry information", help="Complete registry information",
) )
# workflow upload subcommand
# label
workflow_parser = subparsers.add_parser( workflow_parser = subparsers.add_parser(
"workflow_upload", "workflow_upload",
aliases=["wf"],
help="Upload workflow from xdl/json/python files", help="Upload workflow from xdl/json/python files",
) )
workflow_parser.add_argument( workflow_parser.add_argument("-t", "--labeltype", default="singlepoint", type=str,
"-f", help="QM calculation type, support 'singlepoint', 'optimize' and 'dimer' currently")
"--workflow_file",
type=str,
required=True,
help="Path to the workflow file (JSON format)",
)
workflow_parser.add_argument(
"-n",
"--workflow_name",
type=str,
default=None,
help="Workflow name, if not provided will use the name from file or filename",
)
workflow_parser.add_argument(
"--tags",
type=str,
nargs="*",
default=[],
help="Tags for the workflow (space-separated)",
)
workflow_parser.add_argument(
"--published",
action="store_true",
default=False,
help="Whether to publish the workflow (default: False)",
)
return parser return parser
def main(): def main():
"""主函数""" """主函数"""
# 解析命令行参数 # 解析命令行参数
parser = parse_args() args = parse_args()
convert_argv_dashes_to_underscores(parser) convert_argv_dashes_to_underscores(args)
args = parser.parse_args() args_dict = vars(args.parse_args())
args_dict = vars(args)
# 显示启动横幅
print_unilab_banner(args_dict)
# 环境检查 - 检查并自动安装必需的包 (可选) # 环境检查 - 检查并自动安装必需的包 (可选)
if not args_dict.get("skip_env_check", False): if not args_dict.get("skip_env_check", False):
from unilabos.utils.environment_check import check_environment from unilabos.utils.environment_check import check_environment
print_status("正在进行环境依赖检查...", "info")
if not check_environment(auto_install=True): if not check_environment(auto_install=True):
print_status("环境检查失败,程序退出", "error") print_status("环境检查失败,程序退出", "error")
os._exit(1) os._exit(1)
@@ -256,18 +233,17 @@ def main():
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.") logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir) configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if args.addr != parser.get_default("addr"): if args_dict["addr"] == "test":
if args.addr == "test":
print_status("使用测试环境地址", "info") print_status("使用测试环境地址", "info")
HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1" HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1"
elif args.addr == "uat": elif args_dict["addr"] == "uat":
print_status("使用uat环境地址", "info") print_status("使用uat环境地址", "info")
HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1" HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1"
elif args.addr == "local": elif args_dict["addr"] == "local":
print_status("使用本地环境地址", "info") print_status("使用本地环境地址", "info")
HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1" HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
else: else:
HTTPConfig.remote_addr = args.addr HTTPConfig.remote_addr = args_dict.get("addr", "")
# 设置BasicConfig参数 # 设置BasicConfig参数
if args_dict.get("ak", ""): if args_dict.get("ak", ""):
@@ -278,10 +254,18 @@ def main():
print_status("传入了sk参数优先采用传入参数", "info") print_status("传入了sk参数优先采用传入参数", "info")
BasicConfig.working_dir = working_dir BasicConfig.working_dir = working_dir
workflow_upload = args_dict.get("command") in ("workflow_upload", "wf") # 显示启动横幅
print_unilab_banner(args_dict)
#####################################
######## 启动设备接入端(主入口) ########
#####################################
launch(args_dict)
def launch(args_dict: Dict[str, Any]):
# 使用远程资源启动 # 使用远程资源启动
if not workflow_upload and args_dict["use_remote_resource"]: if args_dict["use_remote_resource"]:
print_status("使用远程资源启动", "info") print_status("使用远程资源启动", "info")
from unilabos.app.web import http_client from unilabos.app.web import http_client
@@ -317,36 +301,11 @@ def main():
from unilabos.resources.graphio import modify_to_backend_format from unilabos.resources.graphio import modify_to_backend_format
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict
# 显示启动横幅
print_unilab_banner(args_dict)
# 注册表 # 注册表
lab_registry = build_registry( lab_registry = build_registry(
args_dict["registry_path"], args_dict.get("complete_registry", False), BasicConfig.upload_registry args_dict["registry_path"], args_dict.get("complete_registry", False), args_dict["upload_registry"]
) )
if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
if BasicConfig.ak and BasicConfig.sk:
print_status("开始注册设备到服务端...", "info")
try:
register_devices_and_resources(lab_registry)
print_status("设备注册完成", "info")
except Exception as e:
print_status(f"设备注册失败: {e}", "error")
else:
print_status("未提供 ak 和 sk跳过设备注册", "info")
else:
print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning")
# 处理 workflow_upload 子命令
if workflow_upload:
from unilabos.workflow.wf_utils import handle_workflow_upload_command
handle_workflow_upload_command(args_dict)
print_status("工作流上传完成,程序退出", "info")
os._exit(0)
if not BasicConfig.ak or not BasicConfig.sk: if not BasicConfig.ak or not BasicConfig.sk:
print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning")
os._exit(1) os._exit(1)
@@ -423,6 +382,20 @@ def main():
args_dict["devices_config"] = resource_tree_set args_dict["devices_config"] = resource_tree_set
args_dict["graph"] = graph_res.physical_setup_graph args_dict["graph"] = graph_res.physical_setup_graph
if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
if BasicConfig.ak and BasicConfig.sk:
print_status("开始注册设备到服务端...", "info")
try:
register_devices_and_resources(lab_registry)
print_status("设备注册完成", "info")
except Exception as e:
print_status(f"设备注册失败: {e}", "error")
else:
print_status("未提供 ak 和 sk跳过设备注册", "info")
else:
print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning")
if args_dict["controllers"] is not None: if args_dict["controllers"] is not None:
args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8")) args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8"))
else: else:
@@ -437,7 +410,6 @@ def main():
comm_client = get_communication_client() comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]: if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client) args_dict["bridges"].append(comm_client)
def _exit(signum, frame): def _exit(signum, frame):
comm_client.stop() comm_client.stop()
sys.exit(0) sys.exit(0)
@@ -479,13 +451,16 @@ def main():
resource_visualization.start() resource_visualization.start()
except OSError as e: except OSError as e:
if "AMENT_PREFIX_PATH" in str(e): if "AMENT_PREFIX_PATH" in str(e):
print_status(f"ROS 2环境未正确设置跳过3D可视化启动。错误详情: {e}", "warning") print_status(
f"ROS 2环境未正确设置跳过3D可视化启动。错误详情: {e}",
"warning"
)
print_status( print_status(
"建议解决方案:\n" "建议解决方案:\n"
"1. 激活Conda环境: conda activate unilab\n" "1. 激活Conda环境: conda activate unilab\n"
"2. 或使用 --backend simple 参数\n" "2. 或使用 --backend simple 参数\n"
"3. 或使用 --visual disable 参数禁用可视化", "3. 或使用 --visual disable 参数禁用可视化",
"info", "info"
) )
else: else:
raise raise

View File

@@ -76,8 +76,7 @@ class HTTPClient:
Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid}
""" """
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f: with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f:
payload = {"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid} f.write(json.dumps({"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, indent=4))
f.write(json.dumps(payload, indent=4))
# 从序列化数据中提取所有节点的UUID保存旧UUID # 从序列化数据中提取所有节点的UUID保存旧UUID
old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} old_uuids = {n.res_content.uuid: n for n in resources.all_nodes}
if not self.initialized or first_add: if not self.initialized or first_add:
@@ -332,67 +331,6 @@ class HTTPClient:
logger.error(f"响应内容: {response.text}") logger.error(f"响应内容: {response.text}")
return None return None
def workflow_import(
self,
name: str,
workflow_uuid: str,
workflow_name: str,
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None,
published: bool = False,
) -> Dict[str, Any]:
"""
导入工作流到服务器
Args:
name: 工作流名称(顶层)
workflow_uuid: 工作流UUID
workflow_name: 工作流名称data内部
nodes: 工作流节点列表
edges: 工作流边列表
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
Returns:
Dict: API响应数据包含 code 和 data (uuid, name)
"""
# target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取
payload = {
"target_lab_uuid": "28c38bb0-63f6-4352-b0d8-b5b8eb1766d5",
"name": name,
"data": {
"workflow_uuid": workflow_uuid,
"workflow_name": workflow_name,
"nodes": nodes,
"edges": edges,
"tags": tags if tags is not None else [],
"published": published,
},
}
# 保存请求到文件
with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(payload, indent=4, ensure_ascii=False))
response = requests.post(
f"{self.remote_addr}/lab/workflow/owner/import",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
# 保存响应到文件
with open(os.path.join(BasicConfig.working_dir, "res_workflow_upload.json"), "w", encoding="utf-8") as f:
f.write(f"{response.status_code}" + "\n" + response.text)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}")
return res
else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例 # 创建默认客户端实例
http_client = HTTPClient() http_client = HTTPClient()

View File

@@ -438,7 +438,7 @@ class MessageProcessor:
self.connected = True self.connected = True
self.reconnect_count = 0 self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") logger.info(f"[MessageProcessor] Connected to {self.websocket_url}")
# 启动发送协程 # 启动发送协程
send_task = asyncio.create_task(self._send_handler()) send_task = asyncio.create_task(self._send_handler())
@@ -503,7 +503,7 @@ class MessageProcessor:
async def _send_handler(self): async def _send_handler(self):
"""处理发送队列中的消息""" """处理发送队列中的消息"""
logger.trace("[MessageProcessor] Send handler started") logger.debug("[MessageProcessor] Send handler started")
try: try:
while self.connected and self.websocket: while self.connected and self.websocket:
@@ -965,7 +965,7 @@ class QueueProcessor:
def _run(self): def _run(self):
"""运行队列处理主循环""" """运行队列处理主循环"""
logger.trace("[QueueProcessor] Queue processor started") logger.debug("[QueueProcessor] Queue processor started")
while self.is_running: while self.is_running:
try: try:
@@ -1175,6 +1175,7 @@ class WebSocketClient(BaseCommunicationClient):
else: else:
url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule"
logger.debug(f"[WebSocketClient] URL: {url}")
return url return url
def start(self) -> None: def start(self) -> None:
@@ -1187,11 +1188,13 @@ class WebSocketClient(BaseCommunicationClient):
logger.error("[WebSocketClient] WebSocket URL not configured") logger.error("[WebSocketClient] WebSocket URL not configured")
return return
logger.info(f"[WebSocketClient] Starting connection to {self.websocket_url}")
# 启动两个核心线程 # 启动两个核心线程
self.message_processor.start() self.message_processor.start()
self.queue_processor.start() self.queue_processor.start()
logger.trace("[WebSocketClient] All threads started") logger.info("[WebSocketClient] All threads started")
def stop(self) -> None: def stop(self) -> None:
"""停止WebSocket客户端""" """停止WebSocket客户端"""

View File

@@ -21,8 +21,7 @@ class BasicConfig:
startup_json_path = None # 填写绝对路径 startup_json_path = None # 填写绝对路径
disable_browser = False # 禁止浏览器自动打开 disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务 port = 8002 # 本地HTTP服务
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' log_level: Literal['TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = "DEBUG" # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@classmethod @classmethod
def auth_secret(cls): def auth_secret(cls):
@@ -42,7 +41,7 @@ class WSConfig:
# HTTP配置 # HTTP配置
class HTTPConfig: class HTTPConfig:
remote_addr = "https://uni-lab.bohrium.com/api/v1" remote_addr = "http://127.0.0.1:48197/api/v1"
# ROS配置 # ROS配置
@@ -66,14 +65,13 @@ def _update_config_from_module(module):
if not attr.startswith("_"): if not attr.startswith("_"):
setattr(obj, attr, getattr(getattr(module, name), attr)) setattr(obj, attr, getattr(getattr(module, name), attr))
def _update_config_from_env(): def _update_config_from_env():
prefix = "UNILABOS_" prefix = "UNILABOS_"
for env_key, env_value in os.environ.items(): for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix): if not env_key.startswith(prefix):
continue continue
try: try:
key_path = env_key[len(prefix) :] # Remove UNILAB_ prefix key_path = env_key[len(prefix):] # Remove UNILAB_ prefix
class_field = key_path.upper().split("_", 1) class_field = key_path.upper().split("_", 1)
if len(class_field) != 2: if len(class_field) != 2:
logger.warning(f"[ENV] 环境变量格式不正确:{env_key}") logger.warning(f"[ENV] 环境变量格式不正确:{env_key}")

View File

@@ -1,712 +0,0 @@
#!/usr/bin/env python3
import asyncio
import json
import subprocess
import sys
import threading
from typing import Optional, Dict, Any
import logging
import requests
import websockets
logging.getLogger("zeep").setLevel(logging.WARNING)
logging.getLogger("zeep.xsd.schema").setLevel(logging.WARNING)
logging.getLogger("zeep.xsd.schema.schema").setLevel(logging.WARNING)
from onvif import ONVIFCamera # 新增ONVIF PTZ 控制
# ======================= 独立的 PTZController =======================
class PTZController:
def __init__(self, host: str, port: int, user: str, password: str):
"""
:param host: 摄像机 IP 或域名(和 RTSP 的一样即可)
:param port: ONVIF 端口(多数为 80看你的设备
:param user: 摄像机用户名
:param password: 摄像机密码
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.cam: Optional[ONVIFCamera] = None
self.media_service = None
self.ptz_service = None
self.profile = None
def connect(self) -> bool:
"""
建立 ONVIF 连接并初始化 PTZ 能力,失败返回 False不抛异常
Note: 首先 pip install onvif-zeep
"""
try:
self.cam = ONVIFCamera(self.host, self.port, self.user, self.password)
self.media_service = self.cam.create_media_service()
self.ptz_service = self.cam.create_ptz_service()
profiles = self.media_service.GetProfiles()
if not profiles:
print("[PTZ] No media profiles found on camera.", file=sys.stderr)
return False
self.profile = profiles[0]
return True
except Exception as e:
print(f"[PTZ] Failed to init ONVIF PTZ: {e}", file=sys.stderr)
return False
def _continuous_move(self, pan: float, tilt: float, zoom: float, duration: float) -> bool:
"""
连续移动一段时间(秒),之后自动停止。
此函数为阻塞模式:只有在 Stop 调用结束后,才返回 True/False。
"""
if not self.ptz_service or not self.profile:
print("[PTZ] _continuous_move: ptz_service or profile not ready", file=sys.stderr)
return False
# 进入前先强行停一下,避免前一次残留动作
self._force_stop()
req = self.ptz_service.create_type("ContinuousMove")
req.ProfileToken = self.profile.token
req.Velocity = {
"PanTilt": {"x": pan, "y": tilt},
"Zoom": {"x": zoom},
}
try:
print(f"[PTZ] ContinuousMove start: pan={pan}, tilt={tilt}, zoom={zoom}, duration={duration}", file=sys.stderr)
self.ptz_service.ContinuousMove(req)
except Exception as e:
print(f"[PTZ] ContinuousMove failed: {e}", file=sys.stderr)
return False
# 阻塞等待:这里决定“运动时间”
import time
wait_seconds = max(2 * duration, 0.0)
time.sleep(wait_seconds)
# 运动完成后强制停止
return self._force_stop()
def stop(self) -> bool:
"""
阻塞调用 Stop带重试成功 True失败 False。
"""
return self._force_stop()
# ------- 对外动作接口(给 CameraController 调用) -------
# 所有接口都为“阻塞模式”:只有在运动 + Stop 完成后才返回 True/False
def move_up(self, speed: float = 0.5, duration: float = 1.0) -> bool:
print(f"[PTZ] move_up called, speed={speed}, duration={duration}", file=sys.stderr)
return self._continuous_move(pan=0.0, tilt=+speed, zoom=0.0, duration=duration)
def move_down(self, speed: float = 0.5, duration: float = 1.0) -> bool:
print(f"[PTZ] move_down called, speed={speed}, duration={duration}", file=sys.stderr)
return self._continuous_move(pan=0.0, tilt=-speed, zoom=0.0, duration=duration)
def move_left(self, speed: float = 0.2, duration: float = 1.0) -> bool:
print(f"[PTZ] move_left called, speed={speed}, duration={duration}", file=sys.stderr)
return self._continuous_move(pan=-speed, tilt=0.0, zoom=0.0, duration=duration)
def move_right(self, speed: float = 0.2, duration: float = 1.0) -> bool:
print(f"[PTZ] move_right called, speed={speed}, duration={duration}", file=sys.stderr)
return self._continuous_move(pan=+speed, tilt=0.0, zoom=0.0, duration=duration)
# ------- 占位的变倍接口(当前设备不支持) -------
def zoom_in(self, speed: float = 0.2, duration: float = 1.0) -> bool:
"""
当前设备不支持变倍;保留方法只是避免上层调用时报错。
"""
print("[PTZ] zoom_in is disabled for this device.", file=sys.stderr)
return False
def zoom_out(self, speed: float = 0.2, duration: float = 1.0) -> bool:
"""
当前设备不支持变倍;保留方法只是避免上层调用时报错。
"""
print("[PTZ] zoom_out is disabled for this device.", file=sys.stderr)
return False
def _force_stop(self, retries: int = 3, delay: float = 0.1) -> bool:
"""
尝试多次调用 Stop作为“强制停止”手段。
:param retries: 重试次数
:param delay: 每次重试间隔(秒)
"""
if not self.ptz_service or not self.profile:
print("[PTZ] _force_stop: ptz_service or profile not ready", file=sys.stderr)
return False
import time
last_error = None
for i in range(retries):
try:
print(f"[PTZ] _force_stop: calling Stop(), attempt={i+1}", file=sys.stderr)
self.ptz_service.Stop({"ProfileToken": self.profile.token})
print("[PTZ] _force_stop: Stop() returned OK", file=sys.stderr)
return True
except Exception as e:
last_error = e
print(f"[PTZ] _force_stop: Stop() failed at attempt {i+1}: {e}", file=sys.stderr)
time.sleep(delay)
print(f"[PTZ] _force_stop: all {retries} attempts failed, last error: {last_error}", file=sys.stderr)
return False
# ======================= CameraController加入 PTZ =======================
class CameraController:
"""
Uni-Lab-OS 摄像头驱动driver 形式)
启动 Uni-Lab-OS 后,立即开始推流
- WebSocket 信令:通过 signal_backend_url 连接到后端
例如: wss://sciol.ac.cn/api/realtime/signal/host/<host_id>
- 媒体服务器:通过 rtmp_url / webrtc_api / webrtc_stream_url
当前配置为 SRS与独立 HostSimulator 独立运行脚本保持一致。
"""
def __init__(
self,
host_id: str = "demo-host",
# 1信令后端WebSocket
signal_backend_url: str = "wss://sciol.ac.cn/api/realtime/signal/host",
# 2媒体后端RTMP + WebRTC API
rtmp_url: str = "rtmp://srs.sciol.ac.cn:4499/live/camera-01",
webrtc_api: str = "https://srs.sciol.ac.cn/rtc/v1/play/",
webrtc_stream_url: str = "webrtc://srs.sciol.ac.cn:4500/live/camera-01",
camera_rtsp_url: str = "",
# 3PTZ 控制相关ONVIF
ptz_host: str = "", # 一般就是摄像头 IP比如 "192.168.31.164"
ptz_port: int = 80, # ONVIF 端口,不一定是 80按实际情况改
ptz_user: str = "", # admin
ptz_password: str = "", # admin123
):
self.host_id = host_id
self.camera_rtsp_url = camera_rtsp_url
# 拼接最终的 WebSocket URL.../host/<host_id>
signal_backend_url = signal_backend_url.rstrip("/")
if not signal_backend_url.endswith("/host"):
signal_backend_url = signal_backend_url + "/host"
self.signal_backend_url = f"{signal_backend_url}/{host_id}"
# 媒体服务器配置
self.rtmp_url = rtmp_url
self.webrtc_api = webrtc_api
self.webrtc_stream_url = webrtc_stream_url
# PTZ 控制
self.ptz_host = ptz_host
self.ptz_port = ptz_port
self.ptz_user = ptz_user
self.ptz_password = ptz_password
self._ptz: Optional[PTZController] = None
self._init_ptz_if_possible()
# 运行时状态
self._ws: Optional[object] = None
self._ffmpeg_process: Optional[subprocess.Popen] = None
self._running = False
self._loop_task: Optional[asyncio.Future] = None
# 事件循环 & 线程
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
try:
self.start()
except Exception as e:
print(f"[CameraController] __init__ auto start failed: {e}", file=sys.stderr)
# ------------------------ PTZ 初始化 ------------------------
# ------------------------ PTZ 公开动作方法(一个动作一个函数) ------------------------
def ptz_move_up(self, speed: float = 0.5, duration: float = 1.0) -> bool:
print(f"[CameraController] ptz_move_up called, speed={speed}, duration={duration}")
return self._ptz.move_up(speed=speed, duration=duration)
def ptz_move_down(self, speed: float = 0.5, duration: float = 1.0) -> bool:
print(f"[CameraController] ptz_move_down called, speed={speed}, duration={duration}")
return self._ptz.move_down(speed=speed, duration=duration)
def ptz_move_left(self, speed: float = 0.2, duration: float = 1.0) -> bool:
print(f"[CameraController] ptz_move_left called, speed={speed}, duration={duration}")
return self._ptz.move_left(speed=speed, duration=duration)
def ptz_move_right(self, speed: float = 0.2, duration: float = 1.0) -> bool:
print(f"[CameraController] ptz_move_right called, speed={speed}, duration={duration}")
return self._ptz.move_right(speed=speed, duration=duration)
def zoom_in(self, speed: float = 0.2, duration: float = 1.0) -> bool:
"""
当前设备不支持变倍;保留方法只是避免上层调用时报错。
"""
print("[PTZ] zoom_in is disabled for this device.", file=sys.stderr)
return False
def zoom_out(self, speed: float = 0.2, duration: float = 1.0) -> bool:
"""
当前设备不支持变倍;保留方法只是避免上层调用时报错。
"""
print("[PTZ] zoom_out is disabled for this device.", file=sys.stderr)
return False
def ptz_stop(self):
if self._ptz is None:
print("[CameraController] PTZ not initialized.", file=sys.stderr)
return
self._ptz.stop()
def _init_ptz_if_possible(self):
"""
根据 ptz_host / user / password 初始化 PTZ
如果配置信息不全则不启用 PTZ静默
"""
if not (self.ptz_host and self.ptz_user and self.ptz_password):
return
ctrl = PTZController(
host=self.ptz_host,
port=self.ptz_port,
user=self.ptz_user,
password=self.ptz_password,
)
if ctrl.connect():
self._ptz = ctrl
else:
self._ptz = None
# ---------------------------------------------------------------------
# 对外暴露的方法:供 Uni-Lab-OS 调用
# ---------------------------------------------------------------------
def start(self, config: Optional[Dict[str, Any]] = None):
"""
启动 Camera 连接 & 消息循环,并在启动时就开启 FFmpeg 推流,
"""
if self._running:
return {"status": "already_running", "host_id": self.host_id}
# 应用 config 覆盖(如果有)
if config:
self.camera_rtsp_url = config.get("camera_rtsp_url", self.camera_rtsp_url)
cfg_host_id = config.get("host_id")
if cfg_host_id:
self.host_id = cfg_host_id
signal_backend_url = config.get("signal_backend_url")
if signal_backend_url:
signal_backend_url = signal_backend_url.rstrip("/")
if not signal_backend_url.endswith("/host"):
signal_backend_url = signal_backend_url + "/host"
self.signal_backend_url = f"{signal_backend_url}/{self.host_id}"
self.rtmp_url = config.get("rtmp_url", self.rtmp_url)
self.webrtc_api = config.get("webrtc_api", self.webrtc_api)
self.webrtc_stream_url = config.get(
"webrtc_stream_url", self.webrtc_stream_url
)
# PTZ 相关配置也允许通过 config 注入
self.ptz_host = config.get("ptz_host", self.ptz_host)
self.ptz_port = int(config.get("ptz_port", self.ptz_port))
self.ptz_user = config.get("ptz_user", self.ptz_user)
self.ptz_password = config.get("ptz_password", self.ptz_password)
self._init_ptz_if_possible()
self._running = True
# === start 时启动 FFmpeg 推流 ===
self._start_ffmpeg()
# 创建新的事件循环和线程(用于 WebSocket 信令)
self._loop = asyncio.new_event_loop()
def loop_runner(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"[CameraController] event loop error: {e}", file=sys.stderr)
self._loop_thread = threading.Thread(
target=loop_runner, args=(self._loop,), daemon=True
)
self._loop_thread.start()
self._loop_task = asyncio.run_coroutine_threadsafe(
self._run_main_loop(), self._loop
)
return {
"status": "started",
"host_id": self.host_id,
"signal_backend_url": self.signal_backend_url,
"rtmp_url": self.rtmp_url,
"webrtc_api": self.webrtc_api,
"webrtc_stream_url": self.webrtc_stream_url,
}
def stop(self) -> Dict[str, Any]:
"""
停止推流 & 断开 WebSocket并关闭事件循环线程。
"""
self._running = False
self._stop_ffmpeg()
if self._ws and self._loop is not None:
async def close_ws():
try:
await self._ws.close()
except Exception as e:
print(
f"[CameraController] error when closing WebSocket: {e}",
file=sys.stderr,
)
asyncio.run_coroutine_threadsafe(close_ws(), self._loop)
if self._loop_task is not None:
if not self._loop_task.done():
self._loop_task.cancel()
try:
self._loop_task.result()
except asyncio.CancelledError:
pass
except Exception as e:
print(
f"[CameraController] main loop task error in stop(): {e}",
file=sys.stderr,
)
finally:
self._loop_task = None
if self._loop is not None:
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except Exception as e:
print(
f"[CameraController] error when stopping event loop: {e}",
file=sys.stderr,
)
if self._loop_thread is not None:
try:
self._loop_thread.join(timeout=5)
except Exception as e:
print(
f"[CameraController] error when joining loop thread: {e}",
file=sys.stderr,
)
finally:
self._loop_thread = None
self._ws = None
self._loop = None
return {"status": "stopped", "host_id": self.host_id}
def get_status(self) -> Dict[str, Any]:
"""
查询当前状态,方便在 Uni-Lab-OS 中做监控。
"""
ws_closed = None
if self._ws is not None:
ws_closed = getattr(self._ws, "closed", None)
if ws_closed is None:
websocket_connected = self._ws is not None
else:
websocket_connected = (self._ws is not None) and (not ws_closed)
return {
"host_id": self.host_id,
"running": self._running,
"websocket_connected": websocket_connected,
"ffmpeg_running": bool(
self._ffmpeg_process and self._ffmpeg_process.poll() is None
),
"signal_backend_url": self.signal_backend_url,
"rtmp_url": self.rtmp_url,
}
# ---------------------------------------------------------------------
# 内部实现逻辑WebSocket 循环 / FFmpeg / WebRTC Offer 处理
# ---------------------------------------------------------------------
async def _run_main_loop(self):
try:
while self._running:
try:
async with websockets.connect(self.signal_backend_url) as ws:
self._ws = ws
await self._recv_loop()
except asyncio.CancelledError:
raise
except Exception as e:
if self._running:
print(
f"[CameraController] WebSocket connection error: {e}",
file=sys.stderr,
)
await asyncio.sleep(3)
except asyncio.CancelledError:
pass
async def _recv_loop(self):
assert self._ws is not None
ws = self._ws
async for message in ws:
try:
data = json.loads(message)
except json.JSONDecodeError:
print(
f"[CameraController] received non-JSON message: {message}",
file=sys.stderr,
)
continue
try:
await self._handle_message(data)
except Exception as e:
print(
f"[CameraController] error while handling message {data}: {e}",
file=sys.stderr,
)
async def _handle_message(self, data: Dict[str, Any]):
"""
处理来自信令后端的消息:
- command: start_stream / stop_stream / ptz_xxx
- type: offer (WebRTC)
"""
cmd = data.get("command")
# ---------- 推流控制 ----------
if cmd == "start_stream":
try:
self._start_ffmpeg()
except Exception as e:
print(
f"[CameraController] error when starting FFmpeg on start_stream: {e}",
file=sys.stderr,
)
return
if cmd == "stop_stream":
try:
self._stop_ffmpeg()
except Exception as e:
print(
f"[CameraController] error when stopping FFmpeg on stop_stream: {e}",
file=sys.stderr,
)
return
# # ---------- PTZ 控制 ----------
# # 例如信令可以发:
# # {"command": "ptz_move", "direction": "down", "speed": 0.5, "duration": 0.5}
# if cmd == "ptz_move":
# if self._ptz is None:
# # 没有初始化 PTZ静默忽略或打印一条
# print("[CameraController] PTZ not initialized.", file=sys.stderr)
# return
# direction = data.get("direction", "")
# speed = float(data.get("speed", 0.5))
# duration = float(data.get("duration", 0.5))
# try:
# if direction == "up":
# self._ptz.move_up(speed=speed, duration=duration)
# elif direction == "down":
# self._ptz.move_down(speed=speed, duration=duration)
# elif direction == "left":
# self._ptz.move_left(speed=speed, duration=duration)
# elif direction == "right":
# self._ptz.move_right(speed=speed, duration=duration)
# elif direction == "zoom_in":
# self._ptz.zoom_in(speed=speed, duration=duration)
# elif direction == "zoom_out":
# self._ptz.zoom_out(speed=speed, duration=duration)
# elif direction == "stop":
# self._ptz.stop()
# else:
# # 未知方向,忽略
# pass
# except Exception as e:
# print(
# f"[CameraController] error when handling PTZ move: {e}",
# file=sys.stderr,
# )
# return
# ---------- WebRTC Offer ----------
if data.get("type") == "offer":
offer_sdp = data.get("sdp", "")
camera_id = data.get("cameraId", "camera-01")
try:
answer_sdp = await self._handle_webrtc_offer(offer_sdp)
except Exception as e:
print(
f"[CameraController] error when handling WebRTC offer: {e}",
file=sys.stderr,
)
return
if self._ws:
answer_payload = {
"type": "answer",
"sdp": answer_sdp,
"cameraId": camera_id,
"hostId": self.host_id,
}
try:
await self._ws.send(json.dumps(answer_payload))
except Exception as e:
print(
f"[CameraController] error when sending WebRTC answer: {e}",
file=sys.stderr,
)
# ------------------------ FFmpeg 相关 ------------------------
def _start_ffmpeg(self):
if self._ffmpeg_process and self._ffmpeg_process.poll() is None:
return
cmd = [
"ffmpeg",
"-rtsp_transport", "tcp",
"-i", self.camera_rtsp_url,
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-profile:v", "baseline",
"-b:v", "1M",
"-maxrate", "1M",
"-bufsize", "2M",
"-g", "10",
"-keyint_min", "10",
"-sc_threshold", "0",
"-pix_fmt", "yuv420p",
"-x264-params", "bframes=0",
"-c:a", "aac",
"-ar", "44100",
"-ac", "1",
"-b:a", "64k",
"-f", "flv",
self.rtmp_url,
]
try:
self._ffmpeg_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
shell=False,
)
except Exception as e:
print(f"[CameraController] failed to start FFmpeg: {e}", file=sys.stderr)
self._ffmpeg_process = None
raise
def _stop_ffmpeg(self):
proc = self._ffmpeg_process
if proc and proc.poll() is None:
try:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
print(
f"[CameraController] FFmpeg process did not exit even after kill (pid={proc.pid})",
file=sys.stderr,
)
except Exception as e:
print(
f"[CameraController] failed to kill FFmpeg process: {e}",
file=sys.stderr,
)
except Exception as e:
print(
f"[CameraController] error when stopping FFmpeg: {e}",
file=sys.stderr,
)
self._ffmpeg_process = None
# ------------------------ WebRTC Offer 相关 ------------------------
async def _handle_webrtc_offer(self, offer_sdp: str) -> str:
payload = {
"api": self.webrtc_api,
"streamurl": self.webrtc_stream_url,
"sdp": offer_sdp,
}
headers = {"Content-Type": "application/json"}
def _do_request():
return requests.post(
self.webrtc_api,
json=payload,
headers=headers,
timeout=10,
)
try:
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, _do_request)
except Exception as e:
print(
f"[CameraController] failed to send offer to media server: {e}",
file=sys.stderr,
)
raise
try:
resp.raise_for_status()
except Exception as e:
print(
f"[CameraController] media server HTTP error: {e}, "
f"status={resp.status_code}, body={resp.text[:200]}",
file=sys.stderr,
)
raise
try:
data = resp.json()
except Exception as e:
print(
f"[CameraController] failed to parse media server JSON: {e}, "
f"raw={resp.text[:200]}",
file=sys.stderr,
)
raise
answer_sdp = data.get("sdp", "")
if not answer_sdp:
msg = f"empty SDP from media server: {data}"
print(f"[CameraController] {msg}", file=sys.stderr)
raise RuntimeError(msg)
return answer_sdp

View File

@@ -1,401 +0,0 @@
#!/usr/bin/env python3
import asyncio
import json
import subprocess
import sys
import threading
from typing import Optional, Dict, Any
import requests
import websockets
class CameraController:
"""
Uni-Lab-OS 摄像头驱动Linux USB 摄像头版,无 PTZ
- WebSocket 信令signal_backend_url 连接到后端
例如: wss://sciol.ac.cn/api/realtime/signal/host/<host_id>
- 媒体服务器RTMP 推流到 rtmp_urlWebRTC offer 转发到 SRS 的 webrtc_api
- 视频源:本地 USB 摄像头V4L2默认 /dev/video0
"""
def __init__(
self,
host_id: str = "demo-host",
signal_backend_url: str = "wss://sciol.ac.cn/api/realtime/signal/host",
rtmp_url: str = "rtmp://srs.sciol.ac.cn:4499/live/camera-01",
webrtc_api: str = "https://srs.sciol.ac.cn/rtc/v1/play/",
webrtc_stream_url: str = "webrtc://srs.sciol.ac.cn:4500/live/camera-01",
video_device: str = "/dev/video0",
width: int = 1280,
height: int = 720,
fps: int = 30,
video_bitrate: str = "1500k",
audio_device: Optional[str] = None, # 比如 "hw:1,0",没有音频就保持 None
audio_bitrate: str = "64k",
):
self.host_id = host_id
# 拼接最终 WebSocket URL.../host/<host_id>
signal_backend_url = signal_backend_url.rstrip("/")
if not signal_backend_url.endswith("/host"):
signal_backend_url = signal_backend_url + "/host"
self.signal_backend_url = f"{signal_backend_url}/{host_id}"
# 媒体服务器配置
self.rtmp_url = rtmp_url
self.webrtc_api = webrtc_api
self.webrtc_stream_url = webrtc_stream_url
# 本地采集配置
self.video_device = video_device
self.width = int(width)
self.height = int(height)
self.fps = int(fps)
self.video_bitrate = video_bitrate
self.audio_device = audio_device
self.audio_bitrate = audio_bitrate
# 运行时状态
self._ws: Optional[object] = None
self._ffmpeg_process: Optional[subprocess.Popen] = None
self._running = False
self._loop_task: Optional[asyncio.Future] = None
# 事件循环 & 线程
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
try:
self.start()
except Exception as e:
print(f"[CameraController] __init__ auto start failed: {e}", file=sys.stderr)
# ---------------------------------------------------------------------
# 对外方法
# ---------------------------------------------------------------------
def start(self, config: Optional[Dict[str, Any]] = None):
if self._running:
return {"status": "already_running", "host_id": self.host_id}
# 应用 config 覆盖(如果有)
if config:
cfg_host_id = config.get("host_id")
if cfg_host_id:
self.host_id = cfg_host_id
signal_backend_url = config.get("signal_backend_url")
if signal_backend_url:
signal_backend_url = signal_backend_url.rstrip("/")
if not signal_backend_url.endswith("/host"):
signal_backend_url = signal_backend_url + "/host"
self.signal_backend_url = f"{signal_backend_url}/{self.host_id}"
self.rtmp_url = config.get("rtmp_url", self.rtmp_url)
self.webrtc_api = config.get("webrtc_api", self.webrtc_api)
self.webrtc_stream_url = config.get("webrtc_stream_url", self.webrtc_stream_url)
self.video_device = config.get("video_device", self.video_device)
self.width = int(config.get("width", self.width))
self.height = int(config.get("height", self.height))
self.fps = int(config.get("fps", self.fps))
self.video_bitrate = config.get("video_bitrate", self.video_bitrate)
self.audio_device = config.get("audio_device", self.audio_device)
self.audio_bitrate = config.get("audio_bitrate", self.audio_bitrate)
self._running = True
print("[CameraController] start(): starting FFmpeg streaming...", file=sys.stderr)
self._start_ffmpeg()
self._loop = asyncio.new_event_loop()
def loop_runner(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"[CameraController] event loop error: {e}", file=sys.stderr)
self._loop_thread = threading.Thread(target=loop_runner, args=(self._loop,), daemon=True)
self._loop_thread.start()
self._loop_task = asyncio.run_coroutine_threadsafe(self._run_main_loop(), self._loop)
return {
"status": "started",
"host_id": self.host_id,
"signal_backend_url": self.signal_backend_url,
"rtmp_url": self.rtmp_url,
"webrtc_api": self.webrtc_api,
"webrtc_stream_url": self.webrtc_stream_url,
"video_device": self.video_device,
"width": self.width,
"height": self.height,
"fps": self.fps,
"video_bitrate": self.video_bitrate,
"audio_device": self.audio_device,
}
def stop(self) -> Dict[str, Any]:
self._running = False
# 先取消主任务(让 ws connect/sleep 尽快退出)
if self._loop_task is not None and not self._loop_task.done():
self._loop_task.cancel()
# 停止推流
self._stop_ffmpeg()
# 关闭 WebSocket在 loop 中执行)
if self._ws and self._loop is not None:
async def close_ws():
try:
await self._ws.close()
except Exception as e:
print(f"[CameraController] error closing WebSocket: {e}", file=sys.stderr)
try:
asyncio.run_coroutine_threadsafe(close_ws(), self._loop)
except Exception:
pass
# 停止事件循环
if self._loop is not None:
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except Exception as e:
print(f"[CameraController] error stopping loop: {e}", file=sys.stderr)
# 等待线程退出
if self._loop_thread is not None:
try:
self._loop_thread.join(timeout=5)
except Exception as e:
print(f"[CameraController] error joining loop thread: {e}", file=sys.stderr)
self._ws = None
self._loop_task = None
self._loop = None
self._loop_thread = None
return {"status": "stopped", "host_id": self.host_id}
def get_status(self) -> Dict[str, Any]:
ws_closed = None
if self._ws is not None:
ws_closed = getattr(self._ws, "closed", None)
if ws_closed is None:
websocket_connected = self._ws is not None
else:
websocket_connected = (self._ws is not None) and (not ws_closed)
return {
"host_id": self.host_id,
"running": self._running,
"websocket_connected": websocket_connected,
"ffmpeg_running": bool(self._ffmpeg_process and self._ffmpeg_process.poll() is None),
"signal_backend_url": self.signal_backend_url,
"rtmp_url": self.rtmp_url,
"video_device": self.video_device,
"width": self.width,
"height": self.height,
"fps": self.fps,
"video_bitrate": self.video_bitrate,
}
# ---------------------------------------------------------------------
# WebSocket / 信令
# ---------------------------------------------------------------------
async def _run_main_loop(self):
print("[CameraController] main loop started", file=sys.stderr)
try:
while self._running:
try:
async with websockets.connect(self.signal_backend_url) as ws:
self._ws = ws
print(f"[CameraController] WebSocket connected: {self.signal_backend_url}", file=sys.stderr)
await self._recv_loop()
except asyncio.CancelledError:
raise
except Exception as e:
if self._running:
print(f"[CameraController] WebSocket connection error: {e}", file=sys.stderr)
await asyncio.sleep(3)
except asyncio.CancelledError:
pass
finally:
print("[CameraController] main loop exited", file=sys.stderr)
async def _recv_loop(self):
assert self._ws is not None
ws = self._ws
async for message in ws:
try:
data = json.loads(message)
except json.JSONDecodeError:
print(f"[CameraController] non-JSON message: {message}", file=sys.stderr)
continue
try:
await self._handle_message(data)
except Exception as e:
print(f"[CameraController] error handling message {data}: {e}", file=sys.stderr)
async def _handle_message(self, data: Dict[str, Any]):
cmd = data.get("command")
if cmd == "start_stream":
self._start_ffmpeg()
return
if cmd == "stop_stream":
self._stop_ffmpeg()
return
if data.get("type") == "offer":
offer_sdp = data.get("sdp", "")
camera_id = data.get("cameraId", "camera-01")
answer_sdp = await self._handle_webrtc_offer(offer_sdp)
if self._ws:
answer_payload = {
"type": "answer",
"sdp": answer_sdp,
"cameraId": camera_id,
"hostId": self.host_id,
}
await self._ws.send(json.dumps(answer_payload))
# ---------------------------------------------------------------------
# FFmpeg 推流V4L2 USB 摄像头)
# ---------------------------------------------------------------------
def _start_ffmpeg(self):
if self._ffmpeg_process and self._ffmpeg_process.poll() is None:
return
# 兼容性优先:不强制输入像素格式;失败再通过外部调整 width/height/fps
video_size = f"{self.width}x{self.height}"
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel",
"warning",
# video input
"-f", "v4l2",
"-framerate", str(self.fps),
"-video_size", video_size,
"-i", self.video_device,
]
# optional audio input
if self.audio_device:
cmd += [
"-f", "alsa",
"-i", self.audio_device,
"-c:a", "aac",
"-b:a", self.audio_bitrate,
"-ar", "44100",
"-ac", "1",
]
else:
cmd += ["-an"]
# video encode + rtmp out
cmd += [
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-profile:v", "baseline",
"-pix_fmt", "yuv420p",
"-b:v", self.video_bitrate,
"-maxrate", self.video_bitrate,
"-bufsize", "2M",
"-g", str(max(self.fps, 10)),
"-keyint_min", str(max(self.fps, 10)),
"-sc_threshold", "0",
"-x264-params", "bframes=0",
"-f", "flv",
self.rtmp_url,
]
print(f"[CameraController] starting FFmpeg: {' '.join(cmd)}", file=sys.stderr)
try:
# 不再丢弃日志,至少能看到 ffmpeg 报错(调试很关键)
self._ffmpeg_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=sys.stderr,
shell=False,
)
except Exception as e:
self._ffmpeg_process = None
print(f"[CameraController] failed to start FFmpeg: {e}", file=sys.stderr)
def _stop_ffmpeg(self):
proc = self._ffmpeg_process
if proc and proc.poll() is None:
try:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
except Exception as e:
print(f"[CameraController] error stopping FFmpeg: {e}", file=sys.stderr)
self._ffmpeg_process = None
# ---------------------------------------------------------------------
# WebRTC offer -> SRS
# ---------------------------------------------------------------------
async def _handle_webrtc_offer(self, offer_sdp: str) -> str:
payload = {
"api": self.webrtc_api,
"streamurl": self.webrtc_stream_url,
"sdp": offer_sdp,
}
headers = {"Content-Type": "application/json"}
def _do_post():
return requests.post(self.webrtc_api, json=payload, headers=headers, timeout=10)
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, _do_post)
resp.raise_for_status()
data = resp.json()
answer_sdp = data.get("sdp", "")
if not answer_sdp:
raise RuntimeError(f"empty SDP from media server: {data}")
return answer_sdp
if __name__ == "__main__":
# 直接运行用于手动测试
c = CameraController(
host_id="demo-host",
video_device="/dev/video0",
width=1280,
height=720,
fps=30,
video_bitrate="1500k",
audio_device=None,
)
try:
while True:
asyncio.sleep(1)
except KeyboardInterrupt:
c.stop()

View File

@@ -1,51 +0,0 @@
#!/usr/bin/env python3
import time
import json
from cameraUSB import CameraController
def main():
# 按你的实际情况改
cfg = dict(
host_id="demo-host",
signal_backend_url="wss://sciol.ac.cn/api/realtime/signal/host",
rtmp_url="rtmp://srs.sciol.ac.cn:4499/live/camera-01",
webrtc_api="https://srs.sciol.ac.cn/rtc/v1/play/",
webrtc_stream_url="webrtc://srs.sciol.ac.cn:4500/live/camera-01",
video_device="/dev/video7",
width=1280,
height=720,
fps=30,
video_bitrate="1500k",
audio_device=None,
)
c = CameraController(**cfg)
# 可选:如果你不想依赖 __init__ 自动 start可以这样显式调用
# c = CameraController(host_id=cfg["host_id"])
# c.start(cfg)
run_seconds = 30 # 测试运行时长
t0 = time.time()
try:
while True:
st = c.get_status()
print(json.dumps(st, ensure_ascii=False, indent=2))
if time.time() - t0 >= run_seconds:
break
time.sleep(2)
except KeyboardInterrupt:
print("Interrupted, stopping...")
finally:
print("Stopping controller...")
c.stop()
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -1,36 +0,0 @@
import cv2
# 推荐把 @ 进行 URL 编码:@ -> %40
RTSP_URL = "rtsp://admin:admin123@192.168.31.164:554/stream1"
OUTPUT_IMAGE = "rtsp_test_frame.jpg"
def main():
print(f"尝试连接 RTSP 流: {RTSP_URL}")
cap = cv2.VideoCapture(RTSP_URL)
if not cap.isOpened():
print("错误:无法打开 RTSP 流,请检查:")
print(" 1. IP/端口是否正确")
print(" 2. 账号密码(尤其是 @ 是否已转成 %40是否正确")
print(" 3. 摄像头是否允许当前主机访问(同一网段、防火墙等)")
return
print("连接成功,开始读取一帧...")
ret, frame = cap.read()
if not ret or frame is None:
print("错误:已连接但未能读取到帧数据(可能是码流未开启或网络抖动)")
cap.release()
return
# 保存当前帧
success = cv2.imwrite(OUTPUT_IMAGE, frame)
cap.release()
if success:
print(f"成功截取一帧并保存为: {OUTPUT_IMAGE}")
else:
print("错误:写入图片失败,请检查磁盘权限/路径")
if __name__ == "__main__":
main()

View File

@@ -1,21 +0,0 @@
# run_camera_push.py
import time
from cameraDriver import CameraController # 这里根据你的文件名调整
if __name__ == "__main__":
controller = CameraController(
host_id="demo-host",
signal_backend_url="wss://sciol.ac.cn/api/realtime/signal/host",
rtmp_url="rtmp://srs.sciol.ac.cn:4499/live/camera-01",
webrtc_api="https://srs.sciol.ac.cn/rtc/v1/play/",
webrtc_stream_url="webrtc://srs.sciol.ac.cn:4500/live/camera-01",
camera_rtsp_url="rtsp://admin:admin123@192.168.31.164:554/stream1",
)
try:
while True:
status = controller.get_status()
print(status)
time.sleep(5)
except KeyboardInterrupt:
controller.stop()

View File

@@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用 CameraController 来测试 PTZ
让摄像头按顺序向下、向上、向左、向右运动几次。
"""
import time
import sys
# 根据你的工程结构修改导入路径:
# 假设 CameraController 定义在 cameraController.py 里
from cameraDriver import CameraController
def main():
# === 根据你的实际情况填 IP、端口、账号密码 ===
ptz_host = "192.168.31.164"
ptz_port = 2020 # 注意要和你单独测试 PTZController 时保持一致
ptz_user = "admin"
ptz_password = "admin123"
# 1. 创建 CameraController 实例
cam = CameraController(
# 其他摄像机相关参数按你类的 __init__ 来补充
ptz_host=ptz_host,
ptz_port=ptz_port,
ptz_user=ptz_user,
ptz_password=ptz_password,
)
# 2. 启动 / 初始化(如果你的 CameraController 有 start(config) 之类的接口)
# 这里给一个最小的 config重点是 PTZ 相关字段
config = {
"ptz_host": ptz_host,
"ptz_port": ptz_port,
"ptz_user": ptz_user,
"ptz_password": ptz_password,
}
try:
cam.start(config)
except Exception as e:
print(f"[TEST] CameraController start() 失败: {e}", file=sys.stderr)
return
# 这里可以判断一下内部 _ptz 是否初始化成功(如果你对 CameraController 做了封装)
if getattr(cam, "_ptz", None) is None:
print("[TEST] CameraController 内部 PTZ 未初始化成功,请检查 ptz_host/port/user/password 配置。", file=sys.stderr)
return
# 3. 依次调用 CameraController 的 PTZ 方法
# 这里假设你在 CameraController 中提供了这几个对外方法:
# ptz_move_down / ptz_move_up / ptz_move_left / ptz_move_right
# 如果你命名不一样,把下面调用名改成你的即可。
print("向下移动(通过 CameraController...")
cam.ptz_move_down(speed=0.5, duration=1.0)
time.sleep(1)
print("向上移动(通过 CameraController...")
cam.ptz_move_up(speed=0.5, duration=1.0)
time.sleep(1)
print("向左移动(通过 CameraController...")
cam.ptz_move_left(speed=0.5, duration=1.0)
time.sleep(1)
print("向右移动(通过 CameraController...")
cam.ptz_move_right(speed=0.5, duration=1.0)
time.sleep(1)
print("测试结束。")
if __name__ == "__main__":
main()

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试 cameraDriver.py中的 PTZController 类,让摄像头按顺序运动几次
"""
import time
from cameraDriver import PTZController
def main():
# 根据你的实际情况填 IP、端口、账号密码
host = "192.168.31.164"
port = 80
user = "admin"
password = "admin123"
ptz = PTZController(host=host, port=port, user=user, password=password)
# 1. 连接摄像头
if not ptz.connect():
print("连接 PTZ 失败,检查 IP/用户名/密码/端口。")
return
# 2. 依次测试几个动作
# 每个动作之间 sleep 一下方便观察
print("向下移动...")
ptz.move_down(speed=0.5, duration=1.0)
time.sleep(1)
print("向上移动...")
ptz.move_up(speed=0.5, duration=1.0)
time.sleep(1)
print("向左移动...")
ptz.move_left(speed=0.5, duration=1.0)
time.sleep(1)
print("向右移动...")
ptz.move_right(speed=0.5, duration=1.0)
time.sleep(1)
print("测试结束。")
if __name__ == "__main__":
main()

View File

@@ -989,18 +989,6 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
else: else:
dis_vols = [float(v) for v in dis_vols] dis_vols = [float(v) for v in dis_vols]
# 统一混合次数为标量,防止数组/列表与 int 比较时报错
if mix_times is not None and not isinstance(mix_times, (int, float)):
try:
mix_times = mix_times[0] if len(mix_times) > 0 else None
except Exception:
try:
mix_times = next(iter(mix_times))
except Exception:
pass
if mix_times is not None:
mix_times = int(mix_times)
# 识别传输模式 # 识别传输模式
num_sources = len(sources) num_sources = len(sources)
num_targets = len(targets) num_targets = len(targets)

View File

@@ -5,7 +5,6 @@ import json
import os import os
import socket import socket
import time import time
import uuid
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling import ( from pylabrobot.liquid_handling import (
@@ -857,30 +856,7 @@ class PRCXI9300Api:
def _raw_request(self, payload: str) -> str: def _raw_request(self, payload: str) -> str:
if self.debug: if self.debug:
# 调试/仿真模式下直接返回可解析的模拟 JSON避免后续 json.loads 报错 return " "
try:
req = json.loads(payload)
method = req.get("MethodName")
except Exception:
method = None
data: Any = True
if method in {"AddSolution"}:
data = str(uuid.uuid4())
elif method in {"AddWorkTabletMatrix", "AddWorkTabletMatrix2"}:
data = {"Success": True, "Message": "debug mock"}
elif method in {"GetErrorCode"}:
data = ""
elif method in {"RemoveErrorCodet", "Reset", "Start", "LoadSolution", "Pause", "Resume", "Stop"}:
data = True
elif method in {"GetStepStateList", "GetStepStatus", "GetStepState"}:
data = []
elif method in {"GetLocation"}:
data = {"X": 0, "Y": 0, "Z": 0}
elif method in {"GetResetStatus"}:
data = False
return json.dumps({"Success": True, "Msg": "debug mock", "Data": data})
with contextlib.closing(socket.socket()) as sock: with contextlib.closing(socket.socket()) as sock:
sock.settimeout(self.timeout) sock.settimeout(self.timeout)
sock.connect((self.host, self.port)) sock.connect((self.host, self.port))

View File

@@ -174,6 +174,35 @@ bioyond_dispensing_station:
title: query_resource_by_name参数 title: query_resource_by_name参数
type: object type: object
type: UniLabJsonCommand type: UniLabJsonCommand
auto-transfer_materials_to_reaction_station:
feedback: {}
goal: {}
goal_default:
target_device_id: null
transfer_groups: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
target_device_id:
type: string
transfer_groups:
type: array
required:
- target_device_id
- transfer_groups
type: object
result: {}
required:
- goal
title: transfer_materials_to_reaction_station参数
type: object
type: UniLabJsonCommand
auto-workflow_sample_locations: auto-workflow_sample_locations:
feedback: {} feedback: {}
goal: {} goal: {}

View File

@@ -1,105 +0,0 @@
cameracontroller_device:
category:
- cameraSII
class:
action_value_mappings:
auto-start:
feedback: {}
goal: {}
goal_default:
config: null
handles: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
config:
type: string
required: []
type: object
result: {}
required:
- goal
title: start参数
type: object
type: UniLabJsonCommand
auto-stop:
feedback: {}
goal: {}
goal_default: {}
handles: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result: {}
required:
- goal
title: stop参数
type: object
type: UniLabJsonCommand
module: unilabos.devices.cameraSII.cameraUSB:CameraController
status_types:
status: dict
type: python
config_info: []
description: Uni-Lab-OS 摄像头驱动Linux USB 摄像头版,无 PTZ
handles: []
icon: ''
init_param_schema:
config:
properties:
audio_bitrate:
default: 64k
type: string
audio_device:
type: string
fps:
default: 30
type: integer
height:
default: 720
type: integer
host_id:
default: demo-host
type: string
rtmp_url:
default: rtmp://srs.sciol.ac.cn:4499/live/camera-01
type: string
signal_backend_url:
default: wss://sciol.ac.cn/api/realtime/signal/host
type: string
video_bitrate:
default: 1500k
type: string
video_device:
default: /dev/video0
type: string
webrtc_api:
default: https://srs.sciol.ac.cn/rtc/v1/play/
type: string
webrtc_stream_url:
default: webrtc://srs.sciol.ac.cn:4500/live/camera-01
type: string
width:
default: 1280
type: integer
required: []
type: object
data:
properties:
status:
type: object
required:
- status
type: object
registry_type: device
version: 1.0.0

View File

@@ -9333,34 +9333,7 @@ liquid_handler.prcxi:
touch_tip: false touch_tip: false
use_channels: use_channels:
- 0 - 0
handles: handles: {}
input:
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources
label: sources
- data_key: liquid
data_source: executor
data_type: resource
handler_key: targets
label: targets
- data_key: liquid
data_source: executor
data_type: resource
handler_key: tip_rack
label: tip_rack
output:
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources_out
label: sources
- data_key: liquid
data_source: executor
data_type: resource
handler_key: targets_out
label: targets
placeholder_keys: placeholder_keys:
sources: unilabos_resources sources: unilabos_resources
targets: unilabos_resources targets: unilabos_resources

View File

@@ -222,7 +222,7 @@ class Registry:
abs_path = Path(path).absolute() abs_path = Path(path).absolute()
resource_path = abs_path / "resources" resource_path = abs_path / "resources"
files = list(resource_path.glob("*/*.yaml")) files = list(resource_path.glob("*/*.yaml"))
logger.trace(f"[UniLab Registry] load resources? {resource_path.exists()}, total: {len(files)}") logger.debug(f"[UniLab Registry] resources: {resource_path.exists()}, total: {len(files)}")
current_resource_number = len(self.resource_type_registry) + 1 current_resource_number = len(self.resource_type_registry) + 1
for i, file in enumerate(files): for i, file in enumerate(files):
with open(file, encoding="utf-8", mode="r") as f: with open(file, encoding="utf-8", mode="r") as f:

View File

@@ -42,7 +42,7 @@ def canonicalize_nodes_data(
Returns: Returns:
ResourceTreeSet: 标准化后的资源树集合 ResourceTreeSet: 标准化后的资源树集合
""" """
print_status(f"{len(nodes)} Resources loaded", "info") print_status(f"{len(nodes)} Resources loaded:", "info")
# 第一步基本预处理处理graphml的label字段 # 第一步基本预处理处理graphml的label字段
outer_host_node_id = None outer_host_node_id = None

View File

@@ -66,8 +66,8 @@ class ResourceDict(BaseModel):
klass: str = Field(alias="class", description="Resource class name") klass: str = Field(alias="class", description="Resource class name")
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
config: Dict[str, Any] = Field(description="Resource configuration") config: Dict[str, Any] = Field(description="Resource configuration")
data: Dict[str, Any] = Field(description="Resource data, eg: container liquid data") data: Dict[str, Any] = Field(description="Resource data")
extra: Dict[str, Any] = Field(description="Extra data, eg: slot index") extra: Dict[str, Any] = Field(description="Extra data")
@field_serializer("parent_uuid") @field_serializer("parent_uuid")
def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]): def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]):

View File

Before

Width:  |  Height:  |  Size: 148 KiB

After

Width:  |  Height:  |  Size: 148 KiB

View File

Before

Width:  |  Height:  |  Size: 140 KiB

After

Width:  |  Height:  |  Size: 140 KiB

View File

Before

Width:  |  Height:  |  Size: 117 KiB

After

Width:  |  Height:  |  Size: 117 KiB

View File

@@ -0,0 +1,94 @@
import json
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
import pytest
from unilabos.workflow.common import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
def _normalize_steps(data):
normalized = []
for step in data:
action = step.get("action") or step.get("operation")
if not action:
continue
raw_params = step.get("parameters") or step.get("action_args") or {}
params = dict(raw_params)
if "source" in raw_params and "sources" not in raw_params:
params["sources"] = raw_params["source"]
if "target" in raw_params and "targets" not in raw_params:
params["targets"] = raw_params["target"]
description = step.get("description") or step.get("purpose")
step_dict = {"action": action, "parameters": params}
if description:
step_dict["description"] = description
normalized.append(step_dict)
return normalized
def _normalize_labware(data):
labware = {}
for item in data:
reagent_name = item.get("reagent_name")
key = reagent_name or item.get("material_name") or item.get("name")
if not key:
continue
key = str(key)
idx = 1
original_key = key
while key in labware:
idx += 1
key = f"{original_key}_{idx}"
labware[key] = {
"slot": item.get("positions") or item.get("slot"),
"labware": item.get("material_name") or item.get("labware"),
"well": item.get("well", []),
"type": item.get("type", "reagent"),
"role": item.get("role", ""),
"name": key,
}
return labware
@pytest.mark.parametrize("protocol_name", [
"example_bio",
# "bioyond_materials_liquidhandling_1",
"example_prcxi",
])
def test_build_protocol_graph(protocol_name):
data_path = Path(__file__).with_name(f"{protocol_name}.json")
with data_path.open("r", encoding="utf-8") as fp:
d = json.load(fp)
if "workflow" in d and "reagent" in d:
protocol_steps = d["workflow"]
labware_info = d["reagent"]
elif "steps_info" in d and "labware_info" in d:
protocol_steps = _normalize_steps(d["steps_info"])
labware_info = _normalize_labware(d["labware_info"])
else:
raise ValueError("Unsupported protocol format")
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name="PRCXi",
)
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png")
draw_protocol_graph_with_ports(graph, str(output_path))
print(graph)

View File

@@ -10,7 +10,6 @@ Json = Dict[str, Any]
# ---------------- Graph ---------------- # ---------------- Graph ----------------
class WorkflowGraph: class WorkflowGraph:
"""简单的有向图实现:使用 params 单层参数inputs 内含连线;支持 node-link 导出""" """简单的有向图实现:使用 params 单层参数inputs 内含连线;支持 node-link 导出"""
@@ -22,31 +21,20 @@ class WorkflowGraph:
self.nodes[node_id] = attrs self.nodes[node_id] = attrs
def add_edge(self, source: str, target: str, **attrs): def add_edge(self, source: str, target: str, **attrs):
# 将 source_port/target_port 映射为服务端期望的 source_handle_key/target_handle_key
source_handle_key = attrs.pop("source_port", "") or attrs.pop("source_handle_key", "")
target_handle_key = attrs.pop("target_port", "") or attrs.pop("target_handle_key", "")
edge = { edge = {
"source": source, "source": source,
"target": target, "target": target,
"source_node_uuid": source, "source_node_uuid": source,
"target_node_uuid": target, "target_node_uuid": target,
"source_handle_key": source_handle_key,
"source_handle_io": attrs.pop("source_handle_io", "source"), "source_handle_io": attrs.pop("source_handle_io", "source"),
"target_handle_key": target_handle_key,
"target_handle_io": attrs.pop("target_handle_io", "target"), "target_handle_io": attrs.pop("target_handle_io", "target"),
**attrs, **attrs
} }
self.edges.append(edge) self.edges.append(edge)
def _materialize_wiring_into_inputs( def _materialize_wiring_into_inputs(self, obj: Any, inputs: Dict[str, Any],
self,
obj: Any,
inputs: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]], variable_sources: Dict[str, Dict[str, Any]],
target_node_id: str, target_node_id: str, base_path: List[str]):
base_path: List[str],
):
has_var = False has_var = False
def walk(node: Any, path: List[str]): def walk(node: Any, path: List[str]):
@@ -60,12 +48,9 @@ class WorkflowGraph:
if src: if src:
key = ".".join(path) # e.g. "params.foo.bar.0" key = ".".join(path) # e.g. "params.foo.bar.0"
inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")} inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")}
self.add_edge( self.add_edge(str(src["node_id"]), target_node_id,
str(src["node_id"]),
target_node_id,
source_handle_io=src.get("output_name", "result"), source_handle_io=src.get("output_name", "result"),
target_handle_io=key, target_handle_io=key)
)
return placeholder return placeholder
return {k: walk(v, path + [k]) for k, v in node.items()} return {k: walk(v, path + [k]) for k, v in node.items()}
if isinstance(node, list): if isinstance(node, list):
@@ -75,8 +60,7 @@ class WorkflowGraph:
replaced = walk(obj, base_path[:]) replaced = walk(obj, base_path[:])
return replaced, has_var return replaced, has_var
def add_workflow_node( def add_workflow_node(self,
self,
node_id: int, node_id: int,
*, *,
device_key: Optional[str] = None, # 实例名,如 "ser" device_key: Optional[str] = None, # 实例名,如 "ser"
@@ -87,8 +71,7 @@ class WorkflowGraph:
variable_sources: Dict[str, Dict[str, Any]], variable_sources: Dict[str, Dict[str, Any]],
add_ready_if_no_vars: bool = True, add_ready_if_no_vars: bool = True,
prev_node_id: Optional[int] = None, prev_node_id: Optional[int] = None,
**extra_attrs, **extra_attrs) -> None:
) -> None:
"""添加工作流节点params 单层;自动变量连线与 ready 串联;支持附加属性""" """添加工作流节点params 单层;自动变量连线与 ready 串联;支持附加属性"""
node_id_str = str(node_id) node_id_str = str(node_id)
inputs: Dict[str, Any] = {} inputs: Dict[str, Any] = {}
@@ -117,13 +100,13 @@ class WorkflowGraph:
def to_dict(self) -> List[Dict[str, Any]]: def to_dict(self) -> List[Dict[str, Any]]:
result = [] result = []
for node_id, attrs in self.nodes.items(): for node_id, attrs in self.nodes.items():
node = {"uuid": node_id} node = {"id": node_id}
params = dict(attrs.get("parameters", {}) or {}) params = dict(attrs.get("parameters", {}) or {})
flat = {k: v for k, v in attrs.items() if k != "parameters"} flat = {k: v for k, v in attrs.items() if k != "parameters"}
flat.update(params) flat.update(params)
node.update(flat) node.update(flat)
result.append(node) result.append(node)
return sorted(result, key=lambda n: int(n["uuid"]) if str(n["uuid"]).isdigit() else n["uuid"]) return sorted(result, key=lambda n: int(n["id"]) if str(n["id"]).isdigit() else n["id"])
# node-link 导出(含 edges # node-link 导出(含 edges
def to_node_link_dict(self) -> Dict[str, Any]: def to_node_link_dict(self) -> Dict[str, Any]:
@@ -132,27 +115,12 @@ class WorkflowGraph:
node_attrs = attrs.copy() node_attrs = attrs.copy()
params = node_attrs.pop("parameters", {}) or {} params = node_attrs.pop("parameters", {}) or {}
node_attrs.update(params) node_attrs.update(params)
nodes_list.append({"uuid": node_id, **node_attrs}) nodes_list.append({"id": node_id, **node_attrs})
return { return {"directed": True, "multigraph": False, "graph": {}, "nodes": nodes_list, "edges": self.edges, "links": self.edges}
"directed": True,
"multigraph": False,
"graph": {},
"nodes": nodes_list,
"edges": self.edges,
"links": self.edges,
}
def refactor_data( def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
data: List[Dict[str, Any]], """统一的数据重构函数,根据操作类型自动选择模板"""
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
"""统一的数据重构函数,根据操作类型自动选择模板
Args:
data: 原始步骤数据列表
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
refactored_data = [] refactored_data = []
# 定义操作映射,包含生物实验和有机化学的所有操作 # 定义操作映射,包含生物实验和有机化学的所有操作
@@ -189,67 +157,43 @@ def refactor_data(
times = step.get("times", step.get("parameters", {}).get("times", 1)) times = step.get("times", step.get("parameters", {}).get("times", 1))
sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) sub_steps = step.get("steps", step.get("parameters", {}).get("steps", []))
for i in range(int(times)): for i in range(int(times)):
sub_data = refactor_data(sub_steps, action_resource_mapping) sub_data = refactor_data(sub_steps)
refactored_data.extend(sub_data) refactored_data.extend(sub_data)
continue continue
# 获取模板名称 # 获取模板名称
template_name = OPERATION_MAPPING.get(operation) template = OPERATION_MAPPING.get(operation)
if not template_name: if not template:
# 自动推断模板类型 # 自动推断模板类型
if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]:
template_name = f"biomek-{operation}" template = f"biomek-{operation}"
else: else:
template_name = f"{operation}Protocol" template = f"{operation}Protocol"
# 获取 resource_name
resource_name = f"device.{operation.lower()}"
if action_resource_mapping:
resource_name = action_resource_mapping.get(operation, resource_name)
# 获取步骤编号,生成 name 字段
step_number = step.get("step_number")
name = f"Step {step_number}" if step_number is not None else None
# 创建步骤数据 # 创建步骤数据
step_data = { step_data = {
"template_name": template_name, "template": template,
"resource_name": resource_name,
"description": step.get("description", step.get("purpose", f"{operation} operation")), "description": step.get("description", step.get("purpose", f"{operation} operation")),
"lab_node_type": "Device", "lab_node_type": "Device",
"param": step.get("parameters", step.get("action_args", {})), "parameters": step.get("parameters", step.get("action_args", {})),
"footer": f"{template_name}-{resource_name}",
} }
if name:
step_data["name"] = name
refactored_data.append(step_data) refactored_data.append(step_data)
return refactored_data return refactored_data
def build_protocol_graph( def build_protocol_graph(
labware_info: List[Dict[str, Any]], labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> WorkflowGraph: ) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑 """统一的协议图构建函数,根据设备类型自动选择构建逻辑"""
Args:
labware_info: labware 信息字典
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
G = WorkflowGraph() G = WorkflowGraph()
resource_last_writer = {} resource_last_writer = {}
protocol_steps = refactor_data(protocol_steps, action_resource_mapping) protocol_steps = refactor_data(protocol_steps)
# 有机化学&移液站协议图构建 # 有机化学&移液站协议图构建
WORKSTATION_ID = workstation_name WORKSTATION_ID = workstation_name
# 为所有labware创建资源节点 # 为所有labware创建资源节点
res_index = 0
for labware_id, item in labware_info.items(): for labware_id, item in labware_info.items():
# item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}")
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
@@ -273,16 +217,13 @@ def build_protocol_graph(
liquid_type = [labware_id] liquid_type = [labware_id]
liquid_volume = [1e5] liquid_volume = [1e5]
res_index += 1
G.add_node( G.add_node(
node_id, node_id,
template_name="create_resource", template_name=f"create_resource",
resource_name="host_node", resource_name="host_node",
name=f"Res {res_index}",
description=description, description=description,
lab_node_type=lab_node_type, lab_node_type=lab_node_type,
footer="create_resource-host_node", params={
param={
"res_id": labware_id, "res_id": labware_id,
"device_id": WORKSTATION_ID, "device_id": WORKSTATION_ID,
"class_name": "container", "class_name": "container",
@@ -293,6 +234,7 @@ def build_protocol_graph(
"liquid_volume": liquid_volume, "liquid_volume": liquid_volume,
"slot_on_deck": "", "slot_on_deck": "",
}, },
role=item.get("role", ""),
) )
resource_last_writer[labware_id] = f"{node_id}:labware" resource_last_writer[labware_id] = f"{node_id}:labware"
@@ -309,7 +251,7 @@ def build_protocol_graph(
last_control_node_id = node_id last_control_node_id = node_id
# 物料流 # 物料流
params = step.get("param", {}) params = step.get("parameters", {})
input_resources_possible_names = [ input_resources_possible_names = [
"vessel", "vessel",
"to_vessel", "to_vessel",
@@ -357,7 +299,7 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
G = nx.DiGraph() G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items(): for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template_name", node_id[:8])) label = attrs.get("description", attrs.get("template", node_id[:8]))
G.add_node(node_id, label=label, **attrs) G.add_node(node_id, label=label, **attrs)
for edge in protocol_graph.edges: for edge in protocol_graph.edges:
@@ -389,13 +331,11 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
print(f" - Visualization saved to '{output_path}'") print(f" - Visualization saved to '{output_path}'")
COMPASS = {"n", "e", "s", "w", "ne", "nw", "se", "sw", "c"} COMPASS = {"n","e","s","w","ne","nw","se","sw","c"}
def _is_compass(port: str) -> bool: def _is_compass(port: str) -> bool:
return isinstance(port, str) and port.lower() in COMPASS return isinstance(port, str) and port.lower() in COMPASS
def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"):
""" """
使用 Graphviz 端口语法绘制协议工作流图。 使用 Graphviz 端口语法绘制协议工作流图。
@@ -410,9 +350,9 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 1) 先用 networkx 搭建有向图,保留端口属性 # 1) 先用 networkx 搭建有向图,保留端口属性
G = nx.DiGraph() G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items(): for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template_name", node_id[:8])) label = attrs.get("description", attrs.get("template", node_id[:8]))
# 保留一个干净的“中心标签”,用于放在 record 的中间槽 # 保留一个干净的“中心标签”,用于放在 record 的中间槽
G.add_node(node_id, _core_label=str(label), **{k: v for k, v in attrs.items() if k not in ("label",)}) G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)})
edges_data = [] edges_data = []
in_ports_by_node = {} # 收集命名输入端口 in_ports_by_node = {} # 收集命名输入端口
@@ -421,11 +361,11 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
for edge in protocol_graph.edges: for edge in protocol_graph.edges:
u = edge["source"] u = edge["source"]
v = edge["target"] v = edge["target"]
sp = edge.get("source_handle_key") or edge.get("source_port") sp = edge.get("source_port")
tp = edge.get("target_handle_key") or edge.get("target_port") tp = edge.get("target_port")
# 记录到图里(保留原始端口信息) # 记录到图里(保留原始端口信息)
G.add_edge(u, v, source_handle_key=sp, target_handle_key=tp) G.add_edge(u, v, source_port=sp, target_port=tp)
edges_data.append((u, v, sp, tp)) edges_data.append((u, v, sp, tp))
# 如果不是 compass就按“命名端口”先归类等会儿给节点造 record # 如果不是 compass就按“命名端口”先归类等会儿给节点造 record
@@ -437,9 +377,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 2) 转为 AGraph使用 Graphviz 渲染 # 2) 转为 AGraph使用 Graphviz 渲染
A = to_agraph(G) A = to_agraph(G)
A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10")
A.node_attr.update( A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica")
shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica"
)
A.edge_attr.update(arrowsize="0.8", color="#666666") A.edge_attr.update(arrowsize="0.8", color="#666666")
# 3) 为需要命名端口的节点设置 record 形状与 label # 3) 为需要命名端口的节点设置 record 形状与 label
@@ -453,7 +391,6 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 如果该节点涉及命名端口,则用 record否则保留原 box # 如果该节点涉及命名端口,则用 record否则保留原 box
if in_ports or out_ports: if in_ports or out_ports:
def port_fields(ports): def port_fields(ports):
if not ports: if not ports:
return " " # 必须留一个空槽占位 return " " # 必须留一个空槽占位
@@ -473,7 +410,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 4) 给边设置 headport / tailport # 4) 给边设置 headport / tailport
# - 若端口为 compass直接用 compasse.g., headport="e" # - 若端口为 compass直接用 compasse.g., headport="e"
# - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可) # - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可)
for u, v, sp, tp in edges_data: for (u, v, sp, tp) in edges_data:
e = A.get_edge(u, v) e = A.get_edge(u, v)
# Graphviz 属性tail 是源head 是目标 # Graphviz 属性tail 是源head 是目标
@@ -482,13 +419,13 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
e.attr["tailport"] = sp.lower() e.attr["tailport"] = sp.lower()
else: else:
# 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗 # 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗
e.attr["tailport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(sp)) e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp))
if tp: if tp:
if _is_compass(tp): if _is_compass(tp):
e.attr["headport"] = tp.lower() e.attr["headport"] = tp.lower()
else: else:
e.attr["headport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(tp)) e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp))
# 可选:若想让边更贴边缘,可设置 constraint/spline 等 # 可选:若想让边更贴边缘,可设置 constraint/spline 等
# e.attr["arrowhead"] = "vee" # e.attr["arrowhead"] = "vee"
@@ -496,14 +433,11 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 5) 输出 # 5) 输出
A.draw(output_path, prog="dot") A.draw(output_path, prog="dot")
print(f" - Port-aware workflow rendered to '{output_path}'") print(f" - Port-aware workflow rendered to '{output_path}'")
# ---------------- Registry Adapter ---------------- # ---------------- Registry Adapter ----------------
class RegistryAdapter: class RegistryAdapter:
"""根据 module 的类名(冒号右侧)反查 registry 的 resource_name原 device_class并抽取参数顺序""" """根据 module 的类名(冒号右侧)反查 registry 的 resource_name原 device_class并抽取参数顺序"""
def __init__(self, device_registry: Dict[str, Any]): def __init__(self, device_registry: Dict[str, Any]):
self.device_registry = device_registry or {} self.device_registry = device_registry or {}
self.module_class_to_resource = self._build_module_class_index() self.module_class_to_resource = self._build_module_class_index()
@@ -521,7 +455,8 @@ class RegistryAdapter:
def resolve_resource_by_classname(self, class_name: str) -> Optional[str]: def resolve_resource_by_classname(self, class_name: str) -> Optional[str]:
if not class_name: if not class_name:
return None return None
return self.module_class_to_resource.get(class_name) or self.module_class_to_resource.get(class_name.lower()) return (self.module_class_to_resource.get(class_name)
or self.module_class_to_resource.get(class_name.lower()))
def get_device_module(self, resource_name: Optional[str]) -> Optional[str]: def get_device_module(self, resource_name: Optional[str]) -> Optional[str]:
if not resource_name: if not resource_name:
@@ -531,7 +466,9 @@ class RegistryAdapter:
def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]: def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]:
if not resource_name: if not resource_name:
return {} return {}
return (self.device_registry.get(resource_name, {}).get("class", {}).get("action_value_mappings", {})) or {} return (self.device_registry.get(resource_name, {})
.get("class", {})
.get("action_value_mappings", {})) or {}
def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]: def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]:
return (self.get_actions(resource_name).get(template_name) or {}).get("schema") return (self.get_actions(resource_name).get(template_name) or {}).get("schema")

View File

@@ -1,356 +0,0 @@
"""
JSON 工作流转换模块
提供从多种 JSON 格式转换为统一工作流格式的功能。
支持的格式:
1. workflow/reagent 格式
2. steps_info/labware_info 格式
"""
import json
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from unilabos.workflow.common import WorkflowGraph, build_protocol_graph
from unilabos.registry.registry import lab_registry
def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]:
"""
从 registry 获取指定设备和动作的 handles 配置
Args:
resource_name: 设备资源名称,如 "liquid_handler.prcxi"
template_name: 动作模板名称,如 "transfer_liquid"
Returns:
包含 source 和 target handler_keys 的字典:
{"source": ["sources_out", "targets_out", ...], "target": ["sources", "targets", ...]}
"""
result = {"source": [], "target": []}
device_info = lab_registry.device_type_registry.get(resource_name, {})
if not device_info:
return result
action_mappings = device_info.get("class", {}).get("action_value_mappings", {})
action_config = action_mappings.get(template_name, {})
handles = action_config.get("handles", {})
if isinstance(handles, dict):
# 处理 input handles (作为 target)
for handle in handles.get("input", []):
handler_key = handle.get("handler_key", "")
if handler_key:
result["source"].append(handler_key)
# 处理 output handles (作为 source)
for handle in handles.get("output", []):
handler_key = handle.get("handler_key", "")
if handler_key:
result["target"].append(handler_key)
return result
def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
"""
校验工作流图中所有边的句柄配置是否正确
Args:
graph: 工作流图对象
Returns:
(is_valid, errors): 是否有效,错误信息列表
"""
errors = []
nodes = graph.nodes
for edge in graph.edges:
left_uuid = edge.get("source")
right_uuid = edge.get("target")
# target_handle_key是target, right的输入节点入节点
# source_handle_key是source, left的输出节点出节点
right_source_conn_key = edge.get("target_handle_key", "")
left_target_conn_key = edge.get("source_handle_key", "")
# 获取源节点和目标节点信息
left_node = nodes.get(left_uuid, {})
right_node = nodes.get(right_uuid, {})
left_res_name = left_node.get("resource_name", "")
left_template_name = left_node.get("template_name", "")
right_res_name = right_node.get("resource_name", "")
right_template_name = right_node.get("template_name", "")
# 获取源节点的 output handles
left_node_handles = get_action_handles(left_res_name, left_template_name)
target_valid_keys = left_node_handles.get("target", [])
target_valid_keys.append("ready")
# 获取目标节点的 input handles
right_node_handles = get_action_handles(right_res_name, right_template_name)
source_valid_keys = right_node_handles.get("source", [])
source_valid_keys.append("ready")
# 如果节点配置了 output handles则 source_port 必须有效
if not right_source_conn_key:
node_name = left_node.get("name", left_uuid[:8])
errors.append(f"源节点 '{node_name}' 的 source_handle_key 为空," f"应设置为: {source_valid_keys}")
elif right_source_conn_key not in source_valid_keys:
node_name = left_node.get("name", left_uuid[:8])
errors.append(
f"源节点 '{node_name}' 的 source 端点 '{right_source_conn_key}' 不存在," f"支持的端点: {source_valid_keys}"
)
# 如果节点配置了 input handles则 target_port 必须有效
if not left_target_conn_key:
node_name = right_node.get("name", right_uuid[:8])
errors.append(f"目标节点 '{node_name}' 的 target_handle_key 为空," f"应设置为: {target_valid_keys}")
elif left_target_conn_key not in target_valid_keys:
node_name = right_node.get("name", right_uuid[:8])
errors.append(
f"目标节点 '{node_name}' 的 target 端点 '{left_target_conn_key}' 不存在,"
f"支持的端点: {target_valid_keys}"
)
return len(errors) == 0, errors
# action 到 resource_name 的映射
ACTION_RESOURCE_MAPPING: Dict[str, str] = {
# 生物实验操作
"transfer_liquid": "liquid_handler.prcxi",
"transfer": "liquid_handler.prcxi",
"incubation": "incubator.prcxi",
"move_labware": "labware_mover.prcxi",
"oscillation": "shaker.prcxi",
# 有机化学操作
"HeatChillToTemp": "heatchill.chemputer",
"StopHeatChill": "heatchill.chemputer",
"StartHeatChill": "heatchill.chemputer",
"HeatChill": "heatchill.chemputer",
"Dissolve": "stirrer.chemputer",
"Transfer": "liquid_handler.chemputer",
"Evaporate": "rotavap.chemputer",
"Recrystallize": "reactor.chemputer",
"Filter": "filter.chemputer",
"Dry": "dryer.chemputer",
"Add": "liquid_handler.chemputer",
}
def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
将不同格式的步骤数据规范化为统一格式
支持的输入格式:
- action + parameters
- action + action_args
- operation + parameters
Args:
data: 原始步骤数据列表
Returns:
规范化后的步骤列表,格式为 [{"action": str, "parameters": dict, "description": str?, "step_number": int?}, ...]
"""
normalized = []
for idx, step in enumerate(data):
# 获取动作名称(支持 action 或 operation 字段)
action = step.get("action") or step.get("operation")
if not action:
continue
# 获取参数(支持 parameters 或 action_args 字段)
raw_params = step.get("parameters") or step.get("action_args") or {}
params = dict(raw_params)
# 规范化 source/target -> sources/targets
if "source" in raw_params and "sources" not in raw_params:
params["sources"] = raw_params["source"]
if "target" in raw_params and "targets" not in raw_params:
params["targets"] = raw_params["target"]
# 获取描述(支持 description 或 purpose 字段)
description = step.get("description") or step.get("purpose")
# 获取步骤编号(优先使用原始数据中的 step_number否则使用索引+1
step_number = step.get("step_number", idx + 1)
step_dict = {"action": action, "parameters": params, "step_number": step_number}
if description:
step_dict["description"] = description
normalized.append(step_dict)
return normalized
def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
将不同格式的 labware 数据规范化为统一的字典格式
支持的输入格式:
- reagent_name + material_name + positions
- name + labware + slot
Args:
data: 原始 labware 数据列表
Returns:
规范化后的 labware 字典,格式为 {name: {"slot": int, "labware": str, "well": list, "type": str, "role": str, "name": str}, ...}
"""
labware = {}
for item in data:
# 获取 key 名称(优先使用 reagent_name其次是 material_name 或 name
reagent_name = item.get("reagent_name")
key = reagent_name or item.get("material_name") or item.get("name")
if not key:
continue
key = str(key)
# 处理重复 key自动添加后缀
idx = 1
original_key = key
while key in labware:
idx += 1
key = f"{original_key}_{idx}"
labware[key] = {
"slot": item.get("positions") or item.get("slot"),
"labware": item.get("material_name") or item.get("labware"),
"well": item.get("well", []),
"type": item.get("type", "reagent"),
"role": item.get("role", ""),
"name": key,
}
return labware
def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
validate: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
支持的 JSON 格式:
1. {"workflow": [...], "reagent": {...}} - 直接格式
2. {"steps_info": [...], "labware_info": [...]} - 需要规范化的格式
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
Returns:
WorkflowGraph: 构建好的工作流图
Raises:
ValueError: 不支持的 JSON 格式 或 句柄校验失败
FileNotFoundError: 文件不存在
json.JSONDecodeError: JSON 解析失败
"""
# 处理输入数据
if isinstance(data, (str, PathLike)):
path = Path(data)
if path.exists():
with path.open("r", encoding="utf-8") as fp:
json_data = json.load(fp)
elif isinstance(data, str):
# 尝试作为 JSON 字符串解析
json_data = json.loads(data)
else:
raise FileNotFoundError(f"文件不存在: {data}")
elif isinstance(data, dict):
json_data = data
else:
raise TypeError(f"不支持的数据类型: {type(data)}")
# 根据格式解析数据
if "workflow" in json_data and "reagent" in json_data:
# 格式1: workflow/reagent已经是规范格式
protocol_steps = json_data["workflow"]
labware_info = json_data["reagent"]
elif "steps_info" in json_data and "labware_info" in json_data:
# 格式2: steps_info/labware_info需要规范化
protocol_steps = normalize_steps(json_data["steps_info"])
labware_info = normalize_labware(json_data["labware_info"])
elif "steps" in json_data and "labware" in json_data:
# 格式3: steps/labware另一种常见格式
protocol_steps = normalize_steps(json_data["steps"])
if isinstance(json_data["labware"], list):
labware_info = normalize_labware(json_data["labware"])
else:
labware_info = json_data["labware"]
else:
raise ValueError(
"不支持的 JSON 格式。支持的格式:\n"
"1. {'workflow': [...], 'reagent': {...}}\n"
"2. {'steps_info': [...], 'labware_info': [...]}\n"
"3. {'steps': [...], 'labware': [...]}"
)
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
)
# 校验句柄配置
if validate:
is_valid, errors = validate_workflow_handles(graph)
if not is_valid:
import warnings
for error in errors:
warnings.warn(f"句柄校验警告: {error}")
return graph
def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
) -> Dict[str, Any]:
"""
将 JSON 数据转换为 node-link 格式的字典
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
Returns:
Dict: node-link 格式的工作流数据
"""
graph = convert_from_json(data, workstation_name)
return graph.to_node_link_dict()
def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
) -> List[Dict[str, Any]]:
"""
将 JSON 数据转换为工作流列表格式
Args:
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
Returns:
List: 工作流节点列表
"""
graph = convert_from_json(data, workstation_name)
return graph.to_dict()
# 为了向后兼容,保留下划线前缀的别名
_normalize_steps = normalize_steps
_normalize_labware = normalize_labware

View File

@@ -0,0 +1,24 @@
import json
from os import PathLike
from unilabos.workflow.common import build_protocol_graph
def from_labwares_and_steps(data_path: PathLike):
with data_path.open("r", encoding="utf-8") as fp:
d = json.load(fp)
if "workflow" in d and "reagent" in d:
protocol_steps = d["workflow"]
labware_info = d["reagent"]
elif "steps_info" in d and "labware_info" in d:
protocol_steps = _normalize_steps(d["steps_info"])
labware_info = _normalize_labware(d["labware_info"])
else:
raise ValueError("Unsupported protocol format")
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name="PRCXi",
)

View File

@@ -1,138 +0,0 @@
"""
工作流工具模块
提供工作流上传等功能
"""
import json
import os
import uuid
from typing import Any, Dict, List, Optional
from unilabos.utils.banner_print import print_status
def _is_node_link_format(data: Dict[str, Any]) -> bool:
"""检查数据是否为 node-link 格式"""
return "nodes" in data and "edges" in data
def _convert_to_node_link(workflow_file: str, workflow_data: Dict[str, Any]) -> Dict[str, Any]:
"""
将非 node-link 格式的工作流数据转换为 node-link 格式
Args:
workflow_file: 工作流文件路径(用于日志)
workflow_data: 原始工作流数据
Returns:
node-link 格式的工作流数据
"""
from unilabos.workflow.convert_from_json import convert_json_to_node_link
print_status(f"检测到非 node-link 格式,正在转换...", "info")
node_link_data = convert_json_to_node_link(workflow_data)
print_status(f"转换完成", "success")
return node_link_data
def upload_workflow(
workflow_file: str,
workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None,
published: bool = False,
) -> Dict[str, Any]:
"""
上传工作流到服务器
支持的输入格式:
1. node-link 格式: {"nodes": [...], "edges": [...]}
2. workflow/reagent 格式: {"workflow": [...], "reagent": {...}}
3. steps_info/labware_info 格式: {"steps_info": [...], "labware_info": [...]}
4. steps/labware 格式: {"steps": [...], "labware": [...]}
Args:
workflow_file: 工作流文件路径JSON格式
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
Returns:
Dict: API响应数据
"""
# 延迟导入,避免在配置文件加载之前初始化 http_client
from unilabos.app.web import http_client
if not os.path.exists(workflow_file):
print_status(f"工作流文件不存在: {workflow_file}", "error")
return {"code": -1, "message": f"文件不存在: {workflow_file}"}
# 读取工作流文件
try:
with open(workflow_file, "r", encoding="utf-8") as f:
workflow_data = json.load(f)
except json.JSONDecodeError as e:
print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"}
# 自动检测并转换格式
if not _is_node_link_format(workflow_data):
try:
workflow_data = _convert_to_node_link(workflow_file, workflow_data)
except Exception as e:
print_status(f"工作流格式转换失败: {e}", "error")
return {"code": -1, "message": f"格式转换失败: {e}"}
# 提取工作流数据
nodes = workflow_data.get("nodes", [])
edges = workflow_data.get("edges", [])
workflow_uuid_val = workflow_data.get("workflow_uuid", str(uuid.uuid4()))
wf_name_from_file = workflow_data.get("workflow_name", os.path.basename(workflow_file).replace(".json", ""))
# 确定工作流名称
final_name = workflow_name or wf_name_from_file
print_status(f"正在上传工作流: {final_name}", "info")
print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传
result = http_client.workflow_import(
name=final_name,
workflow_uuid=workflow_uuid_val,
workflow_name=final_name,
nodes=nodes,
edges=edges,
tags=tags,
published=published,
)
if result.get("code") == 0:
data = result.get("data", {})
print_status("工作流上传成功!", "success")
print_status(f" - UUID: {data.get('uuid', 'N/A')}", "info")
print_status(f" - 名称: {data.get('name', 'N/A')}", "info")
else:
print_status(f"工作流上传失败: {result.get('message', '未知错误')}", "error")
return result
def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
"""
处理 workflow_upload 子命令
Args:
args_dict: 命令行参数字典
"""
workflow_file = args_dict.get("workflow_file")
workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", [])
published = args_dict.get("published", False)
if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published)
else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")