Merge dev branch: Add battery resources, bioyond_cell device registry, and fix file path resolution

This commit is contained in:
dijkstra402
2025-12-18 11:11:59 +08:00
306 changed files with 47518 additions and 4826 deletions

View File

@@ -5,6 +5,7 @@
import argparse
import importlib
import locale
import subprocess
import sys
@@ -22,13 +23,33 @@ class EnvironmentChecker:
"websockets": "websockets",
"msgcenterpy": "msgcenterpy",
"opentrons_shared_data": "opentrons_shared_data",
"typing_extensions": "typing_extensions",
}
# 特殊安装包(需要特殊处理的包)
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 包版本要求(包名: 最低版本)
self.version_requirements = {
"msgcenterpy": "0.1.5", # msgcenterpy 最低版本要求
}
self.missing_packages = []
self.failed_installs = []
self.packages_need_upgrade = []
# 检测系统语言
self.is_chinese = self._is_chinese_locale()
def _is_chinese_locale(self) -> bool:
"""检测系统是否为中文环境"""
try:
lang = locale.getdefaultlocale()[0]
if lang and ("zh" in lang.lower() or "chinese" in lang.lower()):
return True
except Exception:
pass
return False
def check_package_installed(self, package_name: str) -> bool:
"""检查包是否已安装"""
@@ -38,31 +59,74 @@ class EnvironmentChecker:
except ImportError:
return False
def install_package(self, package_name: str, pip_name: str) -> bool:
def get_package_version(self, package_name: str) -> str | None:
"""获取已安装包的版本"""
try:
module = importlib.import_module(package_name)
return getattr(module, "__version__", None)
except (ImportError, AttributeError):
return None
def compare_version(self, current: str, required: str) -> bool:
"""
比较版本号
Returns:
True: current >= required
False: current < required
"""
try:
current_parts = [int(x) for x in current.split(".")]
required_parts = [int(x) for x in required.split(".")]
# 补齐长度
max_len = max(len(current_parts), len(required_parts))
current_parts.extend([0] * (max_len - len(current_parts)))
required_parts.extend([0] * (max_len - len(required_parts)))
return current_parts >= required_parts
except Exception:
return True # 如果无法比较,假设版本满足要求
def install_package(self, package_name: str, pip_name: str, upgrade: bool = False) -> bool:
"""安装包"""
try:
print_status(f"正在安装 {package_name} ({pip_name})...", "info")
action = "升级" if upgrade else "安装"
print_status(f"正在{action} {package_name} ({pip_name})...", "info")
# 构建安装命令
cmd = [sys.executable, "-m", "pip", "install", pip_name]
cmd = [sys.executable, "-m", "pip", "install"]
# 如果是升级操作,添加 --upgrade 参数
if upgrade:
cmd.append("--upgrade")
cmd.append(pip_name)
# 如果是中文环境,使用清华镜像源
if self.is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
# 执行安装
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) # 5分钟超时
if result.returncode == 0:
print_status(f"{package_name} 安装成功", "success")
print_status(f"{package_name} {action}成功", "success")
return True
else:
print_status(f"× {package_name} 安装失败: {result.stderr}", "error")
print_status(f"× {package_name} {action}失败: {result.stderr}", "error")
return False
except subprocess.TimeoutExpired:
print_status(f"× {package_name} 安装超时", "error")
print_status(f"× {package_name} {action}超时", "error")
return False
except Exception as e:
print_status(f"× {package_name} 安装异常: {str(e)}", "error")
print_status(f"× {package_name} {action}异常: {str(e)}", "error")
return False
def upgrade_package(self, package_name: str, pip_name: str) -> bool:
"""升级包"""
return self.install_package(package_name, pip_name, upgrade=True)
def check_all_packages(self) -> bool:
"""检查所有必需的包"""
print_status("开始检查环境依赖...", "info")
@@ -71,60 +135,116 @@ class EnvironmentChecker:
for import_name, pip_name in self.required_packages.items():
if not self.check_package_installed(import_name):
self.missing_packages.append((import_name, pip_name))
else:
# 检查版本要求
if import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version:
if not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
# 检查特殊包
for package_name, install_url in self.special_packages.items():
if not self.check_package_installed(package_name):
self.missing_packages.append((package_name, install_url))
if not self.missing_packages:
all_ok = not self.missing_packages and not self.packages_need_upgrade
if all_ok:
print_status("✓ 所有依赖包检查完成,环境正常", "success")
return True
print_status(f"发现 {len(self.missing_packages)} 个缺失的包", "warning")
if self.missing_packages:
print_status(f"发现 {len(self.missing_packages)} 个缺失的包", "warning")
if self.packages_need_upgrade:
print_status(f"发现 {len(self.packages_need_upgrade)} 个需要升级的包", "warning")
return False
def install_missing_packages(self, auto_install: bool = True) -> bool:
"""安装缺失的包"""
if not self.missing_packages:
if not self.missing_packages and not self.packages_need_upgrade:
return True
if not auto_install:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
if self.missing_packages:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
if self.packages_need_upgrade:
print_status("需要升级以下包:", "warning")
for import_name, pip_name in self.packages_need_upgrade:
print_status(f" - {import_name} (pip install --upgrade {pip_name})", "warning")
return False
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
# 安装缺失的包
if self.missing_packages:
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功安装 {success_count}/{len(self.missing_packages)} 个包", "success")
# 升级需要更新的包
if self.packages_need_upgrade:
print_status(f"开始自动升级 {len(self.packages_need_upgrade)} 个包...", "info")
upgrade_success_count = 0
for import_name, pip_name in self.packages_need_upgrade:
if self.upgrade_package(import_name, pip_name):
upgrade_success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功升级 {upgrade_success_count}/{len(self.packages_need_upgrade)} 个包", "success")
if self.failed_installs:
print_status(f"{len(self.failed_installs)} 个包安装失败:", "error")
print_status(f"{len(self.failed_installs)} 个包操作失败:", "error")
for import_name, pip_name in self.failed_installs:
print_status(f" - {import_name} (pip install {pip_name})", "error")
print_status(f" - {import_name} ({pip_name})", "error")
return False
print_status(f"✓ 成功安装 {success_count} 个包", "success")
return True
def verify_installation(self) -> bool:
"""验证安装结果"""
if not self.missing_packages:
if not self.missing_packages and not self.packages_need_upgrade:
return True
print_status("验证安装结果...", "info")
failed_verification = []
# 验证新安装的包
for import_name, pip_name in self.missing_packages:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
# 验证升级的包
for import_name, pip_name in self.packages_need_upgrade:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
elif import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version and not self.compare_version(current_version, required_version):
failed_verification.append((import_name, pip_name))
print_status(
f" {import_name} 版本仍然过低 (当前: {current_version}, 需要: >={required_version})",
"error",
)
if failed_verification:
print_status(f"{len(failed_verification)} 个包验证失败:", "error")
for import_name, pip_name in failed_verification:

View File

@@ -239,8 +239,12 @@ class ImportManager:
cls = get_class(class_path)
class_name = cls.__name__
result = {"class_name": class_name, "init_params": self._analyze_method_signature(cls.__init__)["args"],
"status_methods": {}, "action_methods": {}}
result = {
"class_name": class_name,
"init_params": self._analyze_method_signature(cls.__init__)["args"],
"status_methods": {},
"action_methods": {},
}
# 分析类的所有成员
for name, method in cls.__dict__.items():
if name.startswith("_"):
@@ -374,6 +378,7 @@ class ImportManager:
"name": method.__name__,
"args": args,
"return_type": self._get_type_string(signature.return_annotation),
"return_annotation": signature.return_annotation, # 保留原始类型注解用于TypedDict等特殊处理
"is_async": inspect.iscoroutinefunction(method),
}

View File

@@ -124,11 +124,14 @@ class ColoredFormatter(logging.Formatter):
def _format_basic(self, record):
"""基本格式化,不包含颜色"""
datetime_str = datetime.fromtimestamp(record.created).strftime("%y-%m-%d [%H:%M:%S,%f")[:-3] + "]"
filename = os.path.basename(record.filename).rsplit(".", 1)[0] # 提取文件名(不含路径和扩展名)
filename = record.filename.replace(".py", "").split("\\")[-1] # 提取文件名(不含路径和扩展名)
if "/" in filename:
filename = filename.split("/")[-1]
module_path = f"{record.name}.{filename}"
func_line = f"{record.funcName}:{record.lineno}"
right_info = f" [{func_line}] [{module_path}]"
formatted_message = f"{datetime_str} [{record.levelname}] [{module_path}] [{func_line}]: {record.getMessage()}"
formatted_message = f"{datetime_str} [{record.levelname}] {record.getMessage()}{right_info}"
if record.exc_info:
exc_text = self.formatException(record.exc_info)
@@ -150,7 +153,7 @@ class ColoredFormatter(logging.Formatter):
# 配置日志处理器
def configure_logger(loglevel=None):
def configure_logger(loglevel=None, working_dir=None):
"""配置日志记录器
Args:
@@ -159,8 +162,9 @@ def configure_logger(loglevel=None):
"""
# 获取根日志记录器
root_logger = logging.getLogger()
root_logger.setLevel(TRACE_LEVEL)
# 设置日志级别
numeric_level = logging.DEBUG
if loglevel is not None:
if isinstance(loglevel, str):
# 将字符串转换为logging级别
@@ -170,12 +174,8 @@ def configure_logger(loglevel=None):
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
print(f"警告: 无效的日志级别 '{loglevel}',使用默认级别 DEBUG")
numeric_level = logging.DEBUG
else:
numeric_level = loglevel
root_logger.setLevel(numeric_level)
else:
root_logger.setLevel(logging.DEBUG) # 默认级别
# 移除已存在的处理器
for handler in root_logger.handlers[:]:
@@ -183,7 +183,7 @@ def configure_logger(loglevel=None):
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(root_logger.level) # 使用与根记录器相同的级别
console_handler.setLevel(numeric_level) # 使用与根记录器相同的级别
# 使用自定义的颜色格式化器
color_formatter = ColoredFormatter()
@@ -204,6 +204,28 @@ def configure_logger(loglevel=None):
logging.getLogger('websockets.client').setLevel(logging.WARNING)
logging.getLogger('websockets.server').setLevel(logging.WARNING)
# 如果指定了工作目录,添加文件处理器
if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True)
# 生成日志文件名:日期 时间.log
log_filename = datetime.now().strftime("%Y-%m-%d %H-%M-%S") + ".log"
log_filepath = os.path.join(logs_dir, log_filename)
# 创建文件处理器
file_handler = logging.FileHandler(log_filepath, encoding="utf-8")
file_handler.setLevel(TRACE_LEVEL)
# 使用不带颜色的格式化器
file_formatter = ColoredFormatter(use_colors=False)
file_handler.setFormatter(file_formatter)
root_logger.addHandler(file_handler)
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
# 配置日志系统
configure_logger()