Compare commits

..

16 Commits

Author SHA1 Message Date
Xuwznln
07cf690897 fix possible crash 2026-02-12 01:46:26 +08:00
Xuwznln
cfea27460a fix deck & host_node 2026-02-12 01:46:24 +08:00
Xuwznln
b7d3e980a9 set liquid with tube 2026-02-12 01:46:23 +08:00
Xuwznln
f9ed6cb3fb add test_resource_schema 2026-02-11 14:02:21 +08:00
Xuwznln
699a0b3ce7 fix test resource schema 2026-02-10 23:08:29 +08:00
Xuwznln
cf3a20ae79 registry update & workflow update 2026-02-10 22:46:07 +08:00
Xuwznln
cdf0652020 add test mode 2026-02-10 15:18:41 +08:00
Xuwznln
60073ff139 support description & tags upload 2026-02-10 14:38:55 +08:00
Xuwznln
a9053b822f fix config load 2026-02-10 13:06:05 +08:00
Xuwznln
d238c2ab8b fix log 2026-02-10 13:04:33 +08:00
Xuwznln
9a7d5c7c82 add registry name & add always free 2026-02-07 02:11:43 +08:00
Xuwznln
4f7d431c0b correct config organic synthesis 2026-02-06 12:04:19 +08:00
Xuwznln
341a1b537c Adapt to new scheduler, sampels, and edge upload format (#230)
* add sample_material

* adapt to new samples sys

* fix pump transfer. fix resource update when protocol & ros callback

* Adapt to new scheduler.
2026-02-06 00:49:53 +08:00
Xuwznln
957fb41a6f Feat/samples (#229)
* add sample_material

* adapt to new samples sys
2026-02-05 00:42:12 +08:00
Xuwznln
26271bcab8 adapt to new samples sys 2026-02-04 18:49:08 +08:00
Xuwznln
84a8223173 adapt to new edge format 2026-02-03 23:22:38 +08:00
39 changed files with 1432 additions and 522 deletions

View File

@@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json
**操作步骤:**
1. 将两个 `container` 拖拽到 `workstation`
2.`virtual_transfer_pump` 拖拽到 `workstation`
3. 在画布上连接它们(建立父子关系)
2.`virtual_multiway_valve` 拖拽到 `workstation`
3. `virtual_transfer_pump` 拖拽到 `workstation`
4. 在画布上连接它们(建立父子关系)
![设备连接](image/links.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 275 KiB

After

Width:  |  Height:  |  Size: 415 KiB

View File

@@ -171,6 +171,12 @@ def parse_args():
action="store_true",
help="Disable sending update feedback to server",
)
parser.add_argument(
"--test_mode",
action="store_true",
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
# workflow upload subcommand
workflow_parser = subparsers.add_parser(
"workflow_upload",
@@ -204,6 +210,12 @@ def parse_args():
default=False,
help="Whether to publish the workflow (default: False)",
)
workflow_parser.add_argument(
"--description",
type=str,
default="",
help="Workflow description, used when publishing the workflow",
)
return parser
@@ -231,48 +243,56 @@ def main():
# 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config")
if check_mode:
args_dict["working_dir"] = os.path.abspath(os.getcwd())
# 当 skip_env_check 时,默认使用当前目录作为 working_dir
if skip_env_check and not args_dict.get("working_dir") and not config_path:
working_dir = os.path.abspath(os.getcwd())
print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info")
# 检查当前目录是否有 local_config.py
local_config_in_cwd = os.path.join(working_dir, "local_config.py")
if os.path.exists(local_config_in_cwd):
config_path = local_config_in_cwd
print_status(f"发现本地配置文件: {config_path}", "info")
# === 解析 working_dir ===
# 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改
# 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir
# 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测
raw_working_dir = args_dict.get("working_dir")
if raw_working_dir:
working_dir = os.path.abspath(raw_working_dir)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(os.path.abspath(config_path))
else:
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
elif os.getcwd().endswith("unilabos_data"):
working_dir = os.path.abspath(os.getcwd())
else:
# unilabos_data 子目录自动检测
if os.path.basename(working_dir) != "unilabos_data":
unilabos_data_sub = os.path.join(working_dir, "unilabos_data")
if os.path.isdir(unilabos_data_sub):
working_dir = unilabos_data_sub
elif not raw_working_dir and not (config_path and os.path.exists(config_path)):
# 未显式指定路径,默认使用 cwd/unilabos_data
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
if args_dict.get("working_dir"):
working_dir = args_dict.get("working_dir", "")
# === 解析 config_path ===
if config_path and not os.path.exists(config_path):
config_path = os.path.join(working_dir, "local_config.py")
if not os.path.exists(config_path):
# config_path 传入但不存在,尝试在 working_dir 中查找
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"在工作目录中发现配置文件: {config_path}", "info")
else:
print_status(
f"当前工作目录 {working_dir} 未找到local_config.py请通过 --config 传入 local_config.py 文件路径",
f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py"
f"请通过 --config 传入 local_config.py 文件路径",
"error",
)
os._exit(1)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(config_path)
elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")):
config_path = os.path.join(working_dir, "local_config.py")
elif not skip_env_check and not config_path and (
not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))
):
elif not config_path:
# 规则3: 未传入 config_path尝试 working_dir/local_config.py
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"发现本地配置文件: {config_path}", "info")
else:
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if input() != "n":
if check_mode or input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"),
config_path,
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
@@ -288,7 +308,9 @@ def main():
if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
file_path = configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if file_path is not None:
logger.info(f"[LOG_FILE] {file_path}")
if args.addr != parser.get_default("addr"):
if args.addr == "test":
@@ -332,6 +354,9 @@ def main():
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
BasicConfig.upload_registry = args_dict.get("upload_registry", False)
BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False)
BasicConfig.test_mode = args_dict.get("test_mode", False)
if BasicConfig.test_mode:
print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning")
BasicConfig.communication_protocol = "websocket"
machine_name = os.popen("hostname").read().strip()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])

View File

@@ -54,6 +54,7 @@ class JobAddReq(BaseModel):
action_type: str = Field(
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
)
sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid")
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")

View File

@@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(devices_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s")
else:
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 设备注册异常: {e}")
@@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(resources_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s")
else:
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 资源注册异常: {e}")

View File

@@ -343,9 +343,10 @@ class HTTPClient:
edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
导入工作流到服务器
导入工作流到服务器,如果 published 为 True则额外发起发布请求
Args:
name: 工作流名称(顶层)
@@ -355,6 +356,7 @@ class HTTPClient:
edges: 工作流边列表
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据包含 code 和 data (uuid, name)
@@ -367,7 +369,6 @@ class HTTPClient:
"nodes": nodes,
"edges": edges,
"tags": tags if tags is not None else [],
"published": published,
},
}
# 保存请求到文件
@@ -389,10 +390,50 @@ class HTTPClient:
if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}")
return res
# 导入成功后,如果需要发布则额外发起发布请求
if published:
imported_uuid = res.get("data", {}).get("uuid", workflow_uuid)
publish_res = self.workflow_publish(imported_uuid, description)
res["publish_result"] = publish_res
return res
else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]:
"""
发布工作流
Args:
workflow_uuid: 工作流UUID
description: 工作流描述
Returns:
Dict: API响应数据
"""
payload = {
"uuid": workflow_uuid,
"description": description,
"published": True,
}
logger.info(f"正在发布工作流: {workflow_uuid}")
response = requests.patch(
f"{self.remote_addr}/lab/workflow/owner",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"发布工作流失败: {response.text}")
else:
logger.info(f"工作流发布成功: {workflow_uuid}")
return res
else:
logger.error(f"发布工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例
http_client = HTTPClient()

View File

@@ -327,6 +327,7 @@ def job_add(req: JobAddReq) -> JobData:
queue_item,
action_type=action_type,
action_kwargs=action_args,
sample_material=req.sample_material,
server_info=server_info,
)

View File

@@ -76,6 +76,7 @@ class JobInfo:
start_time: float
last_update_time: float = field(default_factory=time.time)
ready_timeout: Optional[float] = None # READY状态的超时时间
always_free: bool = False # 是否为永久闲置动作(不受排队限制)
def update_timestamp(self):
"""更新最后更新时间"""
@@ -127,6 +128,15 @@ class DeviceActionManager:
# 总是将job添加到all_jobs中
self.all_jobs[job_info.job_id] = job_info
# always_free的动作不受排队限制直接设为READY
if job_info.always_free:
job_info.status = JobStatus.READY
job_info.update_timestamp()
job_info.set_ready_timeout(10)
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Job {job_log} always_free, start immediately")
return True
# 检查是否有正在执行或准备执行的任务
if device_key in self.active_jobs:
# 有正在执行或准备执行的任务,加入队列
@@ -176,9 +186,13 @@ class DeviceActionManager:
logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}")
return False
# always_free的job不需要检查active_jobs
if not job_info.always_free:
# 检查设备上是否是这个job
if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id:
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
job_log = format_job_log(
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
return False
@@ -203,6 +217,13 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key
# always_free的job直接清理不影响队列
if job_info.always_free:
job_info.status = JobStatus.ENDED
job_info.update_timestamp()
del self.all_jobs[job_id]
return None
# 移除活跃任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
del self.active_jobs[device_key]
@@ -234,9 +255,14 @@ class DeviceActionManager:
return None
def get_active_jobs(self) -> List[JobInfo]:
"""获取所有正在执行的任务"""
"""获取所有正在执行的任务(含active_jobs和always_free的STARTED job)"""
with self.lock:
return list(self.active_jobs.values())
jobs = list(self.active_jobs.values())
# 补充 always_free 的 STARTED job(它们不在 active_jobs 中)
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.STARTED and job not in jobs:
jobs.append(job)
return jobs
def get_queued_jobs(self) -> List[JobInfo]:
"""获取所有排队中的任务"""
@@ -261,6 +287,14 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key
# always_free的job直接清理
if job_info.always_free:
job_info.status = JobStatus.ENDED
del self.all_jobs[job_id]
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Always-free job {job_log} cancelled")
return True
# 如果是正在执行的任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
# 清理active job状态
@@ -334,13 +368,18 @@ class DeviceActionManager:
timeout_jobs = []
with self.lock:
# 统计READY状态的任务数量
ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY)
# 收集所有需要检查的 READY 任务(active_jobs + always_free READY jobs)
ready_candidates = list(self.active_jobs.values())
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.READY and job not in ready_candidates:
ready_candidates.append(job)
ready_jobs_count = sum(1 for job in ready_candidates if job.status == JobStatus.READY)
if ready_jobs_count > 0:
logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501
# 找到所有超时的READY任务只检测不处理
for job_info in self.active_jobs.values():
for job_info in ready_candidates:
if job_info.is_ready_timeout():
timeout_jobs.append(job_info)
job_log = format_job_log(
@@ -545,7 +584,7 @@ class MessageProcessor:
try:
message_str = json.dumps(msg, ensure_ascii=False)
await self.websocket.send(message_str)
logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501
# logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501
except Exception as e:
logger.error(f"[MessageProcessor] Failed to send message: {str(e)}")
logger.error(traceback.format_exc())
@@ -608,6 +647,24 @@ class MessageProcessor:
if host_node:
host_node.handle_pong_response(pong_data)
def _check_action_always_free(self, device_id: str, action_name: str) -> bool:
"""检查该action是否标记为always_free通过HostNode统一的_action_value_mappings查找"""
try:
host_node = HostNode.get_instance(0)
if not host_node:
return False
# noinspection PyProtectedMember
action_mappings = host_node._action_value_mappings.get(device_id)
if not action_mappings:
return False
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
return action_mappings[key].get("always_free", False)
return False
except Exception:
return False
async def _handle_query_action_state(self, data: Dict[str, Any]):
"""处理query_action_state消息"""
device_id = data.get("device_id", "")
@@ -622,6 +679,9 @@ class MessageProcessor:
device_action_key = f"/devices/{device_id}/{action_name}"
# 检查action是否为always_free
action_always_free = self._check_action_always_free(device_id, action_name)
# 创建任务信息
job_info = JobInfo(
job_id=job_id,
@@ -631,6 +691,7 @@ class MessageProcessor:
device_action_key=device_action_key,
status=JobStatus.QUEUE,
start_time=time.time(),
always_free=action_always_free,
)
# 添加到设备管理器
@@ -657,6 +718,8 @@ class MessageProcessor:
async def _handle_job_start(self, data: Dict[str, Any]):
"""处理job_start消息"""
try:
if not data.get("sample_material"):
data["sample_material"] = {}
req = JobAddReq(**data)
job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action)
@@ -688,6 +751,7 @@ class MessageProcessor:
queue_item,
action_type=req.action_type,
action_kwargs=req.action_args,
sample_material=req.sample_material,
server_info=req.server_info,
)
@@ -1120,6 +1184,11 @@ class QueueProcessor:
logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs")
for job_info in queued_jobs:
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY
# 此时不应再发送 busy/need_more否则会覆盖已发出的 free=True 通知
if job_info.status != JobStatus.QUEUE:
continue
message = {
"action": "report_action_state",
"data": {
@@ -1301,7 +1370,7 @@ class WebSocketClient(BaseCommunicationClient):
},
}
self.message_processor.send_message(message)
logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
# logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
def publish_job_status(
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None

View File

@@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float:
return total_volume
def is_integrated_pump(node_name):
return "pump" in node_name and "valve" in node_name
def is_integrated_pump(node_class: str, node_name: str = "") -> bool:
"""
判断是否为泵阀一体设备
"""
class_lower = (node_class or "").lower()
name_lower = (node_name or "").lower()
if "pump" not in class_lower and "pump" not in name_lower:
return False
integrated_markers = [
"valve",
"pump_valve",
"pumpvalve",
"integrated",
"transfer_pump",
]
for marker in integrated_markers:
if marker in class_lower or marker in name_lower:
return True
return False
def find_connected_pump(G, valve_node):
@@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone):
debug_print(f"🔧 过滤后的骨架: {filtered_backbone}")
for node in filtered_backbone:
if is_integrated_pump(G.nodes[node]["class"]):
node_data = G.nodes.get(node, {})
node_class = node_data.get("class", "") or ""
if is_integrated_pump(node_class, node):
pumps_from_node[node] = node
valve_from_node[node] = node
debug_print(f" - 集成泵-阀: {node}")

View File

@@ -23,6 +23,7 @@ class BasicConfig:
disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@@ -145,5 +146,5 @@ def load_config(config_path=None):
traceback.print_exc()
exit(1)
else:
config_path = os.path.join(os.path.dirname(__file__), "local_config.py")
config_path = os.path.join(os.path.dirname(__file__), "example_config.py")
load_config(config_path)

View File

@@ -21,13 +21,18 @@ from pylabrobot.resources import (
ResourceHolder,
Lid,
Trash,
Tip,
Tip, TubeRack,
)
from typing_extensions import TypedDict
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
from unilabos.registry.placeholder_type import ResourceSlot
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
from unilabos.resources.resource_tracker import (
ResourceTreeSet,
ResourceDict,
EXTRA_SAMPLE_UUID,
EXTRA_UNILABOS_SAMPLE_UUID,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
@@ -231,12 +236,11 @@ class LiquidHandlerMiddleware(LiquidHandler):
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
res_samples.append(
{"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}
)
sample_uuid_value = resource.unilabos_extra.get(EXTRA_SAMPLE_UUID, None)
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: sample_uuid_value})
res_volumes.append(volume)
self.pending_liquids_dict[channel] = {
"sample_uuid": resource.unilabos_extra.get("sample_uuid", None),
EXTRA_SAMPLE_UUID: sample_uuid_value,
"volume": volume,
}
return SimpleReturn(samples=res_samples, volumes=res_volumes)
@@ -278,10 +282,10 @@ class LiquidHandlerMiddleware(LiquidHandler):
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
res_uuid = self.pending_liquids_dict[channel]["sample_uuid"]
res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID]
self.pending_liquids_dict[channel]["volume"] -= volume
resource.unilabos_extra["sample_uuid"] = res_uuid
res_samples.append({"name": resource.name, "sample_uuid": res_uuid})
resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid})
res_volumes.append(volume)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
@@ -686,18 +690,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
)
def set_liquid_from_plate(
self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float]
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
) -> SetLiquidFromPlateReturn:
"""Set the liquid in wells of a plate by well names (e.g., A1, A2, B3).
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
"""
if isinstance(plate, list): # 未来移除
plate = plate[0]
assert issubclass(plate.__class__, Plate), "plate must be a Plate"
plate: Plate = cast(Plate, plate)
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
plate: Union[Plate, TubeRack]
# 根据 well_names 获取对应的 Well 对象
if issubclass(plate.__class__, Plate):
wells = [plate.get_well(name) for name in well_names]
elif issubclass(plate.__class__, TubeRack):
wells = [plate.get_tube(name) for name in well_names]
res_volumes = []
# 如果 liquid_names 和 volumes 都为空,直接返回

View File

@@ -91,7 +91,7 @@ class PRCXI9300Deck(Deck):
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z)
super().__init__(size_x, size_y, size_z, name)
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16
@@ -248,14 +248,15 @@ class PRCXI9300TipRack(TipRack):
if ordered_items is not None:
items = ordered_items
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 检查 ordering 中的值类型来决定如何处理:
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象
items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
@@ -397,14 +398,15 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = ordered_items
ordering_param = None
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 检查 ordering 中的值类型来决定如何处理:
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
@@ -595,7 +597,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
return super().set_liquid(wells, liquid_names, volumes)
def set_liquid_from_plate(
self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float]
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
) -> SetLiquidFromPlateReturn:
return super().set_liquid_from_plate(plate, well_names, liquid_names, volumes)

View File

@@ -19,10 +19,11 @@ from rclpy.node import Node
import re
class LiquidHandlerJointPublisher(BaseROS2DeviceNode):
def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", **kwargs):
def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", registry_name: str = "lh_joint_publisher", **kwargs):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -31,14 +31,14 @@ class VirtualTransferPump:
# 从config或kwargs中获取参数确保类型正确
if config:
self.max_volume = float(config.get('max_volume', 25.0))
self.port = config.get('port', 'VIRTUAL')
self.max_volume = float(config.get("max_volume", 25.0))
self.port = config.get("port", "VIRTUAL")
else:
self.max_volume = float(kwargs.get('max_volume', 25.0))
self.port = kwargs.get('port', 'VIRTUAL')
self.max_volume = float(kwargs.get("max_volume", 25.0))
self.port = kwargs.get("port", "VIRTUAL")
self._transfer_rate = float(kwargs.get('transfer_rate', 0))
self.mode = kwargs.get('mode', VirtualPumpMode.Normal)
self._transfer_rate = float(kwargs.get("transfer_rate", 0))
self.mode = kwargs.get("mode", VirtualPumpMode.Normal)
# 状态变量 - 确保都是正确类型
self._status = "Idle"
@@ -54,7 +54,9 @@ class VirtualTransferPump:
self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}")
print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨")
print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s")
print(
f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s"
)
print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
@@ -189,7 +191,9 @@ class VirtualTransferPump:
operation_emoji = "📍"
self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}")
self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)")
self.logger.info(
f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)"
)
self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s")
self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s")
@@ -207,7 +211,11 @@ class VirtualTransferPump:
for i in range(steps + 1):
# 计算当前位置和进度
progress = (i / steps) * 100 if steps > 0 else 100
current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position
current_pos = (
start_position + (target_position - start_position) * (i / steps)
if steps > 0
else target_position
)
# 更新状态
if i < steps:
@@ -244,7 +252,9 @@ class VirtualTransferPump:
# 📊 最终状态日志
if volume_to_move > 0.01:
self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL")
self.logger.info(
f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL"
)
# 返回符合action定义的结果
return {
@@ -252,7 +262,7 @@ class VirtualTransferPump:
"message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})",
"final_position": self._position,
"final_volume": self._current_volume,
"operation_type": operation_type
"operation_type": operation_type,
}
except Exception as e:
@@ -262,7 +272,7 @@ class VirtualTransferPump:
"success": False,
"message": error_msg,
"final_position": self._position,
"final_volume": self._current_volume
"final_volume": self._current_volume,
}
# 其他泵操作方法
@@ -388,7 +398,9 @@ class VirtualTransferPump:
return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差
def __str__(self):
return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})"
return (
f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})"
)
def __repr__(self):
return self.__str__()

View File

@@ -11,9 +11,10 @@ Virtual Workbench Device - 模拟工作台设备
注意:调用来自线程池,使用 threading.Lock 进行同步
"""
import logging
import time
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
from threading import Lock, RLock
@@ -21,38 +22,47 @@ from threading import Lock, RLock
from typing_extensions import TypedDict
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action
from unilabos.utils.decorator import not_action, always_free
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES
# ============ TypedDict 返回类型定义 ============
class MoveToHeatingStationResult(TypedDict):
"""move_to_heating_station 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
unilabos_samples: List[LabSample]
class StartHeatingResult(TypedDict):
"""start_heating 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
unilabos_samples: List[LabSample]
class MoveToOutputResult(TypedDict):
"""move_to_output 返回类型"""
success: bool
station_id: int
material_id: str
unilabos_samples: List[LabSample]
class PrepareMaterialsResult(TypedDict):
"""prepare_materials 返回类型 - 批量准备物料"""
success: bool
count: int
material_1: int # 物料编号1
@@ -61,12 +71,15 @@ class PrepareMaterialsResult(TypedDict):
material_4: int # 物料编号4
material_5: int # 物料编号5
message: str
unilabos_samples: List[LabSample]
# ============ 状态枚举 ============
class HeatingStationState(Enum):
"""加热台状态枚举"""
IDLE = "idle" # 空闲
OCCUPIED = "occupied" # 已放置物料,等待加热
HEATING = "heating" # 加热中
@@ -75,6 +88,7 @@ class HeatingStationState(Enum):
class ArmState(Enum):
"""机械臂状态枚举"""
IDLE = "idle" # 空闲
BUSY = "busy" # 工作中
@@ -82,6 +96,7 @@ class ArmState(Enum):
@dataclass
class HeatingStation:
"""加热台数据结构"""
station_id: int
state: HeatingStationState = HeatingStationState.IDLE
current_material: Optional[str] = None # 当前物料 (如 "A1", "A2")
@@ -108,8 +123,8 @@ class VirtualWorkbench:
_ros_node: BaseROS2DeviceNode
# 配置常量
ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒)
HEATING_TIME: float = 10.0 # 加热时间(秒)
ARM_OPERATION_TIME: float = 2 # 机械臂操作时间(秒)
HEATING_TIME: float = 60.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
@@ -126,9 +141,9 @@ class VirtualWorkbench:
self.data: Dict[str, Any] = {}
# 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0))
self.HEATING_TIME = float(self.config.get("heating_time", 10.0))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3))
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME))
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS))
# 机械臂状态和锁 (使用threading.Lock)
self._arm_lock = Lock()
@@ -137,8 +152,7 @@ class VirtualWorkbench:
# 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize()
self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1)
}
self._stations_lock = RLock() # 可重入锁,保护加热台状态
@@ -178,14 +192,16 @@ class VirtualWorkbench:
station.heating_progress = 0.0
# 初始化状态
self.data.update({
self.data.update(
{
"status": "Ready",
"arm_state": ArmState.IDLE.value,
"arm_current_task": None,
"heating_stations": self._get_stations_status(),
"active_tasks_count": 0,
"message": "工作台就绪",
})
}
)
self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪")
return True
@@ -204,12 +220,14 @@ class VirtualWorkbench:
with self._tasks_lock:
self._active_tasks.clear()
self.data.update({
self.data.update(
{
"status": "Offline",
"arm_state": ArmState.IDLE.value,
"heating_stations": {},
"message": "工作台已关闭",
})
}
)
return True
def _get_stations_status(self) -> Dict[int, Dict[str, Any]]:
@@ -227,12 +245,14 @@ class VirtualWorkbench:
def _update_data_status(self, message: Optional[str] = None):
"""更新状态数据"""
self.data.update({
self.data.update(
{
"arm_state": self._arm_state.value,
"arm_current_task": self._arm_current_task,
"heating_stations": self._get_stations_status(),
"active_tasks_count": len(self._active_tasks),
})
}
)
if message:
self.data["message"] = message
@@ -280,6 +300,7 @@ class VirtualWorkbench:
def prepare_materials(
self,
sample_uuids: SampleUUIDsType,
count: int = 5,
) -> PrepareMaterialsResult:
"""
@@ -297,10 +318,7 @@ class VirtualWorkbench:
# 生成物料列表 A1 - A{count}
materials = [i for i in range(1, count + 1)]
self.logger.info(
f"[准备物料] 生成 {count} 个物料: "
f"A1-A{count} -> material_1~material_{count}"
)
self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}")
return {
"success": True,
@@ -311,10 +329,12 @@ class VirtualWorkbench:
"material_4": materials[3] if len(materials) > 3 else 0,
"material_5": materials[4] if len(materials) > 4 else 0,
"message": f"已准备 {count} 个物料: A1-A{count}",
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def move_to_heating_station(
self,
sample_uuids: SampleUUIDsType,
material_number: int,
) -> MoveToHeatingStationResult:
"""
@@ -391,6 +411,9 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"{material_id}已成功移动到加热台{station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
except Exception as e:
@@ -403,10 +426,15 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
@always_free
def start_heating(
self,
sample_uuids: SampleUUIDsType,
station_id: int,
material_number: int,
) -> StartHeatingResult:
@@ -429,6 +457,9 @@ class VirtualWorkbench:
"material_id": "",
"material_number": material_number,
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
with self._stations_lock:
@@ -441,6 +472,9 @@ class VirtualWorkbench:
"material_id": "",
"material_number": material_number,
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
if station.state == HeatingStationState.HEATING:
@@ -450,6 +484,9 @@ class VirtualWorkbench:
"material_id": station.current_material,
"material_number": material_number,
"message": f"加热台{station_id}已经在加热中",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
material_id = station.current_material
@@ -465,10 +502,21 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}开始加热{material_id}")
# 模拟加热过程 (10秒)
# 打印当前所有正在加热的台位
with self._stations_lock:
heating_list = [
f"加热台{sid}:{s.current_material}"
for sid, s in self._heating_stations.items()
if s.state == HeatingStationState.HEATING and s.current_material
]
self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}")
# 模拟加热过程
start_time = time.time()
last_countdown_log = start_time
while True:
elapsed = time.time() - start_time
remaining = max(0.0, self.HEATING_TIME - elapsed)
progress = min(100.0, (elapsed / self.HEATING_TIME) * 100)
with self._stations_lock:
@@ -476,6 +524,11 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
# 每5秒打印一次倒计时
if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s")
last_countdown_log = time.time()
if elapsed >= self.HEATING_TIME:
break
@@ -499,10 +552,14 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"加热台{station_id}加热完成",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
def move_to_output(
self,
sample_uuids: SampleUUIDsType,
station_id: int,
material_number: int,
) -> MoveToOutputResult:
@@ -525,6 +582,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": f"C{output_number}",
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
with self._stations_lock:
@@ -538,6 +598,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": f"C{output_number}",
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
if station.state != HeatingStationState.COMPLETED:
@@ -547,6 +610,9 @@ class VirtualWorkbench:
"material_id": material_id,
"output_position": f"C{output_number}",
"message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
output_position = f"C{output_number}"
@@ -595,6 +661,9 @@ class VirtualWorkbench:
"material_id": material_id,
"output_position": output_position,
"message": f"{material_id}已成功移动到{output_position}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
except Exception as e:
@@ -607,6 +676,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": output_position,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
# ============ 状态属性 ============

View File

@@ -96,10 +96,13 @@ serial:
type: string
port:
type: string
registry_name:
type: string
resource_tracker:
type: object
required:
- device_id
- registry_name
- port
type: object
data:

View File

@@ -67,6 +67,9 @@ camera:
period:
default: 0.1
type: number
registry_name:
default: ''
type: string
resource_tracker:
type: object
required: []

View File

@@ -9468,7 +9468,7 @@ liquid_handler.prcxi:
well_names: null
handles:
input:
- data_key: plate
- data_key: '@this.0@@@plate'
data_source: handle
data_type: resource
handler_key: input_plate
@@ -9503,7 +9503,6 @@ liquid_handler.prcxi:
type: string
type: array
plate:
items:
properties:
category:
type: string
@@ -9576,8 +9575,6 @@ liquid_handler.prcxi:
- data
title: plate
type: object
title: plate
type: array
volumes:
items:
type: number
@@ -9593,17 +9590,207 @@ liquid_handler.prcxi:
- volumes
type: object
result:
$defs:
ResourceDict:
properties:
class:
description: Resource class name
title: Class
type: string
config:
additionalProperties: true
description: Resource configuration
title: Config
type: object
data:
additionalProperties: true
description: 'Resource data, eg: container liquid data'
title: Data
type: object
description:
default: ''
description: Resource description
title: Description
type: string
extra:
additionalProperties: true
description: 'Extra data, eg: slot index'
title: Extra
type: object
icon:
default: ''
description: Resource icon
title: Icon
type: string
id:
description: Resource ID
title: Id
type: string
model:
additionalProperties: true
description: Resource model
title: Model
type: object
name:
description: Resource name
title: Name
type: string
parent:
anyOf:
- $ref: '#/$defs/ResourceDict'
- type: 'null'
default: null
description: Parent resource object
parent_uuid:
anyOf:
- type: string
- type: 'null'
default: null
description: Parent resource uuid
title: Parent Uuid
pose:
$ref: '#/$defs/ResourceDictPosition'
description: Resource position
schema:
additionalProperties: true
description: Resource schema
title: Schema
type: object
type:
anyOf:
- const: device
type: string
- type: string
description: Resource type
title: Type
uuid:
description: Resource UUID
title: Uuid
type: string
required:
- id
- uuid
- name
- type
- class
- config
- data
- extra
title: ResourceDict
type: object
ResourceDictPosition:
properties:
cross_section_type:
default: rectangle
description: Cross section type
enum:
- rectangle
- circle
- rounded_rectangle
title: Cross Section Type
type: string
layout:
default: x-y
description: Resource layout
enum:
- 2d
- x-y
- z-y
- x-z
title: Layout
type: string
position:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource position
position3d:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource position in 3D space
rotation:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource rotation
scale:
$ref: '#/$defs/ResourceDictPositionScale'
description: Resource scale
size:
$ref: '#/$defs/ResourceDictPositionSize'
description: Resource size
title: ResourceDictPosition
type: object
ResourceDictPositionObject:
properties:
x:
default: 0.0
description: X coordinate
title: X
type: number
y:
default: 0.0
description: Y coordinate
title: Y
type: number
z:
default: 0.0
description: Z coordinate
title: Z
type: number
title: ResourceDictPositionObject
type: object
ResourceDictPositionScale:
properties:
x:
default: 0.0
description: x scale
title: X
type: number
y:
default: 0.0
description: y scale
title: Y
type: number
z:
default: 0.0
description: z scale
title: Z
type: number
title: ResourceDictPositionScale
type: object
ResourceDictPositionSize:
properties:
depth:
default: 0.0
description: Depth
title: Depth
type: number
height:
default: 0.0
description: Height
title: Height
type: number
width:
default: 0.0
description: Width
title: Width
type: number
title: ResourceDictPositionSize
type: object
properties:
plate:
items: {}
items:
items:
$ref: '#/$defs/ResourceDict'
type: array
title: Plate
type: array
volumes:
items: {}
items:
type: number
title: Volumes
type: array
wells:
items: {}
items:
items:
$ref: '#/$defs/ResourceDict'
type: array
title: Wells
type: array
required:

View File

@@ -5835,6 +5835,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: move_to_heating_station 返回类型
properties:
material_id:
@@ -5853,12 +5872,18 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- material_number
- message
- unilabos_samples
title: MoveToHeatingStationResult
type: object
required:
@@ -5903,6 +5928,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: move_to_output 返回类型
properties:
material_id:
@@ -5914,10 +5958,16 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- unilabos_samples
title: MoveToOutputResult
type: object
required:
@@ -5972,6 +6022,25 @@ virtual_workbench:
required: []
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: prepare_materials 返回类型 - 批量准备物料
properties:
count:
@@ -5998,6 +6067,11 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- count
@@ -6007,6 +6081,7 @@ virtual_workbench:
- material_4
- material_5
- message
- unilabos_samples
title: PrepareMaterialsResult
type: object
required:
@@ -6015,6 +6090,7 @@ virtual_workbench:
type: object
type: UniLabJsonCommand
auto-start_heating:
always_free: true
feedback: {}
goal: {}
goal_default:
@@ -6062,6 +6138,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: start_heating 返回类型
properties:
material_id:
@@ -6079,12 +6174,18 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- material_number
- message
- unilabos_samples
title: StartHeatingResult
type: object
required:

View File

@@ -5,6 +5,7 @@ import sys
import inspect
import importlib
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Union, Tuple
@@ -88,6 +89,14 @@ class Registry:
)
test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。"
test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {})
test_resource_schema = self._generate_unilab_json_command_schema(
test_resource_method_info.get("args", []),
"test_resource",
test_resource_method_info.get("return_annotation"),
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
self.device_type_registry.update(
{
"host_node": {
@@ -189,32 +198,7 @@ class Registry:
"goal": {},
"feedback": {},
"result": {},
"schema": {
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": ros_message_to_json_schema(Resource, "resource"),
"resources": {
"items": {
"properties": ros_message_to_json_schema(
Resource, "resources"
),
"type": "object",
},
"type": "array",
},
"device": {"type": "string"},
"devices": {"items": {"type": "string"}, "type": "array"},
},
"type": "object",
},
"result": {},
},
"title": "test_resource",
"type": "object",
},
"schema": test_resource_schema,
"placeholder_keys": {
"device": "unilabos_devices",
"devices": "unilabos_devices",
@@ -838,6 +822,7 @@ class Registry:
("list", "unilabos.registry.placeholder_type:DeviceSlot"),
]
},
**({"always_free": True} if v.get("always_free") else {}),
}
for k, v in enhanced_info["action_methods"].items()
if k not in device_config["class"]["action_value_mappings"]
@@ -943,6 +928,7 @@ class Registry:
if is_valid:
results.append((file, data, device_ids))
except Exception as e:
traceback.print_exc()
logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}")
# 线程安全地更新注册表

View File

@@ -151,12 +151,40 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
"""
# 构建 id 到 uuid 的映射
id_to_uuid: Dict[str, str] = {}
uuid_to_id: Dict[str, str] = {}
for node in resource_tree_set.all_nodes:
id_to_uuid[node.res_content.id] = node.res_content.uuid
uuid_to_id[node.res_content.uuid] = node.res_content.id
# 第三遍处理:为每个 link 添加 source_uuid 和 target_uuid
for link in links:
source_id = link.get("source")
target_id = link.get("target")
# 添加 source_uuid
if source_id and source_id in id_to_uuid:
link["source_uuid"] = id_to_uuid[source_id]
# 添加 target_uuid
if target_id and target_id in id_to_uuid:
link["target_uuid"] = id_to_uuid[target_id]
source_uuid = link.get("source_uuid")
target_uuid = link.get("target_uuid")
# 添加 source_uuid
if source_uuid and source_uuid in uuid_to_id:
link["source"] = uuid_to_id[source_uuid]
# 添加 target_uuid
if target_uuid and target_uuid in uuid_to_id:
link["target"] = uuid_to_id[target_uuid]
# 第一遍处理将字符串类型的port转换为字典格式
for link in links:
port = link.get("port")
if port is None:
continue
if link.get("type", "physical") == "physical":
link["type"] = "fluid"
if isinstance(port, int):
@@ -179,13 +207,15 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
link["port"] = {link["source"]: None, link["target"]: None}
# 构建边字典,键为(source节点, target节点)值为对应的port信息
edges = {(link["source"], link["target"]): link["port"] for link in links}
edges = {(link["source"], link["target"]): link["port"] for link in links if link.get("port")}
# 第二遍处理填充反向边的dest信息
delete_reverses = []
for i, link in enumerate(links):
s, t = link["source"], link["target"]
current_port = link["port"]
current_port = link.get("port")
if current_port is None:
continue
if current_port.get(t) is None:
reverse_key = (t, s)
reverse_port = edges.get(reverse_key)
@@ -200,20 +230,6 @@ def canonicalize_links_ports(links: List[Dict[str, Any]], resource_tree_set: Res
current_port[t] = current_port[s]
# 删除已被使用反向端口信息的反向边
standardized_links = [link for i, link in enumerate(links) if i not in delete_reverses]
# 第三遍处理:为每个 link 添加 source_uuid 和 target_uuid
for link in standardized_links:
source_id = link.get("source")
target_id = link.get("target")
# 添加 source_uuid
if source_id and source_id in id_to_uuid:
link["source_uuid"] = id_to_uuid[source_id]
# 添加 target_uuid
if target_id and target_id in id_to_uuid:
link["target_uuid"] = id_to_uuid[target_id]
return standardized_links
@@ -284,6 +300,8 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]
edge["sourceHandle"] = port[source]
elif "source_port" in edge:
edge["sourceHandle"] = edge.pop("source_port")
elif "source_handle" in edge:
edge["sourceHandle"] = edge.pop("source_handle")
else:
typ = edge.get("type")
if typ == "communication":
@@ -292,6 +310,8 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]
edge["targetHandle"] = port[target]
elif "target_port" in edge:
edge["targetHandle"] = edge.pop("target_port")
elif "target_handle" in edge:
edge["targetHandle"] = edge.pop("target_handle")
else:
typ = edge.get("type")
if typ == "communication":

View File

@@ -5,6 +5,8 @@ from pydantic import BaseModel, field_serializer, field_validator, ValidationErr
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from typing_extensions import TypedDict
from unilabos.resources.plr_additional_res_reg import register
from unilabos.utils.log import logger
@@ -14,6 +16,32 @@ if TYPE_CHECKING:
EXTRA_CLASS = "unilabos_resource_class"
EXTRA_SAMPLE_UUID = "sample_uuid"
EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid"
# 函数参数名常量 - 用于自动注入 sample_uuids 列表
PARAM_SAMPLE_UUIDS = "sample_uuids"
# JSON Command 中的系统参数字段名
JSON_UNILABOS_PARAM = "unilabos_param"
# 返回值中的 samples 字段名
RETURN_UNILABOS_SAMPLES = "unilabos_samples"
# sample_uuids 参数类型 (用于 virtual bench 等设备添加 sample_uuids 参数)
SampleUUIDsType = Dict[str, Optional["PLRResource"]]
class LabSample(TypedDict):
sample_uuid: str
oss_path: str
extra: Dict[str, Any]
class ResourceDictPositionSizeType(TypedDict):
depth: float
width: float
height: float
class ResourceDictPositionSize(BaseModel):
@@ -22,18 +50,40 @@ class ResourceDictPositionSize(BaseModel):
height: float = Field(description="Height", default=0.0) # y
class ResourceDictPositionScaleType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionScale(BaseModel):
x: float = Field(description="x scale", default=0.0)
y: float = Field(description="y scale", default=0.0)
z: float = Field(description="z scale", default=0.0)
class ResourceDictPositionObjectType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionObject(BaseModel):
x: float = Field(description="X coordinate", default=0.0)
y: float = Field(description="Y coordinate", default=0.0)
z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
layout: Literal["2d", "x-y", "z-y", "x-z"]
position: ResourceDictPositionObjectType
position3d: ResourceDictPositionObjectType
rotation: ResourceDictPositionObjectType
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"]
class ResourceDictPosition(BaseModel):
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
@@ -52,6 +102,24 @@ class ResourceDictPosition(BaseModel):
)
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
class ResourceDict(BaseModel):
id: str = Field(description="Resource ID")
@@ -529,6 +597,7 @@ class ResourceTreeSet(object):
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
from pylabrobot.resources import Coordinate
from pylabrobot.serializer import deserialize
location = cast(Coordinate, deserialize(plr_dict["location"]))
plr_resource.location = location
plr_resource.load_all_state(all_states)

View File

@@ -44,8 +44,7 @@ def ros2_device_node(
# 从属性中自动发现可发布状态
if status_types is None:
status_types = {}
if device_config is None:
raise ValueError("device_config cannot be None")
assert device_config is not None, "device_config cannot be None"
if action_value_mappings is None:
action_value_mappings = {}
if hardware_interface is None:

View File

@@ -4,8 +4,20 @@ import json
import threading
import time
import traceback
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \
Tuple
from typing import (
get_type_hints,
TypeVar,
Generic,
Dict,
Any,
Type,
TypedDict,
Optional,
List,
TYPE_CHECKING,
Union,
Tuple,
)
from concurrent.futures import ThreadPoolExecutor
import asyncio
@@ -48,6 +60,9 @@ from unilabos.resources.resource_tracker import (
ResourceTreeSet,
ResourceTreeInstance,
ResourceDictInstance,
EXTRA_SAMPLE_UUID,
PARAM_SAMPLE_UUIDS,
JSON_UNILABOS_PARAM,
)
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from rclpy.task import Task, Future
@@ -131,7 +146,7 @@ def init_wrapper(
device_id: str,
device_uuid: str,
driver_class: type[T],
device_config: ResourceTreeInstance,
device_config: ResourceDictInstance,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any],
@@ -216,14 +231,15 @@ class PropertyPublisher:
def publish_property(self):
try:
self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}")
# self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}")
value = self.get_property()
if self.print_publish:
self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}")
pass
# self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}")
if value is not None:
msg = convert_to_ros_msg(self.msg_type, value)
self.publisher_.publish(msg)
self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功")
# self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功")
except Exception as e:
self.node.lab_logger().error(
f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}"
@@ -263,6 +279,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self,
driver_instance: T,
device_id: str,
registry_name: str,
device_uuid: str,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
@@ -284,6 +301,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"""
self.driver_instance = driver_instance
self.device_id = device_id
self.registry_name = registry_name
self.uuid = device_uuid
self.publish_high_frequency = False
self.callback_group = ReentrantCallbackGroup()
@@ -361,6 +379,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
from pylabrobot.resources.deck import Deck
from pylabrobot.resources import Coordinate
from pylabrobot.resources import Plate
# 物料传输到对应的node节点
client = self._resource_clients["c2s_update_resource_tree"]
request = SerialCommand.Request()
@@ -388,33 +407,29 @@ class BaseROS2DeviceNode(Node, Generic[T]):
rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources)
parent_resource = None
if bind_parent_id != self.node_name:
parent_resource = self.resource_tracker.figure_resource(
{"name": bind_parent_id}
)
parent_resource = self.resource_tracker.figure_resource({"name": bind_parent_id})
for r in rts.root_nodes:
# noinspection PyUnresolvedReferences
r.res_content.parent_uuid = parent_resource.unilabos_uuid
else:
for r in rts.root_nodes:
r.res_content.parent_uuid = self.uuid
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
rts_plr_instances = rts.to_plr_resources()
if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer):
# noinspection PyTypeChecker
container_instance: RegularContainer = rts.root_nodes[0]
container_instance: RegularContainer = rts_plr_instances[0]
found_resources = self.resource_tracker.figure_resource(
{"id": container_instance.name}, try_mode=True
{"name": container_instance.name}, try_mode=True
)
if not len(found_resources):
self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_instance.name}到资源跟踪器")
else:
assert (
len(found_resources) == 1
), f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
assert len(found_resources) == 1, f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
found_resource = found_resources[0]
if isinstance(found_resource, RegularContainer):
logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
found_resource.state.update(json.loads(container_instance.state))
found_resource.state.update(container_instance.state)
elif isinstance(found_resource, dict):
raise ValueError("已不支持 字典 版本的RegularContainer")
else:
@@ -422,14 +437,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
)
# noinspection PyUnresolvedReferences
request.command = json.dumps({
request.command = json.dumps(
{
"action": "add",
"data": {
"data": rts.dump(),
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid,
"first_add": False,
},
})
}
)
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
plr_instances = rts.to_plr_resources()
@@ -443,7 +460,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}
res.response = json.dumps(final_response)
# 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource"):
if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node":
create_resource_func = getattr(self.driver_instance, "create_resource")
try:
ret = create_resource_func(
@@ -471,7 +488,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT)
LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT)
self.lab_logger().warning(f"增加液体资源时数量为1自动补全为 {len(LIQUID_INPUT_SLOT)}")
self.lab_logger().warning(
f"增加液体资源时数量为1自动补全为 {len(LIQUID_INPUT_SLOT)}"
)
for liquid_type, liquid_volume, liquid_input_slot in zip(
ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
):
@@ -490,9 +509,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
input_wells = []
for r in LIQUID_INPUT_SLOT:
input_wells.append(plr_instance.children[r])
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(
input_wells
).dump()
res.response = json.dumps(final_response)
if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
if (
issubclass(parent_resource.__class__, Deck)
and hasattr(parent_resource, "assign_child_at_slot")
and "slot" in other_calling_param
):
other_calling_param["slot"] = int(other_calling_param["slot"])
parent_resource.assign_child_at_slot(plr_instance, **other_calling_param)
else:
@@ -507,14 +532,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource])
if rts_with_parent.root_nodes[0].res_content.uuid_parent is None:
rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid
request.command = json.dumps({
request.command = json.dumps(
{
"action": "add",
"data": {
"data": rts_with_parent.dump(),
"mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent,
"first_add": False,
},
})
}
)
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
@@ -811,7 +838,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}
def _handle_update(
plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
plr_resources: List[Union[ResourcePLR, ResourceDictInstance]],
tree_set: ResourceTreeSet,
additional_add_params: Dict[str, Any],
) -> Tuple[Dict[str, Any], List[ResourcePLR]]:
"""
处理资源更新操作的内部函数
@@ -836,7 +865,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None
not_same_parent = (
original_parent_resource_uuid != target_parent_resource_uuid
and original_parent_resource is not None
)
old_name = original_instance.name
new_name = plr_resource.name
parent_appended = False
@@ -872,8 +904,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else:
# 判断是否变更了resource_site重新登记
target_site = original_instance.unilabos_extra.get("update_resource_site")
sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None
site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else []
sites = (
original_instance.parent.sites
if original_instance.parent is not None and hasattr(original_instance.parent, "sites")
else None
)
site_names = (
list(original_instance.parent._ordering.keys())
if original_instance.parent is not None and hasattr(original_instance.parent, "sites")
else []
)
if target_site is not None and sites is not None and site_names is not None:
site_index = sites.index(original_instance)
site_name = site_names[site_index]
@@ -910,9 +950,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
action = i.get("action") # remove, add, update
resources_uuid: List[str] = i.get("data") # 资源数据
additional_add_params = i.get("additional_add_params", {}) # 额外参数
self.lab_logger().trace(
f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}"
)
self.lab_logger().trace(f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}")
tree_set = None
if action in ["add", "update"]:
tree_set = await self.get_resource(
@@ -939,9 +977,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
tree.root_node.res_content.parent_uuid = self.uuid
r = SerialCommand.Request()
r.command = json.dumps(
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
{"data": {"data": new_tree_set.dump()}, "action": "update"}
) # 和Update Resource一致
response: SerialCommand_Response = await self._resource_clients[
"c2s_update_resource_tree"].call_async(r) # type: ignore
"c2s_update_resource_tree"
].call_async(
r
) # type: ignore
self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}")
results.append(result)
elif action == "update":
@@ -961,9 +1003,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
tree.root_node.res_content.parent_uuid = self.uuid
r = SerialCommand.Request()
r.command = json.dumps(
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
{"data": {"data": new_tree_set.dump()}, "action": "update"}
) # 和Update Resource一致
response: SerialCommand_Response = await self._resource_clients[
"c2s_update_resource_tree"].call_async(r) # type: ignore
"c2s_update_resource_tree"
].call_async(
r
) # type: ignore
self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}")
results.append(result)
elif action == "remove":
@@ -1110,6 +1156,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id,
"registry_name": self.registry_name,
}
},
ensure_ascii=False,
@@ -1333,7 +1380,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
resource_id=resource_data["id"], with_children=True
)
if "sample_id" in resource_data:
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"]
queried_resources[idx] = plr_resource
else:
uuid_indices.append((idx, unilabos_uuid, resource_data))
@@ -1346,7 +1393,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for i, (idx, _, resource_data) in enumerate(uuid_indices):
plr_resource = plr_resources[i]
if "sample_id" in resource_data:
plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"]
plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"]
queried_resources[idx] = plr_resource
self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源")
@@ -1354,7 +1401,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 通过资源跟踪器获取本地实例
final_resources = queried_resources if is_sequence else queried_resources[0]
if not is_sequence:
plr = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
plr = self.resource_tracker.figure_resource(
{"name": final_resources.name}, try_mode=False
)
# 保留unilabos_extra
if hasattr(final_resources, "unilabos_extra") and hasattr(plr, "unilabos_extra"):
plr.unilabos_extra = getattr(final_resources, "unilabos_extra", {}).copy()
@@ -1393,8 +1442,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
execution_success = True
except Exception as _:
execution_error = traceback.format_exc()
error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
error(
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}"
)
trace(
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
)
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
future.add_done_callback(_handle_future_exception)
@@ -1414,9 +1467,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
except Exception as _:
execution_error = traceback.format_exc()
error(
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}")
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}"
)
trace(
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}")
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
)
future.add_done_callback(_handle_future_exception)
@@ -1483,8 +1538,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if isinstance(rs, list):
for r in rs:
res = self.resource_tracker.parent_resource(r) # 获取 resource 对象
if res is None:
res = rs
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
else:
res = self.resource_tracker.parent_resource(rs)
if res is None:
res = rs
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
@@ -1539,20 +1601,37 @@ class BaseROS2DeviceNode(Node, Generic[T]):
try:
function_name = target["function_name"]
function_args = target["function_args"]
# 获取 unilabos 系统参数
unilabos_param: Dict[str, Any] = target[JSON_UNILABOS_PARAM]
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
function = getattr(self.driver_instance, function_name)
assert callable(
function
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
# 处理 ResourceSlot 类型参数
args_list = default_manager._analyze_method_signature(function)["args"]
# 处理参数(包含 unilabos 系统参数如 sample_uuids
args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"]
for arg in args_list:
arg_name = arg["name"]
arg_type = arg["type"]
# 跳过不在 function_args 中的参数
if arg_name not in function_args:
# 处理 sample_uuids 参数注入
if arg_name == PARAM_SAMPLE_UUIDS:
raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {})
# 将 material uuid 转换为 resource 实例
# key: sample_uuid, value: material_uuid -> resource 实例
resolved_sample_uuids: Dict[str, Any] = {}
for sample_uuid, material_uuid in raw_sample_uuids.items():
if material_uuid and self.resource_tracker:
resource = self.resource_tracker.uuid_to_resources.get(material_uuid)
resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid
else:
resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug(f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}")
continue
# 处理单个 ResourceSlot
@@ -1581,6 +1660,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
# todo: 默认反报送
return function(**function_args)
except KeyError as ex:
@@ -1601,14 +1681,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raise ValueError("至少需要提供一个 UUID")
uuids_list = list(uuids)
future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request(
future = self._resource_clients["c2s_update_resource_tree"].call_async(
SerialCommand.Request(
command=json.dumps(
{
"data": {"data": uuids_list, "with_children": True},
"action": "get",
}
)
))
)
)
# 等待结果使用while循环每次sleep 0.05秒最多等待30秒
timeout = 30.0
@@ -1666,6 +1748,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
try:
function_name = target["function_name"]
function_args = target["function_args"]
# 获取 unilabos 系统参数
unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {})
assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}"
function = getattr(self.driver_instance, function_name)
assert callable(
@@ -1675,14 +1760,30 @@ class BaseROS2DeviceNode(Node, Generic[T]):
function
), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}"
# 处理 ResourceSlot 类型参数
args_list = default_manager._analyze_method_signature(function)["args"]
# 处理参数(包含 unilabos 系统参数如 sample_uuids
args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"]
for arg in args_list:
arg_name = arg["name"]
arg_type = arg["type"]
# 跳过不在 function_args 中的参数
if arg_name not in function_args:
# 处理 sample_uuids 参数注入
if arg_name == PARAM_SAMPLE_UUIDS:
raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {})
# 将 material uuid 转换为 resource 实例
# key: sample_uuid, value: material_uuid -> resource 实例
resolved_sample_uuids: Dict[str, Any] = {}
for sample_uuid, material_uuid in raw_sample_uuids.items():
if material_uuid and self.resource_tracker:
resource = self.resource_tracker.uuid_to_resources.get(material_uuid)
resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid
else:
resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug(
f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}"
)
continue
# 处理单个 ResourceSlot
@@ -1907,6 +2008,7 @@ class ROS2DeviceNode:
if driver_is_ros:
driver_params["device_id"] = device_id
driver_params["registry_name"] = device_config.res_content.klass
driver_params["resource_tracker"] = self.resource_tracker
self._driver_instance = self._driver_creator.create_instance(driver_params)
if self._driver_instance is None:
@@ -1924,6 +2026,7 @@ class ROS2DeviceNode:
children=children,
driver_instance=self._driver_instance, # type: ignore
device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings=action_value_mappings,
@@ -1935,6 +2038,7 @@ class ROS2DeviceNode:
self._ros_node = BaseROS2DeviceNode(
driver_instance=self._driver_instance,
device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings=action_value_mappings,
@@ -1943,6 +2047,7 @@ class ROS2DeviceNode:
resource_tracker=self.resource_tracker,
)
self._ros_node: BaseROS2DeviceNode
# 将注册表类型名传递给BaseROS2DeviceNode,用于slave上报
self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}")
self.driver_instance._ros_node = self._ros_node # type: ignore
self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore
@@ -1960,7 +2065,9 @@ class ROS2DeviceNode:
asyncio.set_event_loop(loop)
loop.run_forever()
ROS2DeviceNode._asyncio_loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode")
ROS2DeviceNode._asyncio_loop_thread = threading.Thread(
target=run_event_loop, daemon=True, name="ROS2DeviceNode"
)
ROS2DeviceNode._asyncio_loop_thread.start()
logger.info(f"循环线程已启动")

View File

@@ -6,12 +6,13 @@ from cv_bridge import CvBridge
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
class VideoPublisher(BaseROS2DeviceNode):
def __init__(self, device_id='video_publisher', device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
# 初始化BaseROS2DeviceNode使用自身作为driver_instance
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid,
status_types={},
action_value_mappings={},

View File

@@ -10,6 +10,7 @@ class ControllerNode(BaseROS2DeviceNode):
def __init__(
self,
device_id: str,
registry_name: str,
controller_func: Callable,
update_rate: float,
inputs: Dict[str, Dict[str, type | str]],
@@ -51,6 +52,7 @@ class ControllerNode(BaseROS2DeviceNode):
self,
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types=status_types,
action_value_mappings=action_value_mappings,
hardware_interface=hardware_interface,

View File

@@ -1,17 +1,17 @@
import collections
from dataclasses import dataclass, field
import json
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
from typing_extensions import TypedDict
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.service import Service
from typing_extensions import TypedDict
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
@@ -23,10 +23,20 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer
from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample,
)
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
get_msg_type,
@@ -37,17 +47,11 @@ from unilabos.ros.msgs.message_converter import (
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
)
from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.config.config import BasicConfig
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem
@@ -60,7 +64,8 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -245,6 +250,7 @@ class HostNode(BaseROS2DeviceNode):
self,
driver_instance=self,
device_id=device_id,
registry_name="host_node",
device_uuid=host_node_dict["uuid"],
status_types={},
action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"],
@@ -299,7 +305,8 @@ class HostNode(BaseROS2DeviceNode):
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
) # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -633,6 +640,8 @@ class HostNode(BaseROS2DeviceNode):
self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
self._action_value_mappings[device_id] = d._ros_node._action_value_mappings
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
"UniLabJsonCommand"
@@ -755,6 +764,7 @@ class HostNode(BaseROS2DeviceNode):
item: "QueueItem",
action_type: str,
action_kwargs: Dict[str, Any],
sample_material: Dict[str, str],
server_info: Optional[Dict[str, Any]] = None,
) -> None:
"""
@@ -768,18 +778,29 @@ class HostNode(BaseROS2DeviceNode):
u = uuid.UUID(item.job_id)
device_id = item.device_id
action_name = item.action_name
if BasicConfig.test_mode:
action_id = f"/devices/{device_id}/{action_name}"
self.lab_logger().info(
f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}"
)
# 根据注册表 handles 构建模拟返回值
mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs)
self._handle_test_mode_result(item, action_id, mock_return)
return
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
action_id = f"/devices/{device_id}/_execute_driver_command"
action_kwargs = {
"string": json.dumps(
{
json_command: Dict[str, Any] = {
"function_name": action_name,
"function_args": action_kwargs,
JSON_UNILABOS_PARAM: {
PARAM_SAMPLE_UUIDS: sample_material,
},
}
)
}
action_kwargs = {"string": json.dumps(json_command)}
if action_type.startswith("UniLabJsonCommandAsync"):
action_id = f"/devices/{device_id}/_execute_driver_command_async"
else:
@@ -790,21 +811,6 @@ class HostNode(BaseROS2DeviceNode):
raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id]
# 遍历action_kwargs下的所有子dict将"sample_uuid"的值赋给"sample_id"
def assign_sample_id(obj):
if isinstance(obj, dict):
if "sample_uuid" in obj:
obj["sample_id"] = obj["sample_uuid"]
obj.pop("sample_uuid")
for k, v in obj.items():
if k != "unilabos_extra":
assign_sample_id(v)
elif isinstance(obj, list):
for item in obj:
assign_sample_id(item)
assign_sample_id(action_kwargs)
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
# self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")
@@ -820,6 +826,51 @@ class HostNode(BaseROS2DeviceNode):
)
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def _build_test_mode_return(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据注册表 handles 的 output 定义构建测试模式的模拟返回值
根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。
例如: "vessel"{}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]]
"""
mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name}
action_mappings = self._action_value_mappings.get(device_id, {})
action_mapping = action_mappings.get(action_name, {})
handles = action_mapping.get("handles", {})
if isinstance(handles, dict):
for output_handle in handles.get("output", []):
data_key = output_handle.get("data_key", "")
handler_key = output_handle.get("handler_key", "")
# 根据 @flatten 层数构建嵌套数组,叶子为空字典
flatten_count = data_key.count("@flatten")
value: Any = {}
for _ in range(flatten_count):
value = [value]
mock_return[handler_key] = value
return mock_return
def _handle_test_mode_result(
self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any]
) -> None:
"""
测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS
"""
job_id = item.job_id
status = "success"
return_info = serialize_result_info("", True, mock_return)
self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}")
from unilabos.app.web.controller import store_job_result
store_job_result(job_id, status, return_info, mock_return)
# 发布状态到桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(mock_return, item, status, return_info)
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调"""
goal_handle = future.result()
@@ -867,14 +918,14 @@ class HostNode(BaseROS2DeviceNode):
# 适配后端的一些额外处理
return_value = return_info.get("return_value")
if isinstance(return_value, dict):
unilabos_samples = return_value.pop("unilabos_samples", None)
unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None)
if isinstance(unilabos_samples, list) and unilabos_samples:
self.lab_logger().info(
f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): "
f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}"
f"{'...' if len(unilabos_samples) > 5 else ''}"
)
return_info["unilabos_samples"] = unilabos_samples
return_info["samples"] = unilabos_samples
suc = return_info.get("suc", False)
if not suc:
status = "failed"
@@ -1179,6 +1230,10 @@ class HostNode(BaseROS2DeviceNode):
def _node_info_update_callback(self, request, response):
"""
更新节点信息回调
处理两种消息:
1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings
2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings
"""
self.lab_logger().trace(f"[Host Node] Node info update request received: {request}")
try:
@@ -1190,12 +1245,48 @@ class HostNode(BaseROS2DeviceNode):
info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"]
registry_name = info.get("registry_name", "")
self.device_machine_names[edge_device_id] = machine_name
# 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings
if registry_name and registry_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[registry_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[edge_device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Loaded {len(action_mappings)} action mappings "
f"for remote device {edge_device_id} (registry: {registry_name})"
)
else:
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
# 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引
for reg_name, reg_data in registry_config.items():
if isinstance(reg_data, dict) and "class" in reg_data:
self._slave_registry_configs[reg_name] = reg_data
# 解析 devices_config,建立 device_id -> action_value_mappings 映射
if devices_config:
for device_tree in devices_config:
for device_dict in device_tree:
device_id = device_dict.get("id", "")
class_name = device_dict.get("class", "")
if device_id and class_name and class_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[class_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Stored {len(action_mappings)} action mappings "
f"for remote device {device_id} (class: {class_name})"
)
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1492,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource(
self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None,
resources: List[ResourceSlot] = None,
device: DeviceSlot = None,
@@ -1506,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode):
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def handle_pong_response(self, pong_data: dict):

View File

@@ -7,10 +7,11 @@ from rclpy.callback_groups import ReentrantCallbackGroup
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class JointRepublisher(BaseROS2DeviceNode):
def __init__(self,device_id,resource_tracker, **kwargs):
def __init__(self,device_id, registry_name, resource_tracker, **kwargs):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -26,7 +26,7 @@ from unilabos.resources.graphio import initialize_resources
from unilabos.registry.registry import lab_registry
class ResourceMeshManager(BaseROS2DeviceNode):
def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", rate=50, **kwargs):
def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs):
"""初始化资源网格管理器节点
Args:
@@ -37,6 +37,7 @@ class ResourceMeshManager(BaseROS2DeviceNode):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -7,7 +7,7 @@ from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeRe
class ROS2SerialNode(BaseROS2DeviceNode):
def __init__(self, device_id, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None):
def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None):
# 保存属性,以便在调用父类初始化前使用
self.port = port
self.baudrate = baudrate
@@ -28,6 +28,7 @@ class ROS2SerialNode(BaseROS2DeviceNode):
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
registry_name=registry_name,
device_id=device_id,
status_types={},
action_value_mappings={},

View File

@@ -47,6 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
*,
driver_instance: "WorkstationBase",
device_id: str,
registry_name: str,
device_uuid: str,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
@@ -62,6 +63,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
super().__init__(
driver_instance=driver_instance,
device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings={**action_value_mappings, **self.protocol_action_mappings},
@@ -340,6 +342,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False)
# 获取父资源
res = self.resource_tracker.parent_resource(plr)
if res is None:
res = plr
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)

View File

@@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]):
if self.device_instance is not None:
for c in self.children:
if c.res_content.type != "device":
self.resource_tracker.add_resource(c.get_plr_nested_dict())
res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0]
self.resource_tracker.add_resource(res)
def create_instance(self, data: Dict[str, Any]) -> T:
"""
@@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# return resource, source_type
def _process_resource_references(
self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None
self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None
) -> Any:
"""
递归处理资源引用替换_resource_child_name对应的资源
@@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
states[prefix_path] = resource_instance.serialize_all_state()
return serialized
else:
processed_child_names[child_name] = resource_instance
self.resource_tracker.add_resource(resource_instance)
# 立即设置UUIDstate已经在resource_ulab_to_plr中处理过了
if name_to_uuid:
@@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
result = {}
for key, value in data.items():
new_prefix = f"{prefix_path}.{key}" if prefix_path else key
result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid)
result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid)
return result
elif isinstance(data, list):
return [
self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid)
self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid)
for i, item in enumerate(data)
]
@@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# 首先处理资源引用
states = {}
processed_data = self._process_resource_references(
data, to_dict=True, states=states, name_to_uuid=name_to_uuid
data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid
)
try:
@@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
arg_value = spec_args[param_name].annotation
data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")
processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid)
processed_child_names = {}
processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid)
for child_name, resource_instance in processed_data.items():
for ind, name in enumerate([child.res_content.name for child in self.children]):
if name == child_name:
self.children.pop(ind)
self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用调用的自身的attach_resource
except Exception as e:
logger.error(f"PyLabRobot创建实例失败: {e}")
@@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children
for child in self.children:
if child.res_content.type != "device":
self.resource_tracker.add_resource(child.get_plr_nested_dict())
# super(WorkstationNodeCreator, self).create_instance(data)的时候会attach
# for child in self.children:
# if child.res_content.type != "device":
# self.resource_tracker.add_resource(child.get_plr_nested_dict())
deck_dict = data.get("deck")
if deck_dict:
from pylabrobot.resources import Deck, Resource

View File

@@ -339,13 +339,8 @@
"z": 0
},
"config": {
"max_volume": 500.0,
"type": "RegularContainer",
"category": "container",
"max_temp": 200.0,
"min_temp": -20.0,
"has_stirrer": true,
"has_heater": true
"category": "container"
},
"data": {
"liquids": [],
@@ -769,9 +764,7 @@
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "sodium_chloride",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,
@@ -792,14 +785,11 @@
"z": 0
},
"config": {
"volume": 500.0,
"size_x": 600,
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "sodium_carbonate",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,
@@ -820,14 +810,11 @@
"z": 0
},
"config": {
"volume": 500.0,
"size_x": 650,
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "magnesium_chloride",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,

View File

@@ -184,6 +184,51 @@ def get_all_subscriptions(instance) -> list:
return subscriptions
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
Example:
class MyDriver:
@always_free
def query_status(self, param: str):
# 这个动作可以随时执行,不需要排队
return self._status
def transfer(self, volume: float):
# 这个动作会按正常排队逻辑执行
pass
Note:
- 可以与其他装饰器组合使用,@always_free 应放在最外层
- 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""
检查函数是否被标记为永久闲置
Args:
func: 被检查的函数
Returns:
如果函数被 @always_free 装饰则返回 True否则返回 False
"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F:
"""
标记方法为非动作的装饰器

View File

@@ -27,8 +27,9 @@ __all__ = [
from ast import Constant
from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS
from unilabos.utils import logger
from unilabos.utils.decorator import is_not_action
from unilabos.utils.decorator import is_not_action, is_always_free
class ImportManager:
@@ -281,6 +282,9 @@ class ImportManager:
continue
# 其他非_开头的方法归类为action
method_info = self._analyze_method_signature(method)
# 检查是否被 @always_free 装饰器标记
if is_always_free(method):
method_info["always_free"] = True
result["action_methods"][name] = method_info
return result
@@ -338,16 +342,24 @@ class ImportManager:
if self._is_not_action_method(node):
continue
# 其他非_开头的方法归类为action
# 检查是否被 @always_free 装饰器标记
if self._is_always_free_method(node):
method_info["always_free"] = True
result["action_methods"][method_name] = method_info
return result
def _analyze_method_signature(self, method) -> Dict[str, Any]:
def _analyze_method_signature(self, method, skip_unilabos_params: bool = True) -> Dict[str, Any]:
"""
分析方法签名,提取具体的命名参数信息
注意:此方法会跳过*args和**kwargs只提取具体的命名参数
这样可以确保通过**dict方式传参时的准确性
Args:
method: 要分析的方法
skip_unilabos_params: 是否跳过 unilabos 系统参数(如 sample_uuids
registry 补全时为 TrueJsonCommand 执行时为 False
示例用法:
method_info = self._analyze_method_signature(some_method)
params = {"param1": "value1", "param2": "value2"}
@@ -368,6 +380,10 @@ class ImportManager:
if param.kind == param.VAR_KEYWORD: # **kwargs
continue
# 跳过 sample_uuids 参数由系统自动注入registry 补全时跳过)
if skip_unilabos_params and param_name == PARAM_SAMPLE_UUIDS:
continue
is_required = param.default == inspect.Parameter.empty
if is_required:
num_required += 1
@@ -464,6 +480,13 @@ class ImportManager:
return True
return False
def _is_always_free_method(self, node: ast.FunctionDef) -> bool:
"""检查是否是@always_free装饰的方法"""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "always_free":
return True
return False
def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str:
"""从setter装饰器中获取属性名"""
for decorator in node.decorator_list:
@@ -563,6 +586,9 @@ class ImportManager:
for i, arg in enumerate(node.args.args):
if arg.arg == "self":
continue
# 跳过 sample_uuids 参数(由系统自动注入)
if arg.arg == PARAM_SAMPLE_UUIDS:
continue
arg_info = {
"name": arg.arg,
"type": None,

View File

@@ -193,6 +193,7 @@ def configure_logger(loglevel=None, working_dir=None):
root_logger.addHandler(console_handler)
# 如果指定了工作目录,添加文件处理器
log_filepath = None
if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True)
@@ -213,6 +214,7 @@ def configure_logger(loglevel=None, working_dir=None):
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
return log_filepath

View File

@@ -60,7 +60,11 @@
==================== 连接关系图 ====================
控制流 (ready 端口串联):
create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ...
- create_resource 之间: 无 ready 连接
- set_liquid_from_plate 之间: 无 ready 连接
- create_resource 与 set_liquid_from_plate 之间: 无 ready 连接
- transfer_liquid 之间: 通过 ready 端口串联
transfer_liquid_1 -> transfer_liquid_2 -> transfer_liquid_3 -> ...
物料流:
[create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid]
@@ -358,14 +362,16 @@ def build_protocol_graph(
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args:
labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...}
labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...]
"""
G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port"
@@ -373,18 +379,7 @@ def build_protocol_graph(
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
for labware_id, item in labware_info.items():
slot = str(item.get("slot", ""))
if slot and slot not in slots_info:
res_id = f"plate_slot_{slot}"
slots_info[slot] = {
"labware": item.get("labware", ""),
"res_id": res_id,
}
# ==================== 第一步:按 slot 创建 create_resource 节点 ====================
# 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4())
G.add_node(
@@ -400,30 +395,35 @@ def build_protocol_graph(
param=None,
)
# 为每个唯一的 slot 创建 create_resource 节点
# 直接使用 JSON 中的 labware 定义,每个 slot 一条记录type 即 class_name
res_index = 0
last_create_resource_id = None
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
for lw in (labware_defs or []):
slot = str(lw.get("slot", ""))
if not slot or slot in slot_to_create_resource:
continue # 跳过空 slot 或已处理的 slot
lw_name = lw.get("name", f"slot {slot}")
lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"])
res_id = f"plate_slot_{slot}"
res_index += 1
node_id = str(uuid.uuid4())
G.add_node(
node_id,
template_name="create_resource",
resource_name="host_node",
name=f"Plate {res_index}",
description=f"Create plate on slot {slot}",
name=lw_name,
description=f"Create {lw_name}",
lab_node_type="Labware",
footer="create_resource-host_node",
device_name=DEVICE_NAME_HOST,
type=NODE_TYPE_DEFAULT,
parent_uuid=group_node_id, # 指向 Group 节点
minimized=True, # 折叠显示
parent_uuid=group_node_id,
minimized=True,
param={
"res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": CREATE_RESOURCE_DEFAULTS["class_name"],
"class_name": lw_type,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot),
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot,
@@ -431,11 +431,6 @@ def build_protocol_graph(
)
slot_to_create_resource[slot] = node_id
# create_resource 之间通过 ready 串联
if last_create_resource_id is not None:
G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready")
last_create_resource_id = node_id
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点
set_liquid_group_id = str(uuid.uuid4())
@@ -453,7 +448,6 @@ def build_protocol_graph(
)
set_liquid_index = 0
last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后
for labware_id, item in labware_info.items():
# 跳过 Tip/Rack 类型
@@ -494,10 +488,7 @@ def build_protocol_graph(
},
)
# ready 连接:上一个节点 -> set_liquid_from_plate
if last_set_liquid_id is not None:
G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready")
last_set_liquid_id = node_id
# set_liquid_from_plate 之间不需要 ready 连接
# 物料流create_resource 的 labware -> set_liquid_from_plate 的 input_plate
create_res_node_id = slot_to_create_resource.get(slot)
@@ -507,7 +498,8 @@ def build_protocol_graph(
# set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid
resource_last_writer[labware_id] = f"{node_id}:output_wells"
last_control_node_id = last_set_liquid_id
# transfer_liquid 之间通过 ready 串联,从 None 开始
last_control_node_id = None
# 端口名称映射JSON 字段名 -> 实际 handle key
INPUT_PORT_MAPPING = {

View File

@@ -1,16 +1,20 @@
"""
JSON 工作流转换模块
将 workflow/reagent 格式的 JSON 转换为统一工作流格式。
将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。
输入格式:
{
"labware": [
{"name": "...", "slot": "1", "type": "lab_xxx"},
...
],
"workflow": [
{"action": "...", "action_args": {...}},
...
],
"reagent": {
"reagent_name": {"slot": int, "well": [...], "labware": "..."},
"reagent_name": {"slot": int, "well": [...]},
...
}
}
@@ -245,18 +249,18 @@ def convert_from_json(
if "workflow" not in json_data or "reagent" not in json_data:
raise ValueError(
"不支持的 JSON 格式。请使用标准格式:\n"
'{"workflow": [{"action": "...", "action_args": {...}}, ...], '
'"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}'
'{"labware": [...], "workflow": [...], "reagent": {...}}'
)
# 提取数据
workflow = json_data["workflow"]
reagent = json_data["reagent"]
labware_defs = json_data.get("labware", []) # 新的 labware 定义列表
# 规范化步骤数据
protocol_steps = normalize_workflow_steps(workflow)
# reagent 已经是字典格式,直接使
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent
# 构建工作流图
@@ -265,6 +269,7 @@ def convert_from_json(
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
)
# 校验句柄配置

View File

@@ -41,6 +41,7 @@ def upload_workflow(
workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
上传工作流到服务器
@@ -56,6 +57,7 @@ def upload_workflow(
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据
@@ -75,6 +77,14 @@ def upload_workflow(
print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"}
# 从 JSON 文件中提取 description 和 tags作为 fallback
if not description and "description" in workflow_data:
description = workflow_data["description"]
print_status(f"从文件中读取 description", "info")
if not tags and "tags" in workflow_data:
tags = workflow_data["tags"]
print_status(f"从文件中读取 tags: {tags}", "info")
# 自动检测并转换格式
if not _is_node_link_format(workflow_data):
try:
@@ -96,6 +106,7 @@ def upload_workflow(
print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info")
print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传
@@ -107,6 +118,7 @@ def upload_workflow(
edges=edges,
tags=tags,
published=published,
description=description,
)
if result.get("code") == 0:
@@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", [])
published = args_dict.get("published", False)
description = args_dict.get("description", "")
if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published)
upload_workflow(workflow_file, workflow_name, tags, published, description)
else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")