10 Commits

Author SHA1 Message Date
Xuwznln
13a6795657 Update organic syn station. 2025-12-15 02:34:36 +08:00
Xianwei Qi
53219d8b04 Update docs
update "laiyu" missing init file.

fix "laiyu" missing init file.

fix "🐛 fix"

🐛 fix: config file is overwrited by default args even if not be set.

mix

修改了mix,仿真流程报错问题
2025-12-14 13:13:21 +08:00
Xuwznln
b1cdef9185 update version to 0.10.12 2025-12-04 18:47:16 +08:00
Xuwznln
9854ed8c9c fix ros2 future
print all logs to file
fix resource dict dump error
2025-12-04 18:46:37 +08:00
Xuwznln
52544a2c69 signal when host node is ready 2025-12-02 12:00:26 +08:00
ZiWei
5ce433e235 Fix startup with remote resource error
Resource dict fully change to "pose" key

Update oss link

Reduce pylabrobot conversion warning & force enable log dump.

更新 logo 图片
2025-12-02 11:51:01 +08:00
Xuwznln
c7c14d2332 Auto dump logs, fix workstation input schema 2025-11-27 14:24:40 +08:00
Harry Liu
6fdd482649 Transfer_liquid (#176)
* change 9320 desk row number to 4

* Updated 9320 host address

* Updated 9320 host address

* Add **kwargs in classes: PRCXI9300Deck and PRCXI9300Container

* Removed all sample_id in prcxi_9320.json to avoid KeyError

* 9320 machine testing settings

* Typo

* Typo in base_device_node.py

* Enhance liquid handling functionality by adding support for multiple transfer modes (one-to-many, one-to-one, many-to-one) and improving parameter validation. Default channel usage is set when not specified. Adjusted mixing logic to ensure it only occurs when valid conditions are met. Updated documentation for clarity.
2025-11-27 13:49:04 +08:00
Xuwznln
d390236318 Add get_regular_container func 2025-11-27 13:47:12 +08:00
Xuwznln
ed8ee29732 Add get_regular_container func 2025-11-27 13:46:40 +08:00
36 changed files with 860 additions and 348 deletions

View File

@@ -1,6 +1,6 @@
package: package:
name: unilabos name: unilabos
version: 0.10.11 version: 0.10.12
source: source:
path: ../unilabos path: ../unilabos

View File

@@ -39,7 +39,9 @@ Uni-Lab-OS recommends using `mamba` for environment management. Choose the appro
```bash ```bash
# Create new environment # Create new environment
mamba create -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge mamba create -n unilab python=3.11.11
mamba activate unilab
mamba install -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
``` ```
## Install Dev Uni-Lab-OS ## Install Dev Uni-Lab-OS

View File

@@ -41,7 +41,9 @@ Uni-Lab-OS 建议使用 `mamba` 管理环境。根据您的操作系统选择适
```bash ```bash
# 创建新环境 # 创建新环境
mamba create -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge mamba create -n unilab python=3.11.11
mamba activate unilab
mamba install -n unilab uni-lab::unilabos -c robostack-staging -c conda-forge
``` ```
2. 安装开发版 Uni-Lab-OS: 2. 安装开发版 Uni-Lab-OS:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 326 KiB

After

Width:  |  Height:  |  Size: 262 KiB

View File

@@ -317,45 +317,6 @@ unilab --help
如果所有命令都正常输出,说明开发环境配置成功! 如果所有命令都正常输出,说明开发环境配置成功!
### 开发工具推荐
#### IDE
- **PyCharm Professional**: 强大的 Python IDE支持远程调试
- **VS Code**: 轻量级,配合 Python 扩展使用
- **Vim/Emacs**: 适合终端开发
#### 推荐的 VS Code 扩展
- Python
- Pylance
- ROS
- URDF
- YAML
#### 调试工具
```bash
# 安装调试工具
pip install ipdb pytest pytest-cov -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
# 代码质量检查
pip install black flake8 mypy -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```
### 设置 pre-commit 钩子(可选)
```bash
# 安装 pre-commit
pip install pre-commit -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
# 设置钩子
pre-commit install
# 手动运行检查
pre-commit run --all-files
```
--- ---
## 验证安装 ## 验证安装

View File

@@ -1,6 +1,6 @@
package: package:
name: ros-humble-unilabos-msgs name: ros-humble-unilabos-msgs
version: 0.10.11 version: 0.10.12
source: source:
path: ../../unilabos_msgs path: ../../unilabos_msgs
target_directory: src target_directory: src

View File

@@ -1,6 +1,6 @@
package: package:
name: unilabos name: unilabos
version: "0.10.11" version: "0.10.12"
source: source:
path: ../.. path: ../..

View File

@@ -4,7 +4,7 @@ package_name = 'unilabos'
setup( setup(
name=package_name, name=package_name,
version='0.10.11', version='0.10.12',
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
install_requires=['setuptools'], install_requires=['setuptools'],

View File

@@ -1 +1 @@
__version__ = "0.10.11" __version__ = "0.10.12"

View File

@@ -141,7 +141,7 @@ class CommunicationClientFactory:
""" """
if cls._client_cache is None: if cls._client_cache is None:
cls._client_cache = cls.create_client(protocol) cls._client_cache = cls.create_client(protocol)
logger.info(f"[CommunicationFactory] Created {type(cls._client_cache).__name__} client") logger.trace(f"[CommunicationFactory] Created {type(cls._client_cache).__name__} client")
return cls._client_cache return cls._client_cache

View File

@@ -159,9 +159,10 @@ def parse_args():
def main(): def main():
"""主函数""" """主函数"""
# 解析命令行参数 # 解析命令行参数
args = parse_args() parser = parse_args()
convert_argv_dashes_to_underscores(args) convert_argv_dashes_to_underscores(parser)
args_dict = vars(args.parse_args()) args = parser.parse_args()
args_dict = vars(args)
# 环境检查 - 检查并自动安装必需的包 (可选) # 环境检查 - 检查并自动安装必需的包 (可选)
if not args_dict.get("skip_env_check", False): if not args_dict.get("skip_env_check", False):
@@ -218,19 +219,20 @@ def main():
if hasattr(BasicConfig, "log_level"): if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.") logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level) configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if args_dict["addr"] == "test": if args.addr != parser.get_default("addr"):
print_status("使用测试环境地址", "info") if args.addr == "test":
HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1" print_status("使用测试环境地址", "info")
elif args_dict["addr"] == "uat": HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1"
print_status("使用uat环境地址", "info") elif args.addr == "uat":
HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1" print_status("使用uat环境地址", "info")
elif args_dict["addr"] == "local": HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1"
print_status("使用本地环境地址", "info") elif args.addr == "local":
HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1" print_status("使用本地环境地址", "info")
else: HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
HTTPConfig.remote_addr = args_dict.get("addr", "") else:
HTTPConfig.remote_addr = args.addr
# 设置BasicConfig参数 # 设置BasicConfig参数
if args_dict.get("ak", ""): if args_dict.get("ak", ""):
@@ -327,6 +329,10 @@ def main():
for ind, i in enumerate(resource_edge_info[::-1]): for ind, i in enumerate(resource_edge_info[::-1]):
source_node: ResourceDict = nodes[i["source"]] source_node: ResourceDict = nodes[i["source"]]
target_node: ResourceDict = nodes[i["target"]] target_node: ResourceDict = nodes[i["target"]]
if "sourceHandle" not in source_node:
continue
if "targetHandle" not in target_node:
continue
source_handle = i["sourceHandle"] source_handle = i["sourceHandle"]
target_handle = i["targetHandle"] target_handle = i["targetHandle"]
source_handler_keys = [ source_handler_keys = [

View File

@@ -34,14 +34,14 @@ def _get_oss_token(
client = http_client client = http_client
# 构造scene参数: driver_name-exp_type # 构造scene参数: driver_name-exp_type
scene = f"{driver_name}-{exp_type}" sub_path = f"{driver_name}-{exp_type}"
# 构造请求URL使用client的remote_addr已包含/api/v1/ # 构造请求URL使用client的remote_addr已包含/api/v1/
url = f"{client.remote_addr}/applications/token" url = f"{client.remote_addr}/applications/token"
params = {"scene": scene, "filename": filename} params = {"sub_path": sub_path, "filename": filename, "scene": "job"}
try: try:
logger.info(f"[OSS] 请求预签名URL: scene={scene}, filename={filename}") logger.info(f"[OSS] 请求预签名URL: sub_path={sub_path}, filename={filename}")
response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10) response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10)
if response.status_code == 200: if response.status_code == 200:

View File

@@ -389,7 +389,7 @@ class MessageProcessor:
self.is_running = True self.is_running = True
self.thread = threading.Thread(target=self._run, daemon=True, name="MessageProcessor") self.thread = threading.Thread(target=self._run, daemon=True, name="MessageProcessor")
self.thread.start() self.thread.start()
logger.info("[MessageProcessor] Started") logger.trace("[MessageProcessor] Started")
def stop(self) -> None: def stop(self) -> None:
"""停止消息处理线程""" """停止消息处理线程"""
@@ -939,7 +939,7 @@ class QueueProcessor:
# 事件通知机制 # 事件通知机制
self.queue_update_event = threading.Event() self.queue_update_event = threading.Event()
logger.info("[QueueProcessor] Initialized") logger.trace("[QueueProcessor] Initialized")
def set_websocket_client(self, websocket_client: "WebSocketClient"): def set_websocket_client(self, websocket_client: "WebSocketClient"):
"""设置WebSocket客户端引用""" """设置WebSocket客户端引用"""
@@ -954,7 +954,7 @@ class QueueProcessor:
self.is_running = True self.is_running = True
self.thread = threading.Thread(target=self._run, daemon=True, name="QueueProcessor") self.thread = threading.Thread(target=self._run, daemon=True, name="QueueProcessor")
self.thread.start() self.thread.start()
logger.info("[QueueProcessor] Started") logger.trace("[QueueProcessor] Started")
def stop(self) -> None: def stop(self) -> None:
"""停止队列处理线程""" """停止队列处理线程"""
@@ -1314,3 +1314,19 @@ class WebSocketClient(BaseCommunicationClient):
logger.info(f"[WebSocketClient] Job {job_log} cancelled successfully") logger.info(f"[WebSocketClient] Job {job_log} cancelled successfully")
else: else:
logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}") logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}")
def publish_host_ready(self) -> None:
"""发布host_node ready信号"""
if self.is_disabled or not self.is_connected():
logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal")
return
message = {
"action": "host_node_ready",
"data": {
"status": "ready",
"timestamp": time.time(),
},
}
self.message_processor.send_message(message)
logger.info("[WebSocketClient] Host node ready signal published")

View File

@@ -41,7 +41,7 @@ class WSConfig:
# HTTP配置 # HTTP配置
class HTTPConfig: class HTTPConfig:
remote_addr = "http://127.0.0.1:48197/api/v1" remote_addr = "https://uni-lab.bohrium.com/api/v1"
# ROS配置 # ROS配置

View File

@@ -147,6 +147,9 @@ class LiquidHandlerMiddleware(LiquidHandler):
offsets: Optional[List[Coordinate]] = None, offsets: Optional[List[Coordinate]] = None,
**backend_kwargs, **backend_kwargs,
): ):
# 如果 use_channels 为 None使用默认值所有通道
if use_channels is None:
use_channels = list(range(self.channel_num))
if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)): if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)):
offsets = [Coordinate.zero()] * len(use_channels) offsets = [Coordinate.zero()] * len(use_channels)
if self._simulator: if self._simulator:
@@ -759,7 +762,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume=current_dis_blow_out_air_volume, blow_out_air_volume=current_dis_blow_out_air_volume,
spread=spread, spread=spread,
) )
if delays is not None: if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets) await self.touch_tip(current_targets)
await self.discard_tips() await self.discard_tips()
@@ -833,17 +836,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
spread=spread, spread=spread,
) )
if delays is not None: if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.mix( # 只有在 mix_time 有效时才调用 mix
targets=[targets[_]], if mix_time is not None and mix_time > 0:
mix_time=mix_time, await self.mix(
mix_vol=mix_vol, targets=[targets[_]],
offsets=offsets if offsets else None, mix_time=mix_time,
height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_vol=mix_vol,
mix_rate=mix_rate if mix_rate else None, offsets=offsets if offsets else None,
) height_to_bottom=mix_liquid_height if mix_liquid_height else None,
if delays is not None: mix_rate=mix_rate if mix_rate else None,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_]) await self.touch_tip(targets[_])
await self.discard_tips() await self.discard_tips()
@@ -893,18 +898,20 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume=current_dis_blow_out_air_volume, blow_out_air_volume=current_dis_blow_out_air_volume,
spread=spread, spread=spread,
) )
if delays is not None: if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.mix( # 只有在 mix_time 有效时才调用 mix
targets=current_targets, if mix_time is not None and mix_time > 0:
mix_time=mix_time, await self.mix(
mix_vol=mix_vol, targets=current_targets,
offsets=offsets if offsets else None, mix_time=mix_time,
height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_vol=mix_vol,
mix_rate=mix_rate if mix_rate else None, offsets=offsets if offsets else None,
) height_to_bottom=mix_liquid_height if mix_liquid_height else None,
if delays is not None: mix_rate=mix_rate if mix_rate else None,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets) await self.touch_tip(current_targets)
await self.discard_tips() await self.discard_tips()
@@ -942,60 +949,158 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
delays: Optional[List[int]] = None, delays: Optional[List[int]] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
): ):
"""Transfer liquid from each *source* well/plate to the corresponding *target*. """Transfer liquid with automatic mode detection.
Supports three transfer modes:
1. One-to-many (1 source -> N targets): Distribute from one source to multiple targets
2. One-to-one (N sources -> N targets): Standard transfer, each source to corresponding target
3. Many-to-one (N sources -> 1 target): Combine multiple sources into one target
Parameters Parameters
---------- ----------
asp_vols, dis_vols asp_vols, dis_vols
Single volume (µL) or list matching the number of transfers. Single volume (µL) or list. Automatically expanded based on transfer mode.
sources, targets sources, targets
Samelength sequences of containers (wells or plates). In 96well mode Containers (wells or plates). Length determines transfer mode:
each must contain exactly one plate. - len(sources) == 1, len(targets) > 1: One-to-many mode
- len(sources) == len(targets): One-to-one mode
- len(sources) > 1, len(targets) == 1: Many-to-one mode
tip_racks tip_racks
One or more TipRacks providing fresh tips. One or more TipRacks providing fresh tips.
is_96_well is_96_well
Set *True* to use the 96channel head. Set *True* to use the 96channel head.
""" """
# 确保 use_channels 有默认值
if use_channels is None:
use_channels = [0] if self.channel_num >= 1 else list(range(self.channel_num))
if is_96_well: if is_96_well:
pass # This mode is not verified. pass # This mode is not verified.
else: else:
if len(asp_vols) != len(targets): # 转换体积参数为列表
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") if isinstance(asp_vols, (int, float)):
asp_vols = [float(asp_vols)]
else:
asp_vols = [float(v) for v in asp_vols]
if isinstance(dis_vols, (int, float)):
dis_vols = [float(dis_vols)]
else:
dis_vols = [float(v) for v in dis_vols]
# 首先应该对任务分组然后每次1个/8个进行操作处理 # 统一混合次数为标量,防止数组/列表与 int 比较时报错
if len(use_channels) == 1: if mix_times is not None and not isinstance(mix_times, (int, float)):
for _ in range(len(targets)): try:
tip = [] mix_times = mix_times[0] if len(mix_times) > 0 else None
for ___ in range(len(use_channels)): except Exception:
tip.extend(next(self.current_tip)) try:
await self.pick_up_tips(tip) mix_times = next(iter(mix_times))
except Exception:
pass
if mix_times is not None:
mix_times = int(mix_times)
# 识别传输模式
num_sources = len(sources)
num_targets = len(targets)
if num_sources == 1 and num_targets > 1:
# 模式1: 一对多 (1 source -> N targets)
await self._transfer_one_to_many(
sources[0], targets, tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
elif num_sources > 1 and num_targets == 1:
# 模式2: 多对一 (N sources -> 1 target)
await self._transfer_many_to_one(
sources, targets[0], tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
elif num_sources == num_targets:
# 模式3: 一对一 (N sources -> N targets) - 原有逻辑
await self._transfer_one_to_one(
sources, targets, tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
else:
raise ValueError(
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
"Supported modes: 1->N, N->1, or N->N."
)
await self.aspirate( async def _transfer_one_to_one(
resources=[sources[_]], self,
vols=[asp_vols[_]], sources: Sequence[Container],
use_channels=use_channels, targets: Sequence[Container],
flow_rates=[asp_flow_rates[0]] if asp_flow_rates else None, tip_racks: Sequence[TipRack],
offsets=[offsets[0]] if offsets else None, use_channels: List[int],
liquid_height=[liquid_height[0]] if liquid_height else None, asp_vols: List[float],
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, dis_vols: List[float],
spread=spread, asp_flow_rates: Optional[List[Optional[float]]],
) dis_flow_rates: Optional[List[Optional[float]]],
if delays is not None: offsets: Optional[List[Coordinate]],
await self.custom_delay(seconds=delays[0]) touch_tip: bool,
await self.dispense( liquid_height: Optional[List[Optional[float]]],
resources=[targets[_]], blow_out_air_volume: Optional[List[Optional[float]]],
vols=[dis_vols[_]], spread: Literal["wide", "tight", "custom"],
use_channels=use_channels, mix_stage: Optional[Literal["none", "before", "after", "both"]],
flow_rates=[dis_flow_rates[1]] if dis_flow_rates else None, mix_times: Optional[int],
offsets=[offsets[1]] if offsets else None, mix_vol: Optional[int],
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, mix_rate: Optional[int],
liquid_height=[liquid_height[1]] if liquid_height else None, mix_liquid_height: Optional[float],
spread=spread, delays: Optional[List[int]],
) ):
if delays is not None: """一对一传输模式N sources -> N targets"""
await self.custom_delay(seconds=delays[1]) # 验证参数长度
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
if len(dis_vols) != len(targets):
raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.")
if len(sources) != len(targets):
raise ValueError(f"Length of `sources` {len(sources)} must match `targets` {len(targets)}.")
if len(use_channels) == 1:
for _ in range(len(targets)):
tip = []
for ___ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.aspirate(
resources=[sources[_]],
vols=[asp_vols[_]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[_]] if asp_flow_rates and len(asp_flow_rates) > _ else None,
offsets=[offsets[_]] if offsets and len(offsets) > _ else None,
liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None,
blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[_]],
vols=[dis_vols[_]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[_]] if dis_flow_rates and len(dis_flow_rates) > _ else None,
offsets=[offsets[_]] if offsets and len(offsets) > _ else None,
blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None,
liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix( await self.mix(
targets=[targets[_]], targets=[targets[_]],
mix_time=mix_times, mix_time=mix_times,
@@ -1004,63 +1109,60 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom=mix_liquid_height if mix_liquid_height else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None, mix_rate=mix_rate if mix_rate else None,
) )
if delays is not None: if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_]) await self.touch_tip(targets[_])
await self.discard_tips() await self.discard_tips(use_channels=use_channels)
elif len(use_channels) == 8: elif len(use_channels) == 8:
# 对于8个的情况需要判断此时任务是不是能被8通道移液站来成功处理 if len(targets) % 8 != 0:
if len(targets) % 8 != 0: raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
# 8个8个来取任务序列 for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_targets = targets[i:i + 8]
current_reagent_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
for i in range(0, len(targets), 8): await self.aspirate(
# 取出8个任务 resources=current_reagent_sources,
tip = [] vols=current_asp_vols,
for _ in range(len(use_channels)): use_channels=use_channels,
tip.extend(next(self.current_tip)) flow_rates=current_asp_flow_rates,
await self.pick_up_tips(tip) offsets=current_asp_offset,
current_targets = targets[i:i + 8] blow_out_air_volume=current_asp_blow_out_air_volume,
current_reagent_sources = sources[i:i + 8] liquid_height=current_asp_liquid_height,
current_asp_vols = asp_vols[i:i + 8] spread=spread,
current_dis_vols = dis_vols[i:i + 8] )
current_asp_flow_rates = asp_flow_rates[i:i + 8]
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
await self.aspirate( if delays is not None:
resources=current_reagent_sources, await self.custom_delay(seconds=delays[0])
vols=current_asp_vols, await self.dispense(
use_channels=use_channels, resources=current_targets,
flow_rates=current_asp_flow_rates, vols=current_dis_vols,
offsets=current_asp_offset, use_channels=use_channels,
blow_out_air_volume=current_asp_blow_out_air_volume, flow_rates=current_dis_flow_rates,
liquid_height=current_asp_liquid_height, offsets=current_dis_offset,
spread=spread, blow_out_air_volume=current_dis_blow_out_air_volume,
) liquid_height=current_dis_liquid_height,
spread=spread,
if delays is not None: )
await self.custom_delay(seconds=delays[0]) if delays is not None and len(delays) > 1:
await self.dispense( await self.custom_delay(seconds=delays[1])
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix( await self.mix(
targets=current_targets, targets=current_targets,
mix_time=mix_times, mix_time=mix_times,
@@ -1069,10 +1171,363 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom=mix_liquid_height if mix_liquid_height else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None, mix_rate=mix_rate if mix_rate else None,
) )
if delays is not None: if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips([0,1,2,3,4,5,6,7])
async def _transfer_one_to_many(
self,
source: Container,
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
asp_flow_rates: Optional[List[Optional[float]]],
dis_flow_rates: Optional[List[Optional[float]]],
offsets: Optional[List[Coordinate]],
touch_tip: bool,
liquid_height: Optional[List[Optional[float]]],
blow_out_air_volume: Optional[List[Optional[float]]],
spread: Literal["wide", "tight", "custom"],
mix_stage: Optional[Literal["none", "before", "after", "both"]],
mix_times: Optional[int],
mix_vol: Optional[int],
mix_rate: Optional[int],
mix_liquid_height: Optional[float],
delays: Optional[List[int]],
):
"""一对多传输模式1 source -> N targets"""
# 验证和扩展体积参数
if len(asp_vols) == 1:
# 如果只提供一个吸液体积,计算总吸液体积(所有分液体积之和)
total_asp_vol = sum(dis_vols)
asp_vol = asp_vols[0] if asp_vols[0] >= total_asp_vol else total_asp_vol
else:
raise ValueError("For one-to-many mode, `asp_vols` should be a single value or list with one element.")
if len(dis_vols) != len(targets):
raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.")
if len(use_channels) == 1:
# 单通道模式:一次吸液,多次分液
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
# 从源容器吸液(总体积)
await self.aspirate(
resources=[source],
vols=[asp_vol],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分多次分液到不同的目标容器
for idx, target in enumerate(targets):
await self.dispense(
resources=[target],
vols=[dis_vols[idx]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[idx]] if dis_flow_rates and len(dis_flow_rates) > idx else None,
offsets=[offsets[idx]] if offsets and len(offsets) > idx else None,
blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None,
liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[idx:idx+1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
await self.discard_tips(use_channels=use_channels)
elif len(use_channels) == 8:
# 8通道模式需要确保目标数量是8的倍数
if len(targets) % 8 != 0:
raise ValueError(f"For 8-channel mode, number of targets {len(targets)} must be a multiple of 8.")
# 每次处理8个目标
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_targets = targets[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
# 8个通道都从同一个源容器吸液每个通道的吸液体积等于对应的分液体积
current_asp_flow_rates = asp_flow_rates[0:1] * 8 if asp_flow_rates and len(asp_flow_rates) > 0 else None
current_asp_offset = offsets[0:1] * 8 if offsets and len(offsets) > 0 else [None] * 8
current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8
# 从源容器吸液8个通道都从同一个源但每个通道的吸液体积不同
await self.aspirate(
resources=[source] * 8, # 8个通道都从同一个源
vols=current_dis_vols, # 每个通道的吸液体积等于对应的分液体积
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
liquid_height=current_asp_liquid_height,
blow_out_air_volume=current_asp_blow_out_air_volume,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到8个目标
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
await self.dispense(
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=current_targets,
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip(current_targets) await self.touch_tip(current_targets)
await self.discard_tips([0,1,2,3,4,5,6,7])
await self.discard_tips([0,1,2,3,4,5,6,7])
async def _transfer_many_to_one(
self,
sources: Sequence[Container],
target: Container,
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
asp_flow_rates: Optional[List[Optional[float]]],
dis_flow_rates: Optional[List[Optional[float]]],
offsets: Optional[List[Coordinate]],
touch_tip: bool,
liquid_height: Optional[List[Optional[float]]],
blow_out_air_volume: Optional[List[Optional[float]]],
spread: Literal["wide", "tight", "custom"],
mix_stage: Optional[Literal["none", "before", "after", "both"]],
mix_times: Optional[int],
mix_vol: Optional[int],
mix_rate: Optional[int],
mix_liquid_height: Optional[float],
delays: Optional[List[int]],
):
"""多对一传输模式N sources -> 1 target汇总/混合)"""
# 验证和扩展体积参数
if len(asp_vols) != len(sources):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `sources` {len(sources)}.")
# 支持两种模式:
# 1. dis_vols 为单个值:所有源汇总,使用总吸液体积或指定分液体积
# 2. dis_vols 长度等于 asp_vols每个源按不同比例分液按比例混合
if len(dis_vols) == 1:
# 模式1使用单个分液体积
total_dis_vol = sum(asp_vols)
dis_vol = dis_vols[0] if dis_vols[0] >= total_dis_vol else total_dis_vol
use_proportional_mixing = False
elif len(dis_vols) == len(asp_vols):
# 模式2按不同比例混合
use_proportional_mixing = True
else:
raise ValueError(
f"For many-to-one mode, `dis_vols` should be a single value or list with length {len(asp_vols)} "
f"(matching `asp_vols`). Got length {len(dis_vols)}."
)
if len(use_channels) == 1:
# 单通道模式:多次吸液,一次分液
# 先混合前(如果需要)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
# 注意:在吸液前混合源容器通常不常见,这里跳过
pass
# 从每个源容器吸液并分液到目标容器
for idx, source in enumerate(sources):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.aspirate(
resources=[source],
vols=[asp_vols[idx]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[idx]] if asp_flow_rates and len(asp_flow_rates) > idx else None,
offsets=[offsets[idx]] if offsets and len(offsets) > idx else None,
liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None,
blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到目标容器
if use_proportional_mixing:
# 按不同比例混合:使用对应的 dis_vols
dis_vol = dis_vols[idx]
dis_flow_rate = dis_flow_rates[idx] if dis_flow_rates and len(dis_flow_rates) > idx else None
dis_offset = offsets[idx] if offsets and len(offsets) > idx else None
dis_liquid_height = liquid_height[idx] if liquid_height and len(liquid_height) > idx else None
dis_blow_out = blow_out_air_volume[idx] if blow_out_air_volume and len(blow_out_air_volume) > idx else None
else:
# 标准模式:分液体积等于吸液体积
dis_vol = asp_vols[idx]
dis_flow_rate = dis_flow_rates[0] if dis_flow_rates and len(dis_flow_rates) > 0 else None
dis_offset = offsets[0] if offsets and len(offsets) > 0 else None
dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None
dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
await self.dispense(
resources=[target],
vols=[dis_vol],
use_channels=use_channels,
flow_rates=[dis_flow_rate] if dis_flow_rate is not None else None,
offsets=[dis_offset] if dis_offset is not None else None,
blow_out_air_volume=[dis_blow_out] if dis_blow_out is not None else None,
liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips(use_channels=use_channels)
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
elif len(use_channels) == 8:
# 8通道模式需要确保源数量是8的倍数
if len(sources) % 8 != 0:
raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.")
# 每次处理8个源
for i in range(0, len(sources), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
# 从8个源容器吸液
await self.aspirate(
resources=current_sources,
vols=current_asp_vols,
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
blow_out_air_volume=current_asp_blow_out_air_volume,
liquid_height=current_asp_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到目标容器(每个通道分液到同一个目标)
if use_proportional_mixing:
# 按比例混合:使用对应的 dis_vols
current_dis_vols = dis_vols[i:i + 8]
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
else:
# 标准模式:每个通道分液体积等于其吸液体积
current_dis_vols = current_asp_vols
current_dis_flow_rates = dis_flow_rates[0:1] * 8 if dis_flow_rates else None
current_dis_offset = offsets[0:1] * 8 if offsets else [None] * 8
current_dis_liquid_height = liquid_height[0:1] * 8 if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume else [None] * 8
await self.dispense(
resources=[target] * 8, # 8个通道都分到同一个目标
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips([0,1,2,3,4,5,6,7])
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
# except Exception as e: # except Exception as e:
# traceback.print_exc() # traceback.print_exc()

View File

@@ -5,6 +5,7 @@ import json
import os import os
import socket import socket
import time import time
import uuid
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling import ( from pylabrobot.liquid_handling import (
@@ -856,7 +857,30 @@ class PRCXI9300Api:
def _raw_request(self, payload: str) -> str: def _raw_request(self, payload: str) -> str:
if self.debug: if self.debug:
return " " # 调试/仿真模式下直接返回可解析的模拟 JSON避免后续 json.loads 报错
try:
req = json.loads(payload)
method = req.get("MethodName")
except Exception:
method = None
data: Any = True
if method in {"AddSolution"}:
data = str(uuid.uuid4())
elif method in {"AddWorkTabletMatrix", "AddWorkTabletMatrix2"}:
data = {"Success": True, "Message": "debug mock"}
elif method in {"GetErrorCode"}:
data = ""
elif method in {"RemoveErrorCodet", "Reset", "Start", "LoadSolution", "Pause", "Resume", "Stop"}:
data = True
elif method in {"GetStepStateList", "GetStepStatus", "GetStepState"}:
data = []
elif method in {"GetLocation"}:
data = {"X": 0, "Y": 0, "Z": 0}
elif method in {"GetResetStatus"}:
data = False
return json.dumps({"Success": True, "Msg": "debug mock", "Data": data})
with contextlib.closing(socket.socket()) as sock: with contextlib.closing(socket.socket()) as sock:
sock.settimeout(self.timeout) sock.settimeout(self.timeout)
sock.connect((self.host, self.port)) sock.connect((self.host, self.port))

View File

@@ -7,7 +7,7 @@ class VirtualMultiwayValve:
""" """
虚拟九通阀门 - 0号位连接transfer pump1-8号位连接其他设备 🔄 虚拟九通阀门 - 0号位连接transfer pump1-8号位连接其他设备 🔄
""" """
def __init__(self, port: str = "VIRTUAL", positions: int = 8): def __init__(self, port: str = "VIRTUAL", positions: int = 8, **kwargs):
self.port = port self.port = port
self.max_positions = positions # 1-8号位 self.max_positions = positions # 1-8号位
self.total_positions = positions + 1 # 0-8号位共9个位置 self.total_positions = positions + 1 # 0-8号位共9个位置

View File

@@ -147,7 +147,7 @@ class WorkstationBase(ABC):
def __init__( def __init__(
self, self,
deck: Deck, deck: Optional[Deck],
*args, *args,
**kwargs, # 必须有kwargs **kwargs, # 必须有kwargs
): ):
@@ -349,5 +349,5 @@ class WorkstationBase(ABC):
class ProtocolNode(WorkstationBase): class ProtocolNode(WorkstationBase):
def __init__(self, deck: Optional[PLRResource], *args, **kwargs): def __init__(self, protocol_type: List[str], deck: Optional[PLRResource], *args, **kwargs):
super().__init__(deck, *args, **kwargs) super().__init__(deck, *args, **kwargs)

View File

@@ -174,35 +174,6 @@ bioyond_dispensing_station:
title: query_resource_by_name参数 title: query_resource_by_name参数
type: object type: object
type: UniLabJsonCommand type: UniLabJsonCommand
auto-transfer_materials_to_reaction_station:
feedback: {}
goal: {}
goal_default:
target_device_id: null
transfer_groups: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
target_device_id:
type: string
transfer_groups:
type: array
required:
- target_device_id
- transfer_groups
type: object
result: {}
required:
- goal
title: transfer_materials_to_reaction_station参数
type: object
type: UniLabJsonCommand
auto-workflow_sample_locations: auto-workflow_sample_locations:
feedback: {} feedback: {}
goal: {} goal: {}

View File

@@ -6036,7 +6036,12 @@ workstation:
properties: properties:
deck: deck:
type: string type: string
protocol_type:
items:
type: string
type: array
required: required:
- protocol_type
- deck - deck
type: object type: object
data: data:

View File

@@ -2,7 +2,7 @@ container:
category: category:
- container - container
class: class:
module: unilabos.resources.container:RegularContainer module: unilabos.resources.container:get_regular_container
type: pylabrobot type: pylabrobot
description: regular organic container description: regular organic container
handles: handles:

View File

@@ -22,6 +22,13 @@ class RegularContainer(Container):
def load_state(self, state: Dict[str, Any]): def load_state(self, state: Dict[str, Any]):
self.state = state self.state = state
def get_regular_container(name="container"):
r = RegularContainer(name=name)
r.category = "container"
return RegularContainer(name=name)
# #
# class RegularContainer(object): # class RegularContainer(object):
# # 第一个参数必须是id传入 # # 第一个参数必须是id传入

View File

@@ -45,10 +45,13 @@ def canonicalize_nodes_data(
print_status(f"{len(nodes)} Resources loaded:", "info") print_status(f"{len(nodes)} Resources loaded:", "info")
# 第一步基本预处理处理graphml的label字段 # 第一步基本预处理处理graphml的label字段
for node in nodes: outer_host_node_id = None
for idx, node in enumerate(nodes):
if node.get("label") is not None: if node.get("label") is not None:
node_id = node.pop("label") node_id = node.pop("label")
node["id"] = node["name"] = node_id node["id"] = node["name"] = node_id
if node["id"] == "host_node":
outer_host_node_id = idx
if not isinstance(node.get("config"), dict): if not isinstance(node.get("config"), dict):
node["config"] = {} node["config"] = {}
if not node.get("type"): if not node.get("type"):
@@ -58,25 +61,26 @@ def canonicalize_nodes_data(
node["name"] = node.get("id") node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning") print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
if not isinstance(node.get("position"), dict): if not isinstance(node.get("position"), dict):
node["position"] = {"position": {}} node["pose"] = {"position": {}}
x = node.pop("x", None) x = node.pop("x", None)
if x is not None: if x is not None:
node["position"]["position"]["x"] = x node["pose"]["position"]["x"] = x
y = node.pop("y", None) y = node.pop("y", None)
if y is not None: if y is not None:
node["position"]["position"]["y"] = y node["pose"]["position"]["y"] = y
z = node.pop("z", None) z = node.pop("z", None)
if z is not None: if z is not None:
node["position"]["position"]["z"] = z node["pose"]["position"]["z"] = z
if "sample_id" in node: if "sample_id" in node:
sample_id = node.pop("sample_id") sample_id = node.pop("sample_id")
if sample_id: if sample_id:
logger.error(f"{node}的sample_id参数已弃用sample_id: {sample_id}") logger.error(f"{node}的sample_id参数已弃用sample_id: {sample_id}")
for k in list(node.keys()): for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children"]: if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose"]:
v = node.pop(k) v = node.pop(k)
node["config"][k] = v node["config"][k] = v
if outer_host_node_id is not None:
nodes.pop(outer_host_node_id)
# 第二步处理parent_relation # 第二步处理parent_relation
id2idx = {node["id"]: idx for idx, node in enumerate(nodes)} id2idx = {node["id"]: idx for idx, node in enumerate(nodes)}
for parent, children in parent_relation.items(): for parent, children in parent_relation.items():
@@ -93,7 +97,7 @@ def canonicalize_nodes_data(
for node in nodes: for node in nodes:
try: try:
print_status(f"DeviceId: {node['id']}, Class: {node['class']}", "info") # print_status(f"DeviceId: {node['id']}, Class: {node['class']}", "info")
# 使用标准化方法 # 使用标准化方法
resource_instance = ResourceDictInstance.get_resource_instance_from_dict(node) resource_instance = ResourceDictInstance.get_resource_instance_from_dict(node)
known_nodes[node["id"]] = resource_instance known_nodes[node["id"]] = resource_instance
@@ -280,10 +284,18 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]
edge["sourceHandle"] = port[source] edge["sourceHandle"] = port[source]
elif "source_port" in edge: elif "source_port" in edge:
edge["sourceHandle"] = edge.pop("source_port") edge["sourceHandle"] = edge.pop("source_port")
else:
typ = edge.get("type")
if typ == "communication":
continue
if target in port: if target in port:
edge["targetHandle"] = port[target] edge["targetHandle"] = port[target]
elif "target_port" in edge: elif "target_port" in edge:
edge["targetHandle"] = edge.pop("target_port") edge["targetHandle"] = edge.pop("target_port")
else:
typ = edge.get("type")
if typ == "communication":
continue
edge["id"] = f"reactflow__edge-{source}-{edge['sourceHandle']}-{target}-{edge['targetHandle']}" edge["id"] = f"reactflow__edge-{source}-{edge['sourceHandle']}-{target}-{edge['targetHandle']}"
for key in ["source_port", "target_port"]: for key in ["source_port", "target_port"]:
if key in edge: if key in edge:
@@ -582,11 +594,15 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None, w
"tip_rack": "tip_rack", "tip_rack": "tip_rack",
"warehouse": "warehouse", "warehouse": "warehouse",
"container": "container", "container": "container",
"tube": "tube",
"bottle_carrier": "bottle_carrier",
"plate_adapter": "plate_adapter",
} }
if source in replace_info: if source in replace_info:
return replace_info[source] return replace_info[source]
else: else:
logger.warning(f"转换pylabrobot的时候出现未知类型: {source}") if source is not None:
logger.warning(f"转换pylabrobot的时候出现未知类型: {source}")
return source return source
def resource_plr_to_ulab_inner(d: dict, all_states: dict, child=True) -> dict: def resource_plr_to_ulab_inner(d: dict, all_states: dict, child=True) -> dict:

View File

@@ -5,6 +5,7 @@ from unilabos.ros.msgs.message_converter import (
get_action_type, get_action_type,
) )
from unilabos.ros.nodes.base_device_node import init_wrapper, ROS2DeviceNode from unilabos.ros.nodes.base_device_node import init_wrapper, ROS2DeviceNode
from unilabos.ros.nodes.resource_tracker import ResourceDictInstance
# 定义泛型类型变量 # 定义泛型类型变量
T = TypeVar("T") T = TypeVar("T")
@@ -18,12 +19,11 @@ class ROS2DeviceNodeWrapper(ROS2DeviceNode):
def ros2_device_node( def ros2_device_node(
cls: Type[T], cls: Type[T],
device_config: Optional[Dict[str, Any]] = None, device_config: Optional[ResourceDictInstance] = None,
status_types: Optional[Dict[str, Any]] = None, status_types: Optional[Dict[str, Any]] = None,
action_value_mappings: Optional[Dict[str, Any]] = None, action_value_mappings: Optional[Dict[str, Any]] = None,
hardware_interface: Optional[Dict[str, Any]] = None, hardware_interface: Optional[Dict[str, Any]] = None,
print_publish: bool = False, print_publish: bool = False,
children: Optional[Dict[str, Any]] = None,
) -> Type[ROS2DeviceNodeWrapper]: ) -> Type[ROS2DeviceNodeWrapper]:
"""Create a ROS2 Node class for a device class with properties and actions. """Create a ROS2 Node class for a device class with properties and actions.
@@ -45,7 +45,7 @@ def ros2_device_node(
if status_types is None: if status_types is None:
status_types = {} status_types = {}
if device_config is None: if device_config is None:
device_config = {} raise ValueError("device_config cannot be None")
if action_value_mappings is None: if action_value_mappings is None:
action_value_mappings = {} action_value_mappings = {}
if hardware_interface is None: if hardware_interface is None:
@@ -82,7 +82,6 @@ def ros2_device_node(
action_value_mappings=action_value_mappings, action_value_mappings=action_value_mappings,
hardware_interface=hardware_interface, hardware_interface=hardware_interface,
print_publish=print_publish, print_publish=print_publish,
children=children,
*args, *args,
**kwargs, **kwargs,
), ),

View File

@@ -4,13 +4,14 @@ from typing import Optional
from unilabos.registry.registry import lab_registry from unilabos.registry.registry import lab_registry
from unilabos.ros.device_node_wrapper import ros2_device_node from unilabos.ros.device_node_wrapper import ros2_device_node
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
from unilabos.ros.nodes.resource_tracker import ResourceDictInstance
from unilabos.utils import logger from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.import_manager import default_manager from unilabos.utils.import_manager import default_manager
def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2DeviceNode]: def initialize_device_from_dict(device_id, device_config: ResourceDictInstance) -> Optional[ROS2DeviceNode]:
"""Initializes a device based on its configuration. """Initializes a device based on its configuration.
This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration. This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration.
@@ -24,15 +25,14 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device
None None
""" """
d = None d = None
original_device_config = copy.deepcopy(device_config) device_class_config = device_config.res_content.klass
device_class_config = device_config["class"] uid = device_config.res_content.uuid
uid = device_config["uuid"]
if isinstance(device_class_config, str): # 如果是字符串则直接去lab_registry中查找获取class if isinstance(device_class_config, str): # 如果是字符串则直接去lab_registry中查找获取class
if len(device_class_config) == 0: if len(device_class_config) == 0:
raise DeviceClassInvalid(f"Device [{device_id}] class cannot be an empty string. {device_config}") raise DeviceClassInvalid(f"Device [{device_id}] class cannot be an empty string. {device_config}")
if device_class_config not in lab_registry.device_type_registry: if device_class_config not in lab_registry.device_type_registry:
raise DeviceClassInvalid(f"Device [{device_id}] class {device_class_config} not found. {device_config}") raise DeviceClassInvalid(f"Device [{device_id}] class {device_class_config} not found. {device_config}")
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"] device_class_config = lab_registry.device_type_registry[device_class_config]["class"]
elif isinstance(device_class_config, dict): elif isinstance(device_class_config, dict):
raise DeviceClassInvalid(f"Device [{device_id}] class config should be type 'str' but 'dict' got. {device_config}") raise DeviceClassInvalid(f"Device [{device_id}] class config should be type 'str' but 'dict' got. {device_config}")
if isinstance(device_class_config, dict): if isinstance(device_class_config, dict):
@@ -41,17 +41,16 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device
DEVICE = ros2_device_node( DEVICE = ros2_device_node(
DEVICE, DEVICE,
status_types=device_class_config.get("status_types", {}), status_types=device_class_config.get("status_types", {}),
device_config=original_device_config, device_config=device_config,
action_value_mappings=device_class_config.get("action_value_mappings", {}), action_value_mappings=device_class_config.get("action_value_mappings", {}),
hardware_interface=device_class_config.get( hardware_interface=device_class_config.get(
"hardware_interface", "hardware_interface",
{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}, {"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []},
), )
children=device_config.get("children", {})
) )
try: try:
d = DEVICE( d = DEVICE(
device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {}) device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.res_content.config
) )
except DeviceInitError as ex: except DeviceInitError as ex:
return d return d

View File

@@ -192,7 +192,7 @@ def slave(
for device_config in devices_config.root_nodes: for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id device_id = device_config.res_content.id
if device_config.res_content.type == "device": if device_config.res_content.type == "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) d = initialize_device_from_dict(device_id, device_config)
if d is not None: if d is not None:
devices_instances[device_id] = d devices_instances[device_id] = d
logger.info(f"Device {device_id} initialized.") logger.info(f"Device {device_id} initialized.")

View File

@@ -48,7 +48,7 @@ from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import ( from unilabos.ros.nodes.resource_tracker import (
DeviceNodeResourceTracker, DeviceNodeResourceTracker,
ResourceTreeSet, ResourceTreeSet,
ResourceTreeInstance, ResourceTreeInstance, ResourceDictInstance,
) )
from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
@@ -133,12 +133,11 @@ def init_wrapper(
device_id: str, device_id: str,
device_uuid: str, device_uuid: str,
driver_class: type[T], driver_class: type[T],
device_config: Dict[str, Any], device_config: ResourceTreeInstance,
status_types: Dict[str, Any], status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any], action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any], hardware_interface: Dict[str, Any],
print_publish: bool, print_publish: bool,
children: Optional[list] = None,
driver_params: Optional[Dict[str, Any]] = None, driver_params: Optional[Dict[str, Any]] = None,
driver_is_ros: bool = False, driver_is_ros: bool = False,
*args, *args,
@@ -147,8 +146,6 @@ def init_wrapper(
"""初始化设备节点的包装函数和ROS2DeviceNode初始化保持一致""" """初始化设备节点的包装函数和ROS2DeviceNode初始化保持一致"""
if driver_params is None: if driver_params is None:
driver_params = kwargs.copy() driver_params = kwargs.copy()
if children is None:
children = []
kwargs["device_id"] = device_id kwargs["device_id"] = device_id
kwargs["device_uuid"] = device_uuid kwargs["device_uuid"] = device_uuid
kwargs["driver_class"] = driver_class kwargs["driver_class"] = driver_class
@@ -157,7 +154,6 @@ def init_wrapper(
kwargs["status_types"] = status_types kwargs["status_types"] = status_types
kwargs["action_value_mappings"] = action_value_mappings kwargs["action_value_mappings"] = action_value_mappings
kwargs["hardware_interface"] = hardware_interface kwargs["hardware_interface"] = hardware_interface
kwargs["children"] = children
kwargs["print_publish"] = print_publish kwargs["print_publish"] = print_publish
kwargs["driver_is_ros"] = driver_is_ros kwargs["driver_is_ros"] = driver_is_ros
super(type(self), self).__init__(*args, **kwargs) super(type(self), self).__init__(*args, **kwargs)
@@ -586,7 +582,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
except Exception as e: except Exception as e:
self.lab_logger().error(f"更新资源uuid失败: {e}") self.lab_logger().error(f"更新资源uuid失败: {e}")
self.lab_logger().error(traceback.format_exc()) self.lab_logger().error(traceback.format_exc())
self.lab_logger().debug(f"资源更新结果: {response}") self.lab_logger().trace(f"资源更新结果: {response}")
async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet: async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet:
""" """
@@ -1144,7 +1140,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
queried_resources = [] queried_resources = []
for resource_data in resource_inputs: for resource_data in resource_inputs:
plr_resource = await self.get_resource_with_dir( plr_resource = await self.get_resource_with_dir(
resource_ids=resource_data["id"], with_children=True resource_id=resource_data["id"], with_children=True
) )
queried_resources.append(plr_resource) queried_resources.append(plr_resource)
@@ -1168,7 +1164,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
break break
##### self.lab_logger().info(f"准备执行: {action_kwargs}, 函数: {ACTION.__name__}")
time_start = time.time() time_start = time.time()
time_overall = 100 time_overall = 100
future = None future = None
@@ -1176,35 +1171,36 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 将阻塞操作放入线程池执行 # 将阻塞操作放入线程池执行
if asyncio.iscoroutinefunction(ACTION): if asyncio.iscoroutinefunction(ACTION):
try: try:
##### self.lab_logger().info(f"异步执行动作 {ACTION}") self.lab_logger().trace(f"异步执行动作 {ACTION}")
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) def _handle_future_exception(fut: Future):
def _handle_future_exception(fut):
nonlocal execution_error, execution_success, action_return_value nonlocal execution_error, execution_success, action_return_value
try: try:
action_return_value = fut.result() action_return_value = fut.result()
if isinstance(action_return_value, BaseException):
raise action_return_value
execution_success = True execution_success = True
except Exception as e: except Exception as _:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
error( error(
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
) )
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
future.add_done_callback(_handle_future_exception) future.add_done_callback(_handle_future_exception)
except Exception as e: except Exception as e:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
execution_success = False execution_success = False
self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}")
else: else:
##### self.lab_logger().info(f"同步执行动作 {ACTION}") self.lab_logger().trace(f"同步执行动作 {ACTION}")
future = self._executor.submit(ACTION, **action_kwargs) future = self._executor.submit(ACTION, **action_kwargs)
def _handle_future_exception(fut): def _handle_future_exception(fut: Future):
nonlocal execution_error, execution_success, action_return_value nonlocal execution_error, execution_success, action_return_value
try: try:
action_return_value = fut.result() action_return_value = fut.result()
execution_success = True execution_success = True
except Exception as e: except Exception as _:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
error( error(
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
@@ -1309,7 +1305,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
get_result_info_str(execution_error, execution_success, action_return_value), get_result_info_str(execution_error, execution_success, action_return_value),
) )
##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") self.lab_logger().trace(f"动作 {action_name} 完成并返回结果")
return result_msg return result_msg
return execute_callback return execute_callback
@@ -1544,17 +1540,29 @@ class ROS2DeviceNode:
这个类封装了设备类实例和ROS2节点的功能提供ROS2接口。 这个类封装了设备类实例和ROS2节点的功能提供ROS2接口。
它不继承设备类,而是通过代理模式访问设备类的属性和方法。 它不继承设备类,而是通过代理模式访问设备类的属性和方法。
""" """
@staticmethod
async def safe_task_wrapper(trace_callback, func, **kwargs):
try:
if callable(trace_callback):
trace_callback(await func(**kwargs))
return await func(**kwargs)
except Exception as e:
if callable(trace_callback):
trace_callback(e)
return e
@classmethod @classmethod
def run_async_func(cls, func, trace_error=True, **kwargs) -> Task: def run_async_func(cls, func, trace_error=True, inner_trace_callback=None, **kwargs) -> Task:
def _handle_future_exception(fut): def _handle_future_exception(fut: Future):
try: try:
fut.result() ret = fut.result()
if isinstance(ret, BaseException):
raise ret
except Exception as e: except Exception as e:
error(f"异步任务 {func.__name__} 报错了") error(f"异步任务 {func.__name__} 获取结果失败")
error(traceback.format_exc()) error(traceback.format_exc())
future = rclpy.get_global_executor().create_task(func(**kwargs)) future = rclpy.get_global_executor().create_task(ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs))
if trace_error: if trace_error:
future.add_done_callback(_handle_future_exception) future.add_done_callback(_handle_future_exception)
return future return future
@@ -1582,12 +1590,11 @@ class ROS2DeviceNode:
device_id: str, device_id: str,
device_uuid: str, device_uuid: str,
driver_class: Type[T], driver_class: Type[T],
device_config: Dict[str, Any], device_config: ResourceDictInstance,
driver_params: Dict[str, Any], driver_params: Dict[str, Any],
status_types: Dict[str, Any], status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any], action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any], hardware_interface: Dict[str, Any],
children: Dict[str, Any],
print_publish: bool = True, print_publish: bool = True,
driver_is_ros: bool = False, driver_is_ros: bool = False,
): ):
@@ -1598,7 +1605,7 @@ class ROS2DeviceNode:
device_id: 设备标识符 device_id: 设备标识符
device_uuid: 设备uuid device_uuid: 设备uuid
driver_class: 设备类 driver_class: 设备类
device_config: 原始初始化的json device_config: 原始初始化的ResourceDictInstance
driver_params: driver初始化的参数 driver_params: driver初始化的参数
status_types: 状态类型映射 status_types: 状态类型映射
action_value_mappings: 动作值映射 action_value_mappings: 动作值映射
@@ -1612,6 +1619,7 @@ class ROS2DeviceNode:
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__") self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
self._driver_class = driver_class self._driver_class = driver_class
self.device_config = device_config self.device_config = device_config
children: List[ResourceDictInstance] = device_config.children
self.driver_is_ros = driver_is_ros self.driver_is_ros = driver_is_ros
self.driver_is_workstation = False self.driver_is_workstation = False
self.resource_tracker = DeviceNodeResourceTracker() self.resource_tracker = DeviceNodeResourceTracker()

View File

@@ -289,6 +289,12 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info("[Host Node] Host node initialized.") self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set() HostNode._ready_event.set()
# 发送host_node ready信号到所有桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_host_ready"):
bridge.publish_host_ready()
self.lab_logger().debug(f"Host ready signal sent via {bridge.__class__.__name__}")
def _send_re_register(self, sclient): def _send_re_register(self, sclient):
sclient.wait_for_service() sclient.wait_for_service()
request = SerialCommand.Request() request = SerialCommand.Request()
@@ -532,7 +538,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node] Initializing device: {device_id}") self.lab_logger().info(f"[Host Node] Initializing device: {device_id}")
try: try:
d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) d = initialize_device_from_dict(device_id, device_config)
except DeviceClassInvalid as e: except DeviceClassInvalid as e:
self.lab_logger().error(f"[Host Node] Device class invalid: {e}") self.lab_logger().error(f"[Host Node] Device class invalid: {e}")
d = None d = None
@@ -712,7 +718,7 @@ class HostNode(BaseROS2DeviceNode):
feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg), feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg),
goal_uuid=goal_uuid_obj, goal_uuid=goal_uuid_obj,
) )
future.add_done_callback(lambda future: self.goal_response_callback(item, action_id, future)) future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调""" """目标响应回调"""
@@ -723,9 +729,11 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted") self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted")
self._goals[item.job_id] = goal_handle self._goals[item.job_id] = goal_handle
goal_handle.get_result_async().add_done_callback( goal_future = goal_handle.get_result_async()
lambda future: self.get_result_callback(item, action_id, future) goal_future.add_done_callback(
lambda f: self.get_result_callback(item, action_id, f)
) )
goal_future.result()
def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None: def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None:
"""反馈回调""" """反馈回调"""
@@ -794,6 +802,7 @@ class HostNode(BaseROS2DeviceNode):
# 存储结果供 HTTP API 查询 # 存储结果供 HTTP API 查询
try: try:
from unilabos.app.web.controller import store_job_result from unilabos.app.web.controller import store_job_result
if goal_status == GoalStatus.STATUS_CANCELED: if goal_status == GoalStatus.STATUS_CANCELED:
store_job_result(job_id, status, return_info, {}) store_job_result(job_id, status, return_info, {})
else: else:

View File

@@ -24,7 +24,7 @@ from unilabos.ros.msgs.message_converter import (
convert_from_ros_msg_with_mapping, convert_from_ros_msg_with_mapping,
) )
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode
from unilabos.ros.nodes.resource_tracker import ResourceTreeSet from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDictInstance
from unilabos.utils.type_check import get_result_info_str from unilabos.utils.type_check import get_result_info_str
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -47,7 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
def __init__( def __init__(
self, self,
protocol_type: List[str], protocol_type: List[str],
children: Dict[str, Any], children: List[ResourceDictInstance],
*, *,
driver_instance: "WorkstationBase", driver_instance: "WorkstationBase",
device_id: str, device_id: str,
@@ -81,10 +81,11 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
# 初始化子设备 # 初始化子设备
self.communication_node_id_to_instance = {} self.communication_node_id_to_instance = {}
for device_id, device_config in self.children.items(): for device_config in self.children:
if device_config.get("type", "device") != "device": device_id = device_config.res_content.id
if device_config.res_content.type != "device":
self.lab_logger().debug( self.lab_logger().debug(
f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." f"[Protocol Node] Skipping type {device_config.res_content.type} {device_id} already existed, skipping."
) )
continue continue
try: try:
@@ -101,8 +102,9 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
self.communication_node_id_to_instance[device_id] = d self.communication_node_id_to_instance[device_id] = d
continue continue
for device_id, device_config in self.children.items(): for device_config in self.children:
if device_config.get("type", "device") != "device": device_id = device_config.res_content.id
if device_config.res_content.type != "device":
continue continue
# 设置硬件接口代理 # 设置硬件接口代理
if device_id not in self.sub_devices: if device_id not in self.sub_devices:

View File

@@ -1,9 +1,11 @@
import inspect
import traceback import traceback
import uuid import uuid
from pydantic import BaseModel, field_serializer, field_validator from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from unilabos.resources.plr_additional_res_reg import register
from unilabos.utils.log import logger from unilabos.utils.log import logger
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -62,7 +64,6 @@ class ResourceDict(BaseModel):
parent: Optional["ResourceDict"] = Field(description="Parent resource object", default=None, exclude=True) parent: Optional["ResourceDict"] = Field(description="Parent resource object", default=None, exclude=True)
type: Union[Literal["device"], str] = Field(description="Resource type") type: Union[Literal["device"], str] = Field(description="Resource type")
klass: str = Field(alias="class", description="Resource class name") klass: str = Field(alias="class", description="Resource class name")
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
config: Dict[str, Any] = Field(description="Resource configuration") config: Dict[str, Any] = Field(description="Resource configuration")
data: Dict[str, Any] = Field(description="Resource data") data: Dict[str, Any] = Field(description="Resource data")
@@ -146,15 +147,16 @@ class ResourceDictInstance(object):
if not content.get("extra"): # MagicCode if not content.get("extra"): # MagicCode
content["extra"] = {} content["extra"] = {}
if "pose" not in content: if "pose" not in content:
content["pose"] = content.get("position", {}) content["pose"] = content.pop("position", {})
return ResourceDictInstance(ResourceDict.model_validate(content)) return ResourceDictInstance(ResourceDict.model_validate(content))
def get_nested_dict(self) -> Dict[str, Any]: def get_plr_nested_dict(self) -> Dict[str, Any]:
"""获取资源实例的嵌套字典表示""" """获取资源实例的嵌套字典表示"""
res_dict = self.res_content.model_dump(by_alias=True) res_dict = self.res_content.model_dump(by_alias=True)
res_dict["children"] = {child.res_content.id: child.get_nested_dict() for child in self.children} res_dict["children"] = {child.res_content.id: child.get_plr_nested_dict() for child in self.children}
res_dict["parent"] = self.res_content.parent_instance_name res_dict["parent"] = self.res_content.parent_instance_name
res_dict["position"] = self.res_content.position.position.model_dump() res_dict["position"] = self.res_content.pose.position.model_dump()
del res_dict["pose"]
return res_dict return res_dict
@@ -429,9 +431,9 @@ class ResourceTreeSet(object):
Returns: Returns:
List[PLRResource]: PLR 资源实例列表 List[PLRResource]: PLR 资源实例列表
""" """
register()
from pylabrobot.resources import Resource as PLRResource from pylabrobot.resources import Resource as PLRResource
from pylabrobot.utils.object_parsing import find_subclass from pylabrobot.utils.object_parsing import find_subclass
import inspect
# 类型映射 # 类型映射
TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer"} TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer"}
@@ -459,9 +461,9 @@ class ResourceTreeSet(object):
"size_y": res.config.get("size_y", 0), "size_y": res.config.get("size_y", 0),
"size_z": res.config.get("size_z", 0), "size_z": res.config.get("size_z", 0),
"location": { "location": {
"x": res.position.position.x, "x": res.pose.position.x,
"y": res.position.position.y, "y": res.pose.position.y,
"z": res.position.position.z, "z": res.pose.position.z,
"type": "Coordinate", "type": "Coordinate",
}, },
"rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}, "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"},

View File

@@ -9,10 +9,11 @@ import asyncio
import inspect import inspect
import traceback import traceback
from abc import abstractmethod from abc import abstractmethod
from typing import Type, Any, Dict, Optional, TypeVar, Generic from typing import Type, Any, Dict, Optional, TypeVar, Generic, List
from unilabos.resources.graphio import nested_dict_to_list, resource_ulab_to_plr from unilabos.resources.graphio import nested_dict_to_list, resource_ulab_to_plr
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDictInstance, \
ResourceTreeInstance
from unilabos.utils import logger, import_manager from unilabos.utils import logger, import_manager
from unilabos.utils.cls_creator import create_instance_from_config from unilabos.utils.cls_creator import create_instance_from_config
@@ -33,7 +34,7 @@ class DeviceClassCreator(Generic[T]):
这个类提供了从任意类创建实例的通用方法。 这个类提供了从任意类创建实例的通用方法。
""" """
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
""" """
初始化设备类创建器 初始化设备类创建器
@@ -50,9 +51,9 @@ class DeviceClassCreator(Generic[T]):
附加资源到设备类实例 附加资源到设备类实例
""" """
if self.device_instance is not None: if self.device_instance is not None:
for c in self.children.values(): for c in self.children:
if c["type"] != "device": if c.res_content.type != "device":
self.resource_tracker.add_resource(c) self.resource_tracker.add_resource(c.get_plr_nested_dict())
def create_instance(self, data: Dict[str, Any]) -> T: def create_instance(self, data: Dict[str, Any]) -> T:
""" """
@@ -94,7 +95,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
这个类提供了针对PyLabRobot设备类的实例创建方法特别处理deserialize方法。 这个类提供了针对PyLabRobot设备类的实例创建方法特别处理deserialize方法。
""" """
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
""" """
初始化PyLabRobot设备类创建器 初始化PyLabRobot设备类创建器
@@ -111,12 +112,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
def attach_resource(self): def attach_resource(self):
pass # 只能增加实例化物料,原来默认物料仅为字典查询 pass # 只能增加实例化物料,原来默认物料仅为字典查询
def _process_resource_mapping(self, resource, source_type): # def _process_resource_mapping(self, resource, source_type):
if source_type == dict: # if source_type == dict:
from pylabrobot.resources.resource import Resource # from pylabrobot.resources.resource import Resource
#
return nested_dict_to_list(resource), Resource # return nested_dict_to_list(resource), Resource
return resource, source_type # return resource, source_type
def _process_resource_references( def _process_resource_references(
self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None
@@ -142,15 +143,21 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
if isinstance(data, dict): if isinstance(data, dict):
if "_resource_child_name" in data: if "_resource_child_name" in data:
child_name = data["_resource_child_name"] child_name = data["_resource_child_name"]
if child_name in self.children: resource: Optional[ResourceDictInstance] = None
resource = self.children[child_name] for child in self.children:
if child.res_content.name == child_name:
resource = child
if resource is not None:
if "_resource_type" in data: if "_resource_type" in data:
type_path = data["_resource_type"] type_path = data["_resource_type"]
try: try:
target_type = import_manager.get_class(type_path) # target_type = import_manager.get_class(type_path)
contain_model = not issubclass(target_type, Deck) # contain_model = not issubclass(target_type, Deck)
resource, target_type = self._process_resource_mapping(resource, target_type) # resource, target_type = self._process_resource_mapping(resource, target_type)
resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state res_tree = ResourceTreeInstance(resource)
res_tree_set = ResourceTreeSet([res_tree])
resource_instance: Resource = res_tree_set.to_plr_resources()[0]
# resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state
states[prefix_path] = resource_instance.serialize_all_state() states[prefix_path] = resource_instance.serialize_all_state()
# 使用 prefix_path 作为 key 存储资源状态 # 使用 prefix_path 作为 key 存储资源状态
if to_dict: if to_dict:
@@ -202,12 +209,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
stack = None stack = None
# 递归遍历 children 构建 name_to_uuid 映射 # 递归遍历 children 构建 name_to_uuid 映射
def collect_name_to_uuid(children_dict: Dict[str, Any], result: Dict[str, str]): def collect_name_to_uuid(children_list: List[ResourceDictInstance], result: Dict[str, str]):
"""递归遍历嵌套的 children 字典,收集 name 到 uuid 的映射""" """递归遍历嵌套的 children 字典,收集 name 到 uuid 的映射"""
for child in children_dict.values(): for child in children_list:
if isinstance(child, dict): if isinstance(child, ResourceDictInstance):
result[child["name"]] = child["uuid"] result[child.res_content.name] = child.res_content.uuid
collect_name_to_uuid(child["children"], result) collect_name_to_uuid(child.children, result)
name_to_uuid = {} name_to_uuid = {}
collect_name_to_uuid(self.children, name_to_uuid) collect_name_to_uuid(self.children, name_to_uuid)
@@ -313,7 +320,7 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
这个类提供了针对WorkstationNode设备类的实例创建方法处理children参数。 这个类提供了针对WorkstationNode设备类的实例创建方法处理children参数。
""" """
def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker):
""" """
初始化WorkstationNode设备类创建器 初始化WorkstationNode设备类创建器
@@ -336,9 +343,9 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try: try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消 # 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children data["children"] = self.children
for material_id, child in self.children.items(): for child in self.children:
if child["type"] != "device": if child.res_content.type != "device":
self.resource_tracker.add_resource(self.children[material_id]) self.resource_tracker.add_resource(child.get_plr_nested_dict())
deck_dict = data.get("deck") deck_dict = data.get("deck")
if deck_dict: if deck_dict:
from pylabrobot.resources import Deck, Resource from pylabrobot.resources import Deck, Resource

View File

@@ -69,7 +69,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "syringepump.runze", "class": "syringe_pump_with_valve.runze.SY03B-T08",
"position": { "position": {
"x": 620.6111111111111, "x": 620.6111111111111,
"y": 171, "y": 171,
@@ -93,7 +93,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 430.4087301587302, "x": 430.4087301587302,
"y": 428, "y": 428,
@@ -117,7 +117,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 295.36944444444447, "x": 295.36944444444447,
"y": 428, "y": 428,
@@ -141,7 +141,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 165.36944444444444, "x": 165.36944444444444,
"y": 428, "y": 428,
@@ -165,7 +165,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 165.36944444444444, "x": 165.36944444444444,
"y": 428, "y": 428,
@@ -189,7 +189,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 35, "x": 35,
"y": 428, "y": 428,
@@ -213,7 +213,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 698.1111111111111, "x": 698.1111111111111,
"y": 428, "y": 428,
@@ -255,7 +255,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "syringepump.runze", "class": "syringe_pump_with_valve.runze.SY03B-T08",
"position": { "position": {
"x": 1195.611507936508, "x": 1195.611507936508,
"y": 686, "y": 686,
@@ -279,7 +279,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1587.703373015873, "x": 1587.703373015873,
"y": 1172.5, "y": 1172.5,
@@ -299,7 +299,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "separator_controller", "class": "separator.homemade",
"position": { "position": {
"x": 1624.4027777777778, "x": 1624.4027777777778,
"y": 665.5, "y": 665.5,
@@ -320,7 +320,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1614.404365079365, "x": 1614.404365079365,
"y": 948, "y": 948,
@@ -340,7 +340,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1915.7035714285714, "x": 1915.7035714285714,
"y": 665.5, "y": 665.5,
@@ -360,7 +360,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1785.7035714285714, "x": 1785.7035714285714,
"y": 665.5, "y": 665.5,
@@ -384,7 +384,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 2054.0650793650793, "x": 2054.0650793650793,
"y": 665.5, "y": 665.5,
@@ -408,7 +408,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "syringepump.runze", "class": "syringe_pump_with_valve.runze.SY03B-T08",
"position": { "position": {
"x": 1630.6527777777778, "x": 1630.6527777777778,
"y": 448.5, "y": 448.5,
@@ -432,7 +432,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "rotavap", "class": "rotavap.one",
"position": { "position": {
"x": 1339.7031746031746, "x": 1339.7031746031746,
"y": 968.5, "y": 968.5,
@@ -453,7 +453,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1339.7031746031746, "x": 1339.7031746031746,
"y": 1152, "y": 1152,
@@ -473,7 +473,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 909.722619047619, "x": 909.722619047619,
"y": 948, "y": 948,
@@ -493,7 +493,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 867.972619047619, "x": 867.972619047619,
"y": 1152, "y": 1152,
@@ -513,7 +513,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 742.722619047619, "x": 742.722619047619,
"y": 948, "y": 948,
@@ -533,7 +533,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1206.722619047619, "x": 1206.722619047619,
"y": 948, "y": 948,
@@ -553,7 +553,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "container", "type": "container",
"class": null, "class": "container",
"position": { "position": {
"x": 1148.222619047619, "x": 1148.222619047619,
"y": 1152, "y": 1152,
@@ -573,7 +573,7 @@
"children": [], "children": [],
"parent": "YugongStation", "parent": "YugongStation",
"type": "device", "type": "device",
"class": "syringepump.runze", "class": "syringe_pump_with_valve.runze.SY03B-T08",
"position": { "position": {
"x": 1469.7031746031746, "x": 1469.7031746031746,
"y": 968.5, "y": 968.5,

View File

@@ -124,11 +124,14 @@ class ColoredFormatter(logging.Formatter):
def _format_basic(self, record): def _format_basic(self, record):
"""基本格式化,不包含颜色""" """基本格式化,不包含颜色"""
datetime_str = datetime.fromtimestamp(record.created).strftime("%y-%m-%d [%H:%M:%S,%f")[:-3] + "]" datetime_str = datetime.fromtimestamp(record.created).strftime("%y-%m-%d [%H:%M:%S,%f")[:-3] + "]"
filename = os.path.basename(record.filename).rsplit(".", 1)[0] # 提取文件名(不含路径和扩展名) filename = record.filename.replace(".py", "").split("\\")[-1] # 提取文件名(不含路径和扩展名)
if "/" in filename:
filename = filename.split("/")[-1]
module_path = f"{record.name}.{filename}" module_path = f"{record.name}.{filename}"
func_line = f"{record.funcName}:{record.lineno}" func_line = f"{record.funcName}:{record.lineno}"
right_info = f" [{func_line}] [{module_path}]"
formatted_message = f"{datetime_str} [{record.levelname}] [{module_path}] [{func_line}]: {record.getMessage()}" formatted_message = f"{datetime_str} [{record.levelname}] {record.getMessage()}{right_info}"
if record.exc_info: if record.exc_info:
exc_text = self.formatException(record.exc_info) exc_text = self.formatException(record.exc_info)
@@ -150,7 +153,7 @@ class ColoredFormatter(logging.Formatter):
# 配置日志处理器 # 配置日志处理器
def configure_logger(loglevel=None): def configure_logger(loglevel=None, working_dir=None):
"""配置日志记录器 """配置日志记录器
Args: Args:
@@ -159,8 +162,9 @@ def configure_logger(loglevel=None):
""" """
# 获取根日志记录器 # 获取根日志记录器
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.setLevel(TRACE_LEVEL)
# 设置日志级别 # 设置日志级别
numeric_level = logging.DEBUG
if loglevel is not None: if loglevel is not None:
if isinstance(loglevel, str): if isinstance(loglevel, str):
# 将字符串转换为logging级别 # 将字符串转换为logging级别
@@ -170,12 +174,8 @@ def configure_logger(loglevel=None):
numeric_level = getattr(logging, loglevel.upper(), None) numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int): if not isinstance(numeric_level, int):
print(f"警告: 无效的日志级别 '{loglevel}',使用默认级别 DEBUG") print(f"警告: 无效的日志级别 '{loglevel}',使用默认级别 DEBUG")
numeric_level = logging.DEBUG
else: else:
numeric_level = loglevel numeric_level = loglevel
root_logger.setLevel(numeric_level)
else:
root_logger.setLevel(logging.DEBUG) # 默认级别
# 移除已存在的处理器 # 移除已存在的处理器
for handler in root_logger.handlers[:]: for handler in root_logger.handlers[:]:
@@ -183,7 +183,7 @@ def configure_logger(loglevel=None):
# 创建控制台处理器 # 创建控制台处理器
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setLevel(root_logger.level) # 使用与根记录器相同的级别 console_handler.setLevel(numeric_level) # 使用与根记录器相同的级别
# 使用自定义的颜色格式化器 # 使用自定义的颜色格式化器
color_formatter = ColoredFormatter() color_formatter = ColoredFormatter()
@@ -191,9 +191,30 @@ def configure_logger(loglevel=None):
# 添加处理器到根日志记录器 # 添加处理器到根日志记录器
root_logger.addHandler(console_handler) root_logger.addHandler(console_handler)
# 如果指定了工作目录,添加文件处理器
if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True)
# 生成日志文件名:日期 时间.log
log_filename = datetime.now().strftime("%Y-%m-%d %H-%M-%S") + ".log"
log_filepath = os.path.join(logs_dir, log_filename)
# 创建文件处理器
file_handler = logging.FileHandler(log_filepath, encoding="utf-8")
file_handler.setLevel(TRACE_LEVEL)
# 使用不带颜色的格式化器
file_formatter = ColoredFormatter(use_colors=False)
file_handler.setFormatter(file_formatter)
root_logger.addHandler(file_handler)
logging.getLogger("asyncio").setLevel(logging.INFO) logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO) logging.getLogger("urllib3").setLevel(logging.INFO)
# 配置日志系统 # 配置日志系统
configure_logger() configure_logger()

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>unilabos_msgs</name> <name>unilabos_msgs</name>
<version>0.10.11</version> <version>0.10.12</version>
<description>ROS2 Messages package for unilabos devices</description> <description>ROS2 Messages package for unilabos devices</description>
<maintainer email="changjh@pku.edu.cn">Junhan Chang</maintainer> <maintainer email="changjh@pku.edu.cn">Junhan Chang</maintainer>
<maintainer email="18435084+Xuwznln@users.noreply.github.com">Xuwznln</maintainer> <maintainer email="18435084+Xuwznln@users.noreply.github.com">Xuwznln</maintainer>