Compare commits

..

5 Commits

Author SHA1 Message Date
Xuwznln
a5b5497dd0 Merge branch 'dev' into feat/samples 2026-02-06 00:49:44 +08:00
Xuwznln
b9d6f71970 Adapt to new scheduler. 2026-02-06 00:48:27 +08:00
Xuwznln
5dda5c61ce fix pump transfer. fix resource update when protocol & ros callback 2026-02-05 23:21:43 +08:00
Xuwznln
1d181743ea adapt to new samples sys 2026-02-05 00:18:19 +08:00
Xuwznln
337789e270 add sample_material 2026-02-04 19:46:12 +08:00
33 changed files with 172 additions and 11111 deletions

View File

@@ -171,12 +171,6 @@ def parse_args():
action="store_true", action="store_true",
help="Disable sending update feedback to server", help="Disable sending update feedback to server",
) )
parser.add_argument(
"--test_mode",
action="store_true",
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
# workflow upload subcommand # workflow upload subcommand
workflow_parser = subparsers.add_parser( workflow_parser = subparsers.add_parser(
"workflow_upload", "workflow_upload",
@@ -210,12 +204,6 @@ def parse_args():
default=False, default=False,
help="Whether to publish the workflow (default: False)", help="Whether to publish the workflow (default: False)",
) )
workflow_parser.add_argument(
"--description",
type=str,
default="",
help="Workflow description, used when publishing the workflow",
)
return parser return parser
@@ -243,60 +231,52 @@ def main():
# 加载配置文件优先加载config然后从env读取 # 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config") config_path = args_dict.get("config")
# === 解析 working_dir === if check_mode:
# 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改 args_dict["working_dir"] = os.path.abspath(os.getcwd())
# 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir # 当 skip_env_check 时,默认使用当前目录作为 working_dir
# 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测 if skip_env_check and not args_dict.get("working_dir") and not config_path:
raw_working_dir = args_dict.get("working_dir")
if raw_working_dir:
working_dir = os.path.abspath(raw_working_dir)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(os.path.abspath(config_path))
else:
working_dir = os.path.abspath(os.getcwd()) working_dir = os.path.abspath(os.getcwd())
print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info")
# unilabos_data 子目录自动检测 # 检查当前目录是否有 local_config.py
if os.path.basename(working_dir) != "unilabos_data": local_config_in_cwd = os.path.join(working_dir, "local_config.py")
unilabos_data_sub = os.path.join(working_dir, "unilabos_data") if os.path.exists(local_config_in_cwd):
if os.path.isdir(unilabos_data_sub): config_path = local_config_in_cwd
working_dir = unilabos_data_sub
elif not raw_working_dir and not (config_path and os.path.exists(config_path)):
# 未显式指定路径,默认使用 cwd/unilabos_data
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
# === 解析 config_path ===
if config_path and not os.path.exists(config_path):
# config_path 传入但不存在,尝试在 working_dir 中查找
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"在工作目录中发现配置文件: {config_path}", "info")
else:
print_status(
f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py"
f"请通过 --config 传入 local_config.py 文件路径",
"error",
)
os._exit(1)
elif not config_path:
# 规则3: 未传入 config_path尝试 working_dir/local_config.py
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"发现本地配置文件: {config_path}", "info") print_status(f"发现本地配置文件: {config_path}", "info")
else: else:
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info") print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") elif os.getcwd().endswith("unilabos_data"):
if check_mode or input() != "n": working_dir = os.path.abspath(os.getcwd())
os.makedirs(working_dir, exist_ok=True) else:
config_path = os.path.join(working_dir, "local_config.py") working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), if args_dict.get("working_dir"):
config_path, working_dir = args_dict.get("working_dir", "")
if config_path and not os.path.exists(config_path):
config_path = os.path.join(working_dir, "local_config.py")
if not os.path.exists(config_path):
print_status(
f"当前工作目录 {working_dir} 未找到local_config.py请通过 --config 传入 local_config.py 文件路径",
"error",
) )
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1) os._exit(1)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(config_path)
elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")):
config_path = os.path.join(working_dir, "local_config.py")
elif not skip_env_check and not config_path and (
not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))
):
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1)
# 加载配置文件 (check_mode 跳过) # 加载配置文件 (check_mode 跳过)
print_status(f"当前工作目录为 {working_dir}", "info") print_status(f"当前工作目录为 {working_dir}", "info")
@@ -308,9 +288,7 @@ def main():
if hasattr(BasicConfig, "log_level"): if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.") logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
file_path = configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir) configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if file_path is not None:
logger.info(f"[LOG_FILE] {file_path}")
if args.addr != parser.get_default("addr"): if args.addr != parser.get_default("addr"):
if args.addr == "test": if args.addr == "test":
@@ -354,9 +332,6 @@ def main():
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
BasicConfig.upload_registry = args_dict.get("upload_registry", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False)
BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False) BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False)
BasicConfig.test_mode = args_dict.get("test_mode", False)
if BasicConfig.test_mode:
print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning")
BasicConfig.communication_protocol = "websocket" BasicConfig.communication_protocol = "websocket"
machine_name = os.popen("hostname").read().strip() machine_name = os.popen("hostname").read().strip()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])

View File

@@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(devices_to_register.values())}) response = http_client.resource_registry({"resources": list(devices_to_register.values())})
cost_time = time.time() - start_time cost_time = time.time() - start_time
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s") logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms")
else: else:
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s") logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms")
except Exception as e: except Exception as e:
logger.error(f"[UniLab Register] 设备注册异常: {e}") logger.error(f"[UniLab Register] 设备注册异常: {e}")
@@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(resources_to_register.values())}) response = http_client.resource_registry({"resources": list(resources_to_register.values())})
cost_time = time.time() - start_time cost_time = time.time() - start_time
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s") logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms")
else: else:
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s") logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms")
except Exception as e: except Exception as e:
logger.error(f"[UniLab Register] 资源注册异常: {e}") logger.error(f"[UniLab Register] 资源注册异常: {e}")

View File

@@ -343,10 +343,9 @@ class HTTPClient:
edges: List[Dict[str, Any]], edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
published: bool = False, published: bool = False,
description: str = "",
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
导入工作流到服务器,如果 published 为 True则额外发起发布请求 导入工作流到服务器
Args: Args:
name: 工作流名称(顶层) name: 工作流名称(顶层)
@@ -356,7 +355,6 @@ class HTTPClient:
edges: 工作流边列表 edges: 工作流边列表
tags: 工作流标签列表,默认为空列表 tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns: Returns:
Dict: API响应数据包含 code 和 data (uuid, name) Dict: API响应数据包含 code 和 data (uuid, name)
@@ -369,6 +367,7 @@ class HTTPClient:
"nodes": nodes, "nodes": nodes,
"edges": edges, "edges": edges,
"tags": tags if tags is not None else [], "tags": tags if tags is not None else [],
"published": published,
}, },
} }
# 保存请求到文件 # 保存请求到文件
@@ -389,51 +388,11 @@ class HTTPClient:
res = response.json() res = response.json()
if "code" in res and res["code"] != 0: if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}") logger.error(f"导入工作流失败: {response.text}")
return res
# 导入成功后,如果需要发布则额外发起发布请求
if published:
imported_uuid = res.get("data", {}).get("uuid", workflow_uuid)
publish_res = self.workflow_publish(imported_uuid, description)
res["publish_result"] = publish_res
return res return res
else: else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}") logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text} return {"code": response.status_code, "message": response.text}
def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]:
"""
发布工作流
Args:
workflow_uuid: 工作流UUID
description: 工作流描述
Returns:
Dict: API响应数据
"""
payload = {
"uuid": workflow_uuid,
"description": description,
"published": True,
}
logger.info(f"正在发布工作流: {workflow_uuid}")
response = requests.patch(
f"{self.remote_addr}/lab/workflow/owner",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"发布工作流失败: {response.text}")
else:
logger.info(f"工作流发布成功: {workflow_uuid}")
return res
else:
logger.error(f"发布工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例 # 创建默认客户端实例
http_client = HTTPClient() http_client = HTTPClient()

View File

@@ -76,7 +76,6 @@ class JobInfo:
start_time: float start_time: float
last_update_time: float = field(default_factory=time.time) last_update_time: float = field(default_factory=time.time)
ready_timeout: Optional[float] = None # READY状态的超时时间 ready_timeout: Optional[float] = None # READY状态的超时时间
always_free: bool = False # 是否为永久闲置动作(不受排队限制)
def update_timestamp(self): def update_timestamp(self):
"""更新最后更新时间""" """更新最后更新时间"""
@@ -128,15 +127,6 @@ class DeviceActionManager:
# 总是将job添加到all_jobs中 # 总是将job添加到all_jobs中
self.all_jobs[job_info.job_id] = job_info self.all_jobs[job_info.job_id] = job_info
# always_free的动作不受排队限制直接设为READY
if job_info.always_free:
job_info.status = JobStatus.READY
job_info.update_timestamp()
job_info.set_ready_timeout(10)
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Job {job_log} always_free, start immediately")
return True
# 检查是否有正在执行或准备执行的任务 # 检查是否有正在执行或准备执行的任务
if device_key in self.active_jobs: if device_key in self.active_jobs:
# 有正在执行或准备执行的任务,加入队列 # 有正在执行或准备执行的任务,加入队列
@@ -186,15 +176,11 @@ class DeviceActionManager:
logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}") logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}")
return False return False
# always_free的job不需要检查active_jobs # 检查设备上是否是这个job
if not job_info.always_free: if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id:
# 检查设备上是否是这个job job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
job_log = format_job_log( return False
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
return False
# 开始执行任务将状态从READY转换为STARTED # 开始执行任务将状态从READY转换为STARTED
job_info.status = JobStatus.STARTED job_info.status = JobStatus.STARTED
@@ -217,13 +203,6 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id] job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key device_key = job_info.device_action_key
# always_free的job直接清理不影响队列
if job_info.always_free:
job_info.status = JobStatus.ENDED
job_info.update_timestamp()
del self.all_jobs[job_id]
return None
# 移除活跃任务 # 移除活跃任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
del self.active_jobs[device_key] del self.active_jobs[device_key]
@@ -255,14 +234,9 @@ class DeviceActionManager:
return None return None
def get_active_jobs(self) -> List[JobInfo]: def get_active_jobs(self) -> List[JobInfo]:
"""获取所有正在执行的任务(含active_jobs和always_free的STARTED job)""" """获取所有正在执行的任务"""
with self.lock: with self.lock:
jobs = list(self.active_jobs.values()) return list(self.active_jobs.values())
# 补充 always_free 的 STARTED job(它们不在 active_jobs 中)
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.STARTED and job not in jobs:
jobs.append(job)
return jobs
def get_queued_jobs(self) -> List[JobInfo]: def get_queued_jobs(self) -> List[JobInfo]:
"""获取所有排队中的任务""" """获取所有排队中的任务"""
@@ -287,14 +261,6 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id] job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key device_key = job_info.device_action_key
# always_free的job直接清理
if job_info.always_free:
job_info.status = JobStatus.ENDED
del self.all_jobs[job_id]
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Always-free job {job_log} cancelled")
return True
# 如果是正在执行的任务 # 如果是正在执行的任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
# 清理active job状态 # 清理active job状态
@@ -368,18 +334,13 @@ class DeviceActionManager:
timeout_jobs = [] timeout_jobs = []
with self.lock: with self.lock:
# 收集所有需要检查的 READY 任务(active_jobs + always_free READY jobs) # 统计READY状态的任务数量
ready_candidates = list(self.active_jobs.values()) ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY)
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.READY and job not in ready_candidates:
ready_candidates.append(job)
ready_jobs_count = sum(1 for job in ready_candidates if job.status == JobStatus.READY)
if ready_jobs_count > 0: if ready_jobs_count > 0:
logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501 logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501
# 找到所有超时的READY任务只检测不处理 # 找到所有超时的READY任务只检测不处理
for job_info in ready_candidates: for job_info in self.active_jobs.values():
if job_info.is_ready_timeout(): if job_info.is_ready_timeout():
timeout_jobs.append(job_info) timeout_jobs.append(job_info)
job_log = format_job_log( job_log = format_job_log(
@@ -647,24 +608,6 @@ class MessageProcessor:
if host_node: if host_node:
host_node.handle_pong_response(pong_data) host_node.handle_pong_response(pong_data)
def _check_action_always_free(self, device_id: str, action_name: str) -> bool:
"""检查该action是否标记为always_free通过HostNode统一的_action_value_mappings查找"""
try:
host_node = HostNode.get_instance(0)
if not host_node:
return False
# noinspection PyProtectedMember
action_mappings = host_node._action_value_mappings.get(device_id)
if not action_mappings:
return False
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
return action_mappings[key].get("always_free", False)
return False
except Exception:
return False
async def _handle_query_action_state(self, data: Dict[str, Any]): async def _handle_query_action_state(self, data: Dict[str, Any]):
"""处理query_action_state消息""" """处理query_action_state消息"""
device_id = data.get("device_id", "") device_id = data.get("device_id", "")
@@ -679,9 +622,6 @@ class MessageProcessor:
device_action_key = f"/devices/{device_id}/{action_name}" device_action_key = f"/devices/{device_id}/{action_name}"
# 检查action是否为always_free
action_always_free = self._check_action_always_free(device_id, action_name)
# 创建任务信息 # 创建任务信息
job_info = JobInfo( job_info = JobInfo(
job_id=job_id, job_id=job_id,
@@ -691,7 +631,6 @@ class MessageProcessor:
device_action_key=device_action_key, device_action_key=device_action_key,
status=JobStatus.QUEUE, status=JobStatus.QUEUE,
start_time=time.time(), start_time=time.time(),
always_free=action_always_free,
) )
# 添加到设备管理器 # 添加到设备管理器
@@ -1184,11 +1123,6 @@ class QueueProcessor:
logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs") logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs")
for job_info in queued_jobs: for job_info in queued_jobs:
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY
# 此时不应再发送 busy/need_more否则会覆盖已发出的 free=True 通知
if job_info.status != JobStatus.QUEUE:
continue
message = { message = {
"action": "report_action_state", "action": "report_action_state",
"data": { "data": {

View File

@@ -23,7 +23,6 @@ class BasicConfig:
disable_browser = False # 禁止浏览器自动打开 disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务 port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性 check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG" log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@@ -146,5 +145,5 @@ def load_config(config_path=None):
traceback.print_exc() traceback.print_exc()
exit(1) exit(1)
else: else:
config_path = os.path.join(os.path.dirname(__file__), "example_config.py") config_path = os.path.join(os.path.dirname(__file__), "local_config.py")
load_config(config_path) load_config(config_path)

View File

@@ -21,7 +21,7 @@ from pylabrobot.resources import (
ResourceHolder, ResourceHolder,
Lid, Lid,
Trash, Trash,
Tip, TubeRack, Tip,
) )
from typing_extensions import TypedDict from typing_extensions import TypedDict
@@ -696,13 +696,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
""" """
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}" assert issubclass(plate.__class__, Plate), "plate must be a Plate"
plate: Union[Plate, TubeRack] plate: Plate = cast(Plate, cast(Resource, plate))
# 根据 well_names 获取对应的 Well 对象 # 根据 well_names 获取对应的 Well 对象
if issubclass(plate.__class__, Plate): wells = [plate.get_well(name) for name in well_names]
wells = [plate.get_well(name) for name in well_names]
elif issubclass(plate.__class__, TubeRack):
wells = [plate.get_tube(name) for name in well_names]
res_volumes = [] res_volumes = []
# 如果 liquid_names 和 volumes 都为空,直接返回 # 如果 liquid_names 和 volumes 都为空,直接返回

View File

@@ -91,7 +91,7 @@ class PRCXI9300Deck(Deck):
""" """
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs): def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(size_x, size_y, size_z, name) super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位 self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16 self.slot_locations = [Coordinate(0, 0, 0)] * 16
@@ -248,15 +248,14 @@ class PRCXI9300TipRack(TipRack):
if ordered_items is not None: if ordered_items is not None:
items = ordered_items items = ordered_items
elif ordering is not None: elif ordering is not None:
# 检查 ordering 中的值类型来决定如何处理: # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param # 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param # 我们只传递位置信息(键),不传递值,使用 ordering 参数
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用 if ordering and isinstance(next(iter(ordering.values()), None), str):
first_val = next(iter(ordering.values()), None) if ordering else None # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象 # 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象
items = None items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else: else:
# ordering 的值已经是对象,可以直接使用 # ordering 的值已经是对象,可以直接使用
@@ -398,15 +397,14 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = ordered_items items_to_pass = ordered_items
ordering_param = None ordering_param = None
elif ordering is not None: elif ordering is not None:
# 检查 ordering 中的值类型来决定如何处理: # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param # 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param # 我们只传递位置信息(键),不传递值,使用 ordering 参数
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用 if ordering and isinstance(next(iter(ordering.values()), None), str):
first_val = next(iter(ordering.values()), None) if ordering else None # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象 # 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else: else:
# ordering 的值已经是对象,可以直接使用 # ordering 的值已经是对象,可以直接使用

View File

@@ -19,11 +19,10 @@ from rclpy.node import Node
import re import re
class LiquidHandlerJointPublisher(BaseROS2DeviceNode): class LiquidHandlerJointPublisher(BaseROS2DeviceNode):
def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", registry_name: str = "lh_joint_publisher", **kwargs): def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", **kwargs):
super().__init__( super().__init__(
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
status_types={}, status_types={},
action_value_mappings={}, action_value_mappings={},
hardware_interface={}, hardware_interface={},

View File

@@ -22,7 +22,7 @@ from threading import Lock, RLock
from typing_extensions import TypedDict from typing_extensions import TypedDict
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action, always_free from unilabos.utils.decorator import not_action
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES
@@ -123,8 +123,8 @@ class VirtualWorkbench:
_ros_node: BaseROS2DeviceNode _ros_node: BaseROS2DeviceNode
# 配置常量 # 配置常量
ARM_OPERATION_TIME: float = 2 # 机械臂操作时间(秒) ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒)
HEATING_TIME: float = 60.0 # 加热时间(秒) HEATING_TIME: float = 10.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量 NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
@@ -141,9 +141,9 @@ class VirtualWorkbench:
self.data: Dict[str, Any] = {} self.data: Dict[str, Any] = {}
# 从config中获取可配置参数 # 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)) self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0))
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME)) self.HEATING_TIME = float(self.config.get("heating_time", 10.0))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)) self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3))
# 机械臂状态和锁 (使用threading.Lock) # 机械臂状态和锁 (使用threading.Lock)
self._arm_lock = Lock() self._arm_lock = Lock()
@@ -431,7 +431,6 @@ class VirtualWorkbench:
sample_uuid, content in sample_uuids.items()] sample_uuid, content in sample_uuids.items()]
} }
@always_free
def start_heating( def start_heating(
self, self,
sample_uuids: SampleUUIDsType, sample_uuids: SampleUUIDsType,
@@ -502,21 +501,10 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}开始加热{material_id}") self._update_data_status(f"加热台{station_id}开始加热{material_id}")
# 打印当前所有正在加热的台位 # 模拟加热过程 (10秒)
with self._stations_lock:
heating_list = [
f"加热台{sid}:{s.current_material}"
for sid, s in self._heating_stations.items()
if s.state == HeatingStationState.HEATING and s.current_material
]
self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}")
# 模拟加热过程
start_time = time.time() start_time = time.time()
last_countdown_log = start_time
while True: while True:
elapsed = time.time() - start_time elapsed = time.time() - start_time
remaining = max(0.0, self.HEATING_TIME - elapsed)
progress = min(100.0, (elapsed / self.HEATING_TIME) * 100) progress = min(100.0, (elapsed / self.HEATING_TIME) * 100)
with self._stations_lock: with self._stations_lock:
@@ -524,11 +512,6 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%") self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
# 每5秒打印一次倒计时
if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s")
last_countdown_log = time.time()
if elapsed >= self.HEATING_TIME: if elapsed >= self.HEATING_TIME:
break break

View File

@@ -96,13 +96,10 @@ serial:
type: string type: string
port: port:
type: string type: string
registry_name:
type: string
resource_tracker: resource_tracker:
type: object type: object
required: required:
- device_id - device_id
- registry_name
- port - port
type: object type: object
data: data:

View File

@@ -67,9 +67,6 @@ camera:
period: period:
default: 0.1 default: 0.1
type: number type: number
registry_name:
default: ''
type: string
resource_tracker: resource_tracker:
type: object type: object
required: [] required: []

View File

@@ -6090,7 +6090,6 @@ virtual_workbench:
type: object type: object
type: UniLabJsonCommand type: UniLabJsonCommand
auto-start_heating: auto-start_heating:
always_free: true
feedback: {} feedback: {}
goal: {} goal: {}
goal_default: goal_default:

View File

@@ -5,7 +5,6 @@ import sys
import inspect import inspect
import importlib import importlib
import threading import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Union, Tuple from typing import Any, Dict, List, Union, Tuple
@@ -89,14 +88,6 @@ class Registry:
) )
test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。" test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。"
test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {})
test_resource_schema = self._generate_unilab_json_command_schema(
test_resource_method_info.get("args", []),
"test_resource",
test_resource_method_info.get("return_annotation"),
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
self.device_type_registry.update( self.device_type_registry.update(
{ {
"host_node": { "host_node": {
@@ -198,7 +189,32 @@ class Registry:
"goal": {}, "goal": {},
"feedback": {}, "feedback": {},
"result": {}, "result": {},
"schema": test_resource_schema, "schema": {
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": ros_message_to_json_schema(Resource, "resource"),
"resources": {
"items": {
"properties": ros_message_to_json_schema(
Resource, "resources"
),
"type": "object",
},
"type": "array",
},
"device": {"type": "string"},
"devices": {"items": {"type": "string"}, "type": "array"},
},
"type": "object",
},
"result": {},
},
"title": "test_resource",
"type": "object",
},
"placeholder_keys": { "placeholder_keys": {
"device": "unilabos_devices", "device": "unilabos_devices",
"devices": "unilabos_devices", "devices": "unilabos_devices",
@@ -822,7 +838,6 @@ class Registry:
("list", "unilabos.registry.placeholder_type:DeviceSlot"), ("list", "unilabos.registry.placeholder_type:DeviceSlot"),
] ]
}, },
**({"always_free": True} if v.get("always_free") else {}),
} }
for k, v in enhanced_info["action_methods"].items() for k, v in enhanced_info["action_methods"].items()
if k not in device_config["class"]["action_value_mappings"] if k not in device_config["class"]["action_value_mappings"]
@@ -928,7 +943,6 @@ class Registry:
if is_valid: if is_valid:
results.append((file, data, device_ids)) results.append((file, data, device_ids))
except Exception as e: except Exception as e:
traceback.print_exc()
logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}") logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}")
# 线程安全地更新注册表 # 线程安全地更新注册表

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -38,52 +38,24 @@ class LabSample(TypedDict):
extra: Dict[str, Any] extra: Dict[str, Any]
class ResourceDictPositionSizeType(TypedDict):
depth: float
width: float
height: float
class ResourceDictPositionSize(BaseModel): class ResourceDictPositionSize(BaseModel):
depth: float = Field(description="Depth", default=0.0) # z depth: float = Field(description="Depth", default=0.0) # z
width: float = Field(description="Width", default=0.0) # x width: float = Field(description="Width", default=0.0) # x
height: float = Field(description="Height", default=0.0) # y height: float = Field(description="Height", default=0.0) # y
class ResourceDictPositionScaleType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionScale(BaseModel): class ResourceDictPositionScale(BaseModel):
x: float = Field(description="x scale", default=0.0) x: float = Field(description="x scale", default=0.0)
y: float = Field(description="y scale", default=0.0) y: float = Field(description="y scale", default=0.0)
z: float = Field(description="z scale", default=0.0) z: float = Field(description="z scale", default=0.0)
class ResourceDictPositionObjectType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionObject(BaseModel): class ResourceDictPositionObject(BaseModel):
x: float = Field(description="X coordinate", default=0.0) x: float = Field(description="X coordinate", default=0.0)
y: float = Field(description="Y coordinate", default=0.0) y: float = Field(description="Y coordinate", default=0.0)
z: float = Field(description="Z coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
layout: Literal["2d", "x-y", "z-y", "x-z"]
position: ResourceDictPositionObjectType
position3d: ResourceDictPositionObjectType
rotation: ResourceDictPositionObjectType
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"]
class ResourceDictPosition(BaseModel): class ResourceDictPosition(BaseModel):
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
@@ -102,24 +74,6 @@ class ResourceDictPosition(BaseModel):
) )
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化 # 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
class ResourceDict(BaseModel): class ResourceDict(BaseModel):
id: str = Field(description="Resource ID") id: str = Field(description="Resource ID")

View File

@@ -44,7 +44,8 @@ def ros2_device_node(
# 从属性中自动发现可发布状态 # 从属性中自动发现可发布状态
if status_types is None: if status_types is None:
status_types = {} status_types = {}
assert device_config is not None, "device_config cannot be None" if device_config is None:
raise ValueError("device_config cannot be None")
if action_value_mappings is None: if action_value_mappings is None:
action_value_mappings = {} action_value_mappings = {}
if hardware_interface is None: if hardware_interface is None:

View File

@@ -146,7 +146,7 @@ def init_wrapper(
device_id: str, device_id: str,
device_uuid: str, device_uuid: str,
driver_class: type[T], driver_class: type[T],
device_config: ResourceDictInstance, device_config: ResourceTreeInstance,
status_types: Dict[str, Any], status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any], action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any], hardware_interface: Dict[str, Any],
@@ -279,7 +279,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self, self,
driver_instance: T, driver_instance: T,
device_id: str, device_id: str,
registry_name: str,
device_uuid: str, device_uuid: str,
status_types: Dict[str, Any], status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any], action_value_mappings: Dict[str, Any],
@@ -301,7 +300,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
""" """
self.driver_instance = driver_instance self.driver_instance = driver_instance
self.device_id = device_id self.device_id = device_id
self.registry_name = registry_name
self.uuid = device_uuid self.uuid = device_uuid
self.publish_high_frequency = False self.publish_high_frequency = False
self.callback_group = ReentrantCallbackGroup() self.callback_group = ReentrantCallbackGroup()
@@ -418,9 +416,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer): if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer):
# noinspection PyTypeChecker # noinspection PyTypeChecker
container_instance: RegularContainer = rts_plr_instances[0] container_instance: RegularContainer = rts_plr_instances[0]
found_resources = self.resource_tracker.figure_resource( found_resources = self.resource_tracker.figure_resource({"name": container_instance.name}, try_mode=True)
{"name": container_instance.name}, try_mode=True
)
if not len(found_resources): if not len(found_resources):
self.resource_tracker.add_resource(container_instance) self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_instance.name}到资源跟踪器") logger.info(f"添加物料{container_instance.name}到资源跟踪器")
@@ -460,7 +456,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
} }
res.response = json.dumps(final_response) res.response = json.dumps(final_response)
# 如果driver自己就有assign的方法那就使用driver自己的assign方法 # 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node": if hasattr(self.driver_instance, "create_resource"):
create_resource_func = getattr(self.driver_instance, "create_resource") create_resource_func = getattr(self.driver_instance, "create_resource")
try: try:
ret = create_resource_func( ret = create_resource_func(
@@ -1156,7 +1152,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"machine_name": BasicConfig.machine_name, "machine_name": BasicConfig.machine_name,
"type": "slave", "type": "slave",
"edge_device_id": self.device_id, "edge_device_id": self.device_id,
"registry_name": self.registry_name,
} }
}, },
ensure_ascii=False, ensure_ascii=False,
@@ -1631,7 +1626,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else: else:
resolved_sample_uuids[sample_uuid] = material_uuid resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug(f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}") self.lab_logger().debug(
f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}"
)
continue continue
# 处理单个 ResourceSlot # 处理单个 ResourceSlot
@@ -2008,7 +2005,6 @@ class ROS2DeviceNode:
if driver_is_ros: if driver_is_ros:
driver_params["device_id"] = device_id driver_params["device_id"] = device_id
driver_params["registry_name"] = device_config.res_content.klass
driver_params["resource_tracker"] = self.resource_tracker driver_params["resource_tracker"] = self.resource_tracker
self._driver_instance = self._driver_creator.create_instance(driver_params) self._driver_instance = self._driver_creator.create_instance(driver_params)
if self._driver_instance is None: if self._driver_instance is None:
@@ -2026,7 +2022,6 @@ class ROS2DeviceNode:
children=children, children=children,
driver_instance=self._driver_instance, # type: ignore driver_instance=self._driver_instance, # type: ignore
device_id=device_id, device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid, device_uuid=device_uuid,
status_types=status_types, status_types=status_types,
action_value_mappings=action_value_mappings, action_value_mappings=action_value_mappings,
@@ -2038,7 +2033,6 @@ class ROS2DeviceNode:
self._ros_node = BaseROS2DeviceNode( self._ros_node = BaseROS2DeviceNode(
driver_instance=self._driver_instance, driver_instance=self._driver_instance,
device_id=device_id, device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid, device_uuid=device_uuid,
status_types=status_types, status_types=status_types,
action_value_mappings=action_value_mappings, action_value_mappings=action_value_mappings,
@@ -2047,7 +2041,6 @@ class ROS2DeviceNode:
resource_tracker=self.resource_tracker, resource_tracker=self.resource_tracker,
) )
self._ros_node: BaseROS2DeviceNode self._ros_node: BaseROS2DeviceNode
# 将注册表类型名传递给BaseROS2DeviceNode,用于slave上报
self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}") self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}")
self.driver_instance._ros_node = self._ros_node # type: ignore self.driver_instance._ros_node = self._ros_node # type: ignore
self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore

View File

@@ -6,13 +6,12 @@ from cv_bridge import CvBridge
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
class VideoPublisher(BaseROS2DeviceNode): class VideoPublisher(BaseROS2DeviceNode):
def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None): def __init__(self, device_id='video_publisher', device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
# 初始化BaseROS2DeviceNode使用自身作为driver_instance # 初始化BaseROS2DeviceNode使用自身作为driver_instance
BaseROS2DeviceNode.__init__( BaseROS2DeviceNode.__init__(
self, self,
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid, device_uuid=device_uuid,
status_types={}, status_types={},
action_value_mappings={}, action_value_mappings={},

View File

@@ -10,7 +10,6 @@ class ControllerNode(BaseROS2DeviceNode):
def __init__( def __init__(
self, self,
device_id: str, device_id: str,
registry_name: str,
controller_func: Callable, controller_func: Callable,
update_rate: float, update_rate: float,
inputs: Dict[str, Dict[str, type | str]], inputs: Dict[str, Dict[str, type | str]],
@@ -52,7 +51,6 @@ class ControllerNode(BaseROS2DeviceNode):
self, self,
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
status_types=status_types, status_types=status_types,
action_value_mappings=action_value_mappings, action_value_mappings=action_value_mappings,
hardware_interface=hardware_interface, hardware_interface=hardware_interface,

View File

@@ -35,7 +35,7 @@ from unilabos.resources.resource_tracker import (
ResourceTreeInstance, ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES, RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM, JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample, PARAM_SAMPLE_UUIDS,
) )
from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import ( from unilabos.ros.msgs.message_converter import (
@@ -51,7 +51,6 @@ from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info from unilabos.utils.type_check import serialize_result_info
from unilabos.config.config import BasicConfig
if TYPE_CHECKING: if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem from unilabos.app.ws_client import QueueItem
@@ -64,8 +63,7 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict): class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]] resources: List[List[ResourceDict]]
devices: List[Dict[str, Any]] devices: List[DeviceSlot]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict): class TestLatencyReturn(TypedDict):
@@ -250,7 +248,6 @@ class HostNode(BaseROS2DeviceNode):
self, self,
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name="host_node",
device_uuid=host_node_dict["uuid"], device_uuid=host_node_dict["uuid"],
status_types={}, status_types={},
action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"],
@@ -305,8 +302,7 @@ class HostNode(BaseROS2DeviceNode):
} # 用来存储多个ActionClient实例 } # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = ( self._action_value_mappings: Dict[str, Dict] = (
{} {}
) # device_id -> action_value_mappings(本地+远程设备统一存储) ) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间 self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -640,8 +636,6 @@ class HostNode(BaseROS2DeviceNode):
self.device_machine_names[device_id] = "本地" self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d self.devices_instances[device_id] = d
# noinspection PyProtectedMember # noinspection PyProtectedMember
self._action_value_mappings[device_id] = d._ros_node._action_value_mappings
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith( if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
"UniLabJsonCommand" "UniLabJsonCommand"
@@ -778,17 +772,6 @@ class HostNode(BaseROS2DeviceNode):
u = uuid.UUID(item.job_id) u = uuid.UUID(item.job_id)
device_id = item.device_id device_id = item.device_id
action_name = item.action_name action_name = item.action_name
if BasicConfig.test_mode:
action_id = f"/devices/{device_id}/{action_name}"
self.lab_logger().info(
f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}"
)
# 根据注册表 handles 构建模拟返回值
mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs)
self._handle_test_mode_result(item, action_id, mock_return)
return
if action_type.startswith("UniLabJsonCommand"): if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"): if action_name.startswith("auto-"):
action_name = action_name[5:] action_name = action_name[5:]
@@ -826,51 +809,6 @@ class HostNode(BaseROS2DeviceNode):
) )
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f)) future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def _build_test_mode_return(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据注册表 handles 的 output 定义构建测试模式的模拟返回值
根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。
例如: "vessel"{}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]]
"""
mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name}
action_mappings = self._action_value_mappings.get(device_id, {})
action_mapping = action_mappings.get(action_name, {})
handles = action_mapping.get("handles", {})
if isinstance(handles, dict):
for output_handle in handles.get("output", []):
data_key = output_handle.get("data_key", "")
handler_key = output_handle.get("handler_key", "")
# 根据 @flatten 层数构建嵌套数组,叶子为空字典
flatten_count = data_key.count("@flatten")
value: Any = {}
for _ in range(flatten_count):
value = [value]
mock_return[handler_key] = value
return mock_return
def _handle_test_mode_result(
self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any]
) -> None:
"""
测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS
"""
job_id = item.job_id
status = "success"
return_info = serialize_result_info("", True, mock_return)
self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}")
from unilabos.app.web.controller import store_job_result
store_job_result(job_id, status, return_info, mock_return)
# 发布状态到桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(mock_return, item, status, return_info)
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调""" """目标响应回调"""
goal_handle = future.result() goal_handle = future.result()
@@ -1230,10 +1168,6 @@ class HostNode(BaseROS2DeviceNode):
def _node_info_update_callback(self, request, response): def _node_info_update_callback(self, request, response):
""" """
更新节点信息回调 更新节点信息回调
处理两种消息:
1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings
2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings
""" """
self.lab_logger().trace(f"[Host Node] Node info update request received: {request}") self.lab_logger().trace(f"[Host Node] Node info update request received: {request}")
try: try:
@@ -1245,48 +1179,12 @@ class HostNode(BaseROS2DeviceNode):
info = info["SYNC_SLAVE_NODE_INFO"] info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"] machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"] edge_device_id = info["edge_device_id"]
registry_name = info.get("registry_name", "")
self.device_machine_names[edge_device_id] = machine_name self.device_machine_names[edge_device_id] = machine_name
# 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings
if registry_name and registry_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[registry_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[edge_device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Loaded {len(action_mappings)} action mappings "
f"for remote device {edge_device_id} (registry: {registry_name})"
)
else: else:
devices_config = info.pop("devices_config") devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config") registry_config = info.pop("registry_config")
if registry_config: if registry_config:
http_client.resource_registry({"resources": registry_config}) http_client.resource_registry({"resources": registry_config})
# 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引
for reg_name, reg_data in registry_config.items():
if isinstance(reg_data, dict) and "class" in reg_data:
self._slave_registry_configs[reg_name] = reg_data
# 解析 devices_config,建立 device_id -> action_value_mappings 映射
if devices_config:
for device_tree in devices_config:
for device_dict in device_tree:
device_id = device_dict.get("id", "")
class_name = device_dict.get("class", "")
if device_id and class_name and class_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[class_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Stored {len(action_mappings)} action mappings "
f"for remote device {device_id} (class: {class_name})"
)
self.lab_logger().debug(f"[Host Node] Node info update: {info}") self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK" response.response = "OK"
except Exception as e: except Exception as e:
@@ -1583,7 +1481,6 @@ class HostNode(BaseROS2DeviceNode):
def test_resource( def test_resource(
self, self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None, resource: ResourceSlot = None,
resources: List[ResourceSlot] = None, resources: List[ResourceSlot] = None,
device: DeviceSlot = None, device: DeviceSlot = None,
@@ -1598,7 +1495,6 @@ class HostNode(BaseROS2DeviceNode):
return { return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(), "resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices], "devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
} }
def handle_pong_response(self, pong_data: dict): def handle_pong_response(self, pong_data: dict):

View File

@@ -7,11 +7,10 @@ from rclpy.callback_groups import ReentrantCallbackGroup
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class JointRepublisher(BaseROS2DeviceNode): class JointRepublisher(BaseROS2DeviceNode):
def __init__(self,device_id, registry_name, resource_tracker, **kwargs): def __init__(self,device_id,resource_tracker, **kwargs):
super().__init__( super().__init__(
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
status_types={}, status_types={},
action_value_mappings={}, action_value_mappings={},
hardware_interface={}, hardware_interface={},

View File

@@ -26,7 +26,7 @@ from unilabos.resources.graphio import initialize_resources
from unilabos.registry.registry import lab_registry from unilabos.registry.registry import lab_registry
class ResourceMeshManager(BaseROS2DeviceNode): class ResourceMeshManager(BaseROS2DeviceNode):
def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs): def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", rate=50, **kwargs):
"""初始化资源网格管理器节点 """初始化资源网格管理器节点
Args: Args:
@@ -37,7 +37,6 @@ class ResourceMeshManager(BaseROS2DeviceNode):
super().__init__( super().__init__(
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
status_types={}, status_types={},
action_value_mappings={}, action_value_mappings={},
hardware_interface={}, hardware_interface={},

View File

@@ -7,7 +7,7 @@ from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeRe
class ROS2SerialNode(BaseROS2DeviceNode): class ROS2SerialNode(BaseROS2DeviceNode):
def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None): def __init__(self, device_id, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None):
# 保存属性,以便在调用父类初始化前使用 # 保存属性,以便在调用父类初始化前使用
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
@@ -28,7 +28,6 @@ class ROS2SerialNode(BaseROS2DeviceNode):
BaseROS2DeviceNode.__init__( BaseROS2DeviceNode.__init__(
self, self,
driver_instance=self, driver_instance=self,
registry_name=registry_name,
device_id=device_id, device_id=device_id,
status_types={}, status_types={},
action_value_mappings={}, action_value_mappings={},

View File

@@ -47,7 +47,6 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
*, *,
driver_instance: "WorkstationBase", driver_instance: "WorkstationBase",
device_id: str, device_id: str,
registry_name: str,
device_uuid: str, device_uuid: str,
status_types: Dict[str, Any], status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any], action_value_mappings: Dict[str, Any],
@@ -63,7 +62,6 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
super().__init__( super().__init__(
driver_instance=driver_instance, driver_instance=driver_instance,
device_id=device_id, device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid, device_uuid=device_uuid,
status_types=status_types, status_types=status_types,
action_value_mappings={**action_value_mappings, **self.protocol_action_mappings}, action_value_mappings={**action_value_mappings, **self.protocol_action_mappings},

View File

@@ -339,8 +339,13 @@
"z": 0 "z": 0
}, },
"config": { "config": {
"max_volume": 500.0,
"type": "RegularContainer", "type": "RegularContainer",
"category": "container" "category": "container",
"max_temp": 200.0,
"min_temp": -20.0,
"has_stirrer": true,
"has_heater": true
}, },
"data": { "data": {
"liquids": [], "liquids": [],
@@ -764,7 +769,9 @@
"size_y": 250, "size_y": 250,
"size_z": 0, "size_z": 0,
"type": "RegularContainer", "type": "RegularContainer",
"category": "container" "category": "container",
"reagent": "sodium_chloride",
"physical_state": "solid"
}, },
"data": { "data": {
"current_mass": 500.0, "current_mass": 500.0,
@@ -785,11 +792,14 @@
"z": 0 "z": 0
}, },
"config": { "config": {
"volume": 500.0,
"size_x": 600, "size_x": 600,
"size_y": 250, "size_y": 250,
"size_z": 0, "size_z": 0,
"type": "RegularContainer", "type": "RegularContainer",
"category": "container" "category": "container",
"reagent": "sodium_carbonate",
"physical_state": "solid"
}, },
"data": { "data": {
"current_mass": 500.0, "current_mass": 500.0,
@@ -810,11 +820,14 @@
"z": 0 "z": 0
}, },
"config": { "config": {
"volume": 500.0,
"size_x": 650, "size_x": 650,
"size_y": 250, "size_y": 250,
"size_z": 0, "size_z": 0,
"type": "RegularContainer", "type": "RegularContainer",
"category": "container" "category": "container",
"reagent": "magnesium_chloride",
"physical_state": "solid"
}, },
"data": { "data": {
"current_mass": 500.0, "current_mass": 500.0,

View File

@@ -184,51 +184,6 @@ def get_all_subscriptions(instance) -> list:
return subscriptions return subscriptions
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
Example:
class MyDriver:
@always_free
def query_status(self, param: str):
# 这个动作可以随时执行,不需要排队
return self._status
def transfer(self, volume: float):
# 这个动作会按正常排队逻辑执行
pass
Note:
- 可以与其他装饰器组合使用,@always_free 应放在最外层
- 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""
检查函数是否被标记为永久闲置
Args:
func: 被检查的函数
Returns:
如果函数被 @always_free 装饰则返回 True否则返回 False
"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F: def not_action(func: F) -> F:
""" """
标记方法为非动作的装饰器 标记方法为非动作的装饰器

View File

@@ -29,7 +29,7 @@ from ast import Constant
from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS
from unilabos.utils import logger from unilabos.utils import logger
from unilabos.utils.decorator import is_not_action, is_always_free from unilabos.utils.decorator import is_not_action
class ImportManager: class ImportManager:
@@ -282,9 +282,6 @@ class ImportManager:
continue continue
# 其他非_开头的方法归类为action # 其他非_开头的方法归类为action
method_info = self._analyze_method_signature(method) method_info = self._analyze_method_signature(method)
# 检查是否被 @always_free 装饰器标记
if is_always_free(method):
method_info["always_free"] = True
result["action_methods"][name] = method_info result["action_methods"][name] = method_info
return result return result
@@ -342,9 +339,6 @@ class ImportManager:
if self._is_not_action_method(node): if self._is_not_action_method(node):
continue continue
# 其他非_开头的方法归类为action # 其他非_开头的方法归类为action
# 检查是否被 @always_free 装饰器标记
if self._is_always_free_method(node):
method_info["always_free"] = True
result["action_methods"][method_name] = method_info result["action_methods"][method_name] = method_info
return result return result
@@ -480,13 +474,6 @@ class ImportManager:
return True return True
return False return False
def _is_always_free_method(self, node: ast.FunctionDef) -> bool:
"""检查是否是@always_free装饰的方法"""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "always_free":
return True
return False
def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str: def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str:
"""从setter装饰器中获取属性名""" """从setter装饰器中获取属性名"""
for decorator in node.decorator_list: for decorator in node.decorator_list:

View File

@@ -193,7 +193,6 @@ def configure_logger(loglevel=None, working_dir=None):
root_logger.addHandler(console_handler) root_logger.addHandler(console_handler)
# 如果指定了工作目录,添加文件处理器 # 如果指定了工作目录,添加文件处理器
log_filepath = None
if working_dir is not None: if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs") logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True) os.makedirs(logs_dir, exist_ok=True)
@@ -214,7 +213,6 @@ def configure_logger(loglevel=None, working_dir=None):
logging.getLogger("asyncio").setLevel(logging.INFO) logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO) logging.getLogger("urllib3").setLevel(logging.INFO)
return log_filepath

View File

@@ -362,16 +362,14 @@ def build_protocol_graph(
protocol_steps: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]],
workstation_name: str, workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None, action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
) -> WorkflowGraph: ) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑 """统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args: Args:
labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找 labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...}
protocol_steps: 协议步骤列表 protocol_steps: 协议步骤列表
workstation_name: 工作站名称 workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选 action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...]
""" """
G = WorkflowGraph() G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port" resource_last_writer = {} # reagent_name -> "node_id:port"
@@ -379,7 +377,18 @@ def build_protocol_graph(
protocol_steps = refactor_data(protocol_steps, action_resource_mapping) protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 创建 create_resource 节点 ==================== # ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
for labware_id, item in labware_info.items():
slot = str(item.get("slot", ""))
if slot and slot not in slots_info:
res_id = f"plate_slot_{slot}"
slots_info[slot] = {
"labware": item.get("labware", ""),
"res_id": res_id,
}
# 创建 Group 节点,包含所有 create_resource 节点 # 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4()) group_node_id = str(uuid.uuid4())
G.add_node( G.add_node(
@@ -395,35 +404,29 @@ def build_protocol_graph(
param=None, param=None,
) )
# 直接使用 JSON 中的 labware 定义,每个 slot 一条记录type 即 class_name # 为每个唯一的 slot 创建 create_resource 节点
res_index = 0 res_index = 0
for lw in (labware_defs or []): for slot, info in slots_info.items():
slot = str(lw.get("slot", "")) node_id = str(uuid.uuid4())
if not slot or slot in slot_to_create_resource: res_id = info["res_id"]
continue # 跳过空 slot 或已处理的 slot
lw_name = lw.get("name", f"slot {slot}")
lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"])
res_id = f"plate_slot_{slot}"
res_index += 1 res_index += 1
node_id = str(uuid.uuid4())
G.add_node( G.add_node(
node_id, node_id,
template_name="create_resource", template_name="create_resource",
resource_name="host_node", resource_name="host_node",
name=lw_name, name=f"Plate {res_index}",
description=f"Create {lw_name}", description=f"Create plate on slot {slot}",
lab_node_type="Labware", lab_node_type="Labware",
footer="create_resource-host_node", footer="create_resource-host_node",
device_name=DEVICE_NAME_HOST, device_name=DEVICE_NAME_HOST,
type=NODE_TYPE_DEFAULT, type=NODE_TYPE_DEFAULT,
parent_uuid=group_node_id, parent_uuid=group_node_id, # 指向 Group 节点
minimized=True, minimized=True, # 折叠显示
param={ param={
"res_id": res_id, "res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"], "device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": lw_type, "class_name": CREATE_RESOURCE_DEFAULTS["class_name"],
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot), "parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot),
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot, "slot_on_deck": slot,
@@ -431,6 +434,8 @@ def build_protocol_graph(
) )
slot_to_create_resource[slot] = node_id slot_to_create_resource[slot] = node_id
# create_resource 之间不需要 ready 连接
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点 # 创建 Group 节点,包含所有 set_liquid_from_plate 节点
set_liquid_group_id = str(uuid.uuid4()) set_liquid_group_id = str(uuid.uuid4())

View File

@@ -1,20 +1,16 @@
""" """
JSON 工作流转换模块 JSON 工作流转换模块
将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。 将 workflow/reagent 格式的 JSON 转换为统一工作流格式。
输入格式: 输入格式:
{ {
"labware": [
{"name": "...", "slot": "1", "type": "lab_xxx"},
...
],
"workflow": [ "workflow": [
{"action": "...", "action_args": {...}}, {"action": "...", "action_args": {...}},
... ...
], ],
"reagent": { "reagent": {
"reagent_name": {"slot": int, "well": [...]}, "reagent_name": {"slot": int, "well": [...], "labware": "..."},
... ...
} }
} }
@@ -249,18 +245,18 @@ def convert_from_json(
if "workflow" not in json_data or "reagent" not in json_data: if "workflow" not in json_data or "reagent" not in json_data:
raise ValueError( raise ValueError(
"不支持的 JSON 格式。请使用标准格式:\n" "不支持的 JSON 格式。请使用标准格式:\n"
'{"labware": [...], "workflow": [...], "reagent": {...}}' '{"workflow": [{"action": "...", "action_args": {...}}, ...], '
'"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}'
) )
# 提取数据 # 提取数据
workflow = json_data["workflow"] workflow = json_data["workflow"]
reagent = json_data["reagent"] reagent = json_data["reagent"]
labware_defs = json_data.get("labware", []) # 新的 labware 定义列表
# 规范化步骤数据 # 规范化步骤数据
protocol_steps = normalize_workflow_steps(workflow) protocol_steps = normalize_workflow_steps(workflow)
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找 # reagent 已经是字典格式,直接使
labware_info = reagent labware_info = reagent
# 构建工作流图 # 构建工作流图
@@ -269,7 +265,6 @@ def convert_from_json(
protocol_steps=protocol_steps, protocol_steps=protocol_steps,
workstation_name=workstation_name, workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING, action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
) )
# 校验句柄配置 # 校验句柄配置

View File

@@ -41,7 +41,6 @@ def upload_workflow(
workflow_name: Optional[str] = None, workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
published: bool = False, published: bool = False,
description: str = "",
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
上传工作流到服务器 上传工作流到服务器
@@ -57,7 +56,6 @@ def upload_workflow(
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名 workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表 tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns: Returns:
Dict: API响应数据 Dict: API响应数据
@@ -77,14 +75,6 @@ def upload_workflow(
print_status(f"工作流文件JSON解析失败: {e}", "error") print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"} return {"code": -1, "message": f"JSON解析失败: {e}"}
# 从 JSON 文件中提取 description 和 tags作为 fallback
if not description and "description" in workflow_data:
description = workflow_data["description"]
print_status(f"从文件中读取 description", "info")
if not tags and "tags" in workflow_data:
tags = workflow_data["tags"]
print_status(f"从文件中读取 tags: {tags}", "info")
# 自动检测并转换格式 # 自动检测并转换格式
if not _is_node_link_format(workflow_data): if not _is_node_link_format(workflow_data):
try: try:
@@ -106,7 +96,6 @@ def upload_workflow(
print_status(f" - 节点数量: {len(nodes)}", "info") print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info") print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info") print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info")
print_status(f" - 发布状态: {published}", "info") print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传 # 调用 http_client 上传
@@ -118,7 +107,6 @@ def upload_workflow(
edges=edges, edges=edges,
tags=tags, tags=tags,
published=published, published=published,
description=description,
) )
if result.get("code") == 0: if result.get("code") == 0:
@@ -143,9 +131,8 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
workflow_name = args_dict.get("workflow_name") workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", []) tags = args_dict.get("tags", [])
published = args_dict.get("published", False) published = args_dict.get("published", False)
description = args_dict.get("description", "")
if workflow_file: if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published, description) upload_workflow(workflow_file, workflow_name, tags, published)
else: else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error") print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")