diff --git a/.gitignore b/.gitignore index 9be0291..2c50f5b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ # Python-related files # ================================ elevator_saga/traffic/test_cases.py +elevator_saga/traffic/test_cases +result.json # Compiled Python files __pycache__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d3af09d..45f9126 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -75,7 +75,7 @@ repos: hooks: - id: prettier types_or: [yaml, json, markdown] - exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|elevator_saga\\.egg-info/)" + exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|\\.egg-info/)" # Global settings default_stages: [pre-commit, pre-push] diff --git a/docs/index.rst b/docs/index.rst index 7f43ed2..1527b59 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -83,6 +83,7 @@ Contents client communication events + logging .. toctree:: :maxdepth: 1 diff --git a/docs/logging.rst b/docs/logging.rst new file mode 100644 index 0000000..899442b --- /dev/null +++ b/docs/logging.rst @@ -0,0 +1,325 @@ +Logging System +============== + +Overview +-------- + +Elevator Saga uses a unified logging system with colored output and multiple log levels. The logging system provides consistent, filterable output across all components. + +Log Levels +---------- + +The logger supports four log levels with distinct colors: + +* **DEBUG** - Gray/Bright Black - Detailed debugging information +* **INFO** - Cyan - General informational messages +* **WARNING** - Yellow - Warning messages +* **ERROR** - Red - Error messages + +Configuration +------------- + +Environment Variable +~~~~~~~~~~~~~~~~~~~~ + +The default log level is controlled by the ``ELEVATOR_LOG_LEVEL`` environment variable: + +.. code-block:: bash + + # Set log level to DEBUG (default) + export ELEVATOR_LOG_LEVEL=DEBUG + + # Set log level to INFO (less verbose) + export ELEVATOR_LOG_LEVEL=INFO + + # Set log level to WARNING (only warnings and errors) + export ELEVATOR_LOG_LEVEL=WARNING + + # Set log level to ERROR (only errors) + export ELEVATOR_LOG_LEVEL=ERROR + +If not set, the default is **DEBUG** mode. + +Programmatic Control +~~~~~~~~~~~~~~~~~~~~ + +You can also control the log level programmatically: + +.. code-block:: python + + from elevator_saga.utils.logger import LogLevel, set_log_level + + # Set to INFO level + set_log_level(LogLevel.INFO) + + # Set to DEBUG level + set_log_level(LogLevel.DEBUG) + +Basic Usage +----------- + +Simple Logging +~~~~~~~~~~~~~~ + +.. code-block:: python + + from elevator_saga.utils.logger import debug, info, warning, error + + # Simple messages + info("Server started successfully") + warning("Connection timeout") + error("Failed to load configuration") + debug("Processing tick 42") + +With Prefix +~~~~~~~~~~~ + +Add a prefix to identify the source of the log message: + +.. code-block:: python + + # Server logs + info("Client registered", prefix="SERVER") + debug("Algorithm client processed tick 42", prefix="SERVER") + + # Client logs + info("API Client initialized", prefix="CLIENT") + warning("Command ignored", prefix="CLIENT") + + # Controller logs + info("启动 MyController 算法", prefix="CONTROLLER") + error("模拟运行错误", prefix="CONTROLLER") + +Advanced Usage +-------------- + +Custom Logger +~~~~~~~~~~~~~ + +Create a custom logger instance with specific settings: + +.. code-block:: python + + from elevator_saga.utils.logger import get_logger, LogLevel + + # Get a custom logger + logger = get_logger("MyComponent", min_level=LogLevel.WARNING) + logger.info("This will not appear (level too low)") + logger.warning("This will appear") + logger.error("This will appear") + +Color Output +~~~~~~~~~~~~ + +The logger automatically detects if output is to a TTY (terminal) and enables colors. When redirecting to files or pipes, colors are automatically disabled for clean output. + +Log Format +---------- + +All log messages follow a consistent format:: + + LEVEL [PREFIX] message + +Examples: + +.. code-block:: text + + DEBUG [SERVER] Algorithm client registered: abc-123 + INFO [SERVER] Loading traffic from test_case_01.json + WARNING [SERVER] GUI client: timeout waiting for tick 42 + ERROR [CLIENT] Reset failed: Connection refused + INFO [CONTROLLER] 启动 MyController 算法 + +Component Prefixes +------------------ + +Standard prefixes used throughout the system: + +* **SERVER** - Simulator server logs +* **CLIENT** - API client logs +* **CONTROLLER** - Controller/algorithm logs + +You can use any prefix that makes sense for your component. + +API Reference +------------- + +Functions +~~~~~~~~~ + +.. py:function:: debug(message: str, prefix: Optional[str] = None) -> None + + Log a DEBUG level message. + + :param message: The message to log + :param prefix: Optional prefix to identify the source + +.. py:function:: info(message: str, prefix: Optional[str] = None) -> None + + Log an INFO level message. + + :param message: The message to log + :param prefix: Optional prefix to identify the source + +.. py:function:: warning(message: str, prefix: Optional[str] = None) -> None + + Log a WARNING level message. + + :param message: The message to log + :param prefix: Optional prefix to identify the source + +.. py:function:: error(message: str, prefix: Optional[str] = None) -> None + + Log an ERROR level message. + + :param message: The message to log + :param prefix: Optional prefix to identify the source + +.. py:function:: set_log_level(level: LogLevel) -> None + + Set the global log level. + + :param level: The minimum log level to display + +.. py:function:: get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger + + Get or create the global logger instance. + + :param name: Name of the logger + :param min_level: Minimum log level (defaults to ELEVATOR_LOG_LEVEL or DEBUG) + :return: Logger instance + +Classes +~~~~~~~ + +.. py:class:: LogLevel + + Enumeration of available log levels. + + .. py:attribute:: DEBUG + :value: 0 + + Debug level - most verbose + + .. py:attribute:: INFO + :value: 1 + + Info level - general information + + .. py:attribute:: WARNING + :value: 2 + + Warning level - warnings only + + .. py:attribute:: ERROR + :value: 3 + + Error level - errors only + + .. py:method:: from_string(level_str: str) -> LogLevel + :classmethod: + + Convert a string to a LogLevel. + + :param level_str: String representation (case-insensitive) + :return: Corresponding LogLevel (defaults to DEBUG if invalid) + +.. py:class:: Logger + + The main logger class. + + .. py:method:: __init__(name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True) + + Initialize a logger instance. + + :param name: Logger name + :param min_level: Minimum level to log + :param use_color: Whether to use colored output + + .. py:method:: debug(message: str, prefix: Optional[str] = None) -> None + + Log a DEBUG message. + + .. py:method:: info(message: str, prefix: Optional[str] = None) -> None + + Log an INFO message. + + .. py:method:: warning(message: str, prefix: Optional[str] = None) -> None + + Log a WARNING message. + + .. py:method:: error(message: str, prefix: Optional[str] = None) -> None + + Log an ERROR message. + + .. py:method:: set_level(level: LogLevel) -> None + + Change the minimum log level. + +Best Practices +-------------- + +1. **Use appropriate levels**: + + * DEBUG for detailed state changes and internal operations + * INFO for significant events (startup, completion, etc.) + * WARNING for unexpected but recoverable situations + * ERROR for failures and exceptions + +2. **Use prefixes consistently**: + + * Always use the same prefix for the same component + * Use uppercase for standard prefixes (SERVER, CLIENT, CONTROLLER) + +3. **Keep messages concise**: + + * One log message per event + * Include relevant context (IDs, values, etc.) + * Avoid multi-line messages + +4. **Set appropriate default level**: + + * Use DEBUG for development + * Use INFO for production + * Use WARNING for minimal logging + +5. **Avoid logging in tight loops**: + + * Excessive logging can impact performance + * Consider conditional logging or sampling + +Examples +-------- + +Server Startup +~~~~~~~~~~~~~~ + +.. code-block:: python + + from elevator_saga.utils.logger import info, debug + + info("Elevator simulation server (Async) running on http://127.0.0.1:8000", prefix="SERVER") + info("Using Quart (async Flask) for better concurrency", prefix="SERVER") + debug("Found 5 traffic files: ['test01.json', 'test02.json', ...]", prefix="SERVER") + +Client Operations +~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + from elevator_saga.utils.logger import info, warning, error + + info("Client registered successfully with ID: xyz-789", prefix="CLIENT") + warning("Client type 'gui' cannot send control commands", prefix="CLIENT") + error("Reset failed: Connection refused", prefix="CLIENT") + +Controller Logic +~~~~~~~~~~~~~~~~ + +.. code-block:: python + + from elevator_saga.utils.logger import info, debug + + info("启动 MyController 算法", prefix="CONTROLLER") + debug("Updated traffic info - max_tick: 1000", prefix="CONTROLLER") + info("停止 MyController 算法", prefix="CONTROLLER") diff --git a/elevator_saga/client/api_client.py b/elevator_saga/client/api_client.py index d75a4ea..65ccb22 100644 --- a/elevator_saga/client/api_client.py +++ b/elevator_saga/client/api_client.py @@ -4,6 +4,7 @@ Unified API Client for Elevator Saga 使用统一数据模型的客户端API封装 """ import json +import os import urllib.error import urllib.request from typing import Any, Dict, Optional @@ -18,19 +19,25 @@ from elevator_saga.core.models import ( SimulationState, StepResponse, ) -from elevator_saga.utils.debug import debug_log +from elevator_saga.utils.logger import debug, error, info, warning class ElevatorAPIClient: """统一的电梯API客户端""" - def __init__(self, base_url: str): + def __init__(self, base_url: str, client_type: str = "algorithm"): self.base_url = base_url.rstrip("/") + # 客户端身份相关 + self.client_type = client_type + self.client_id: Optional[str] = None # 缓存相关字段 self._cached_state: Optional[SimulationState] = None self._cached_tick: int = -1 self._tick_processed: bool = False # 标记当前tick是否已处理完成 - debug_log(f"API Client initialized for {self.base_url}") + debug(f"API Client initialized for {self.base_url} with type {self.client_type}", prefix="CLIENT") + + # 尝试自动注册 + self._auto_register() def get_state(self, force_reload: bool = False) -> SimulationState: """获取模拟状态 @@ -92,7 +99,16 @@ class ElevatorAPIClient: def step(self, ticks: int = 1) -> StepResponse: """执行步进""" - response_data = self._send_post_request("/api/step", {"ticks": ticks}) + # 携带当前tick信息,用于优先级队列控制 + # 如果没有缓存的state,先获取一次 + if self._cached_state is None: + self.get_state(force_reload=True) + + request_data = {"ticks": ticks} + if self._cached_state is not None: + request_data["current_tick"] = self._cached_state.tick + + response_data = self._send_post_request("/api/step", request_data) if "error" not in response_data: # 使用服务端返回的真实数据 @@ -108,7 +124,7 @@ class ElevatorAPIClient: event_dict["type"] = EventType(event_dict["type"]) except ValueError: - debug_log(f"Unknown event type: {event_dict['type']}") + warning(f"Unknown event type: {event_dict['type']}", prefix="CLIENT") continue events.append(SimulationEvent.from_dict(event_dict)) @@ -118,6 +134,10 @@ class ElevatorAPIClient: events=events, ) + # 更新缓存的tick(保持其他状态不变,只更新tick) + if self._cached_state is not None: + self._cached_state.tick = step_response.tick + # debug_log(f"Step response: tick={step_response.tick}, events={len(events)}") return step_response else: @@ -125,9 +145,20 @@ class ElevatorAPIClient: def send_elevator_command(self, command: GoToFloorCommand) -> bool: """发送电梯命令""" + # 客户端拦截:检查是否有权限发送控制命令 + if not self._can_send_command(): + warning( + f"Client type '{self.client_type}' cannot send control commands. " + f"Command ignored: {command.command_type} elevator {command.elevator_id} to floor {command.floor}", + prefix="CLIENT", + ) + # 不抛出错误,直接返回True(但实际未执行) + return True + endpoint = self._get_elevator_endpoint(command) - debug_log( - f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}" + debug( + f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}", + prefix="CLIENT", ) response_data = self._send_post_request(endpoint, command.parameters) @@ -145,7 +176,7 @@ class ElevatorAPIClient: response = self.send_elevator_command(command) return response except Exception as e: - debug_log(f"Go to floor failed: {e}") + error(f"Go to floor failed: {e}", prefix="CLIENT") return False def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str: @@ -155,13 +186,69 @@ class ElevatorAPIClient: if isinstance(command, GoToFloorCommand): return f"{base}/go_to_floor" + def _auto_register(self) -> None: + """自动注册客户端""" + try: + # 从环境变量读取客户端类型(如果有的话) + env_client_type = os.environ.get("ELEVATOR_CLIENT_TYPE") + if env_client_type: + self.client_type = env_client_type + debug(f"Client type from environment: {self.client_type}", prefix="CLIENT") + + # 直接发送注册请求(不使用_send_post_request以避免循环依赖) + url = f"{self.base_url}/api/client/register" + request_body = json.dumps({}).encode("utf-8") + headers = {"Content-Type": "application/json", "X-Client-Type": self.client_type} + req = urllib.request.Request(url, data=request_body, headers=headers) + + with urllib.request.urlopen(req, timeout=60) as response: + response_data = json.loads(response.read().decode("utf-8")) + if response_data.get("success"): + self.client_id = response_data.get("client_id") + info(f"Client registered successfully with ID: {self.client_id}", prefix="CLIENT") + else: + warning(f"Client registration failed: {response_data.get('error')}", prefix="CLIENT") + except Exception as e: + error(f"Auto registration failed: {e}", prefix="CLIENT") + + def _can_send_command(self) -> bool: + """检查客户端是否可以发送控制命令 + + Returns: + True: 如果是算法客户端或未注册客户端 + False: 如果是GUI客户端 + """ + # 算法客户端可以发送命令 + if self.client_type.lower() == "algorithm": + return True + # 未注册的客户端也可以发送命令(向后兼容) + if self.client_id is None: + return True + # GUI客户端不能发送命令 + if self.client_type.lower() == "gui": + return False + # 其他未知类型,默认允许(向后兼容) + return True + + def _get_request_headers(self) -> Dict[str, str]: + """获取请求头,包含客户端身份信息""" + headers = {"Content-Type": "application/json"} + if self.client_id: + headers["X-Client-ID"] = self.client_id + headers["X-Client-Type"] = self.client_type + return headers + def _send_get_request(self, endpoint: str) -> Dict[str, Any]: """发送GET请求""" url = f"{self.base_url}{endpoint}" + # todo: 全部更改为post # debug_log(f"GET {url}") try: - with urllib.request.urlopen(url, timeout=60) as response: + headers = self._get_request_headers() + # 对于GET请求,只添加客户端标识头 + req = urllib.request.Request(url, headers={k: v for k, v in headers.items() if k != "Content-Type"}) + with urllib.request.urlopen(req, timeout=60) as response: data: Dict[str, Any] = json.loads(response.read().decode("utf-8")) # debug_log(f"GET {url} -> {response.status}") return data @@ -169,7 +256,7 @@ class ElevatorAPIClient: raise RuntimeError(f"GET {url} failed: {e}") def reset(self) -> bool: - """重置模拟""" + """重置模拟并重新注册客户端""" try: response_data = self._send_post_request("/api/reset", {}) success = bool(response_data.get("success", False)) @@ -178,10 +265,14 @@ class ElevatorAPIClient: self._cached_state = None self._cached_tick = -1 self._tick_processed = False - debug_log("Cache cleared after reset") + debug("Cache cleared after reset", prefix="CLIENT") + + # 重新注册客户端(因为服务器已清除客户端记录) + self._auto_register() + debug("Client re-registered after reset", prefix="CLIENT") return success except Exception as e: - debug_log(f"Reset failed: {e}") + error(f"Reset failed: {e}", prefix="CLIENT") return False def next_traffic_round(self, full_reset: bool = False) -> bool: @@ -194,10 +285,10 @@ class ElevatorAPIClient: self._cached_state = None self._cached_tick = -1 self._tick_processed = False - debug_log("Cache cleared after traffic round switch") + debug("Cache cleared after traffic round switch", prefix="CLIENT") return success except Exception as e: - debug_log(f"Next traffic round failed: {e}") + error(f"Next traffic round failed: {e}", prefix="CLIENT") return False def get_traffic_info(self) -> Optional[Dict[str, Any]]: @@ -207,10 +298,10 @@ class ElevatorAPIClient: if "error" not in response_data: return response_data else: - debug_log(f"Get traffic info failed: {response_data.get('error')}") + warning(f"Get traffic info failed: {response_data.get('error')}", prefix="CLIENT") return None except Exception as e: - debug_log(f"Get traffic info failed: {e}") + error(f"Get traffic info failed: {e}", prefix="CLIENT") return None def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]: @@ -220,7 +311,8 @@ class ElevatorAPIClient: # debug_log(f"POST {url} with data: {data}") - req = urllib.request.Request(url, data=request_body, headers={"Content-Type": "application/json"}) + headers = self._get_request_headers() + req = urllib.request.Request(url, data=request_body, headers=headers) try: with urllib.request.urlopen(req, timeout=600) as response: diff --git a/elevator_saga/client/base_controller.py b/elevator_saga/client/base_controller.py index 4d9f297..a4bad91 100644 --- a/elevator_saga/client/base_controller.py +++ b/elevator_saga/client/base_controller.py @@ -14,7 +14,7 @@ from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPa from elevator_saga.core.models import EventType, SimulationEvent, SimulationState # 避免循环导入,使用运行时导入 -from elevator_saga.utils.debug import debug_log +from elevator_saga.utils.logger import debug, error, info, warning class ElevatorController(ABC): @@ -24,13 +24,14 @@ class ElevatorController(ABC): 用户通过继承此类并实现 abstract 方法来创建自己的调度算法 """ - def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False): + def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False, client_type: str = "algorithm"): """ 初始化控制器 Args: server_url: 服务器URL debug: 是否启用debug模式 + client_type: 客户端类型 ("algorithm" 或 "gui") """ self.server_url = server_url self.debug = debug @@ -39,9 +40,10 @@ class ElevatorController(ABC): self.current_tick = 0 self.is_running = False self.current_traffic_max_tick: int = 0 + self.client_type = client_type - # 初始化API客户端 - self.api_client = ElevatorAPIClient(server_url) + # 初始化API客户端,传递客户端类型 + self.api_client = ElevatorAPIClient(server_url, client_type=client_type) @abstractmethod def on_init(self, elevators: List[Any], floors: List[Any]) -> None: @@ -84,13 +86,13 @@ class ElevatorController(ABC): """ 算法启动前的回调 - 可选实现 """ - print(f"启动 {self.__class__.__name__} 算法") + info(f"启动 {self.__class__.__name__} 算法", prefix="CONTROLLER") def on_stop(self) -> None: """ 算法停止后的回调 - 可选实现 """ - print(f"停止 {self.__class__.__name__} 算法") + info(f"停止 {self.__class__.__name__} 算法", prefix="CONTROLLER") @abstractmethod def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None: @@ -206,9 +208,9 @@ class ElevatorController(ABC): try: self._run_event_driven_simulation() except KeyboardInterrupt: - print("\n用户中断了算法运行") + info("用户中断了算法运行", prefix="CONTROLLER") except Exception as e: - print(f"算法运行出错: {e}") + error(f"算法运行出错: {e}", prefix="CONTROLLER") raise finally: self.is_running = False @@ -217,7 +219,7 @@ class ElevatorController(ABC): def stop(self) -> None: """停止控制器""" self.is_running = False - print(f"停止 {self.__class__.__name__}") + info(f"停止 {self.__class__.__name__}", prefix="CONTROLLER") def on_simulation_complete(self, final_state: Dict[str, Any]) -> None: """ @@ -235,10 +237,10 @@ class ElevatorController(ABC): try: state = self.api_client.get_state() except ConnectionResetError as _: # noqa: F841 - print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}") + error(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}", prefix="CONTROLLER") os._exit(1) if state.tick > 0: - print("模拟器可能已经开始了一次模拟,执行重置...") + warning("模拟器可能已经开始了一次模拟,执行重置...", prefix="CONTROLLER") self.api_client.reset() time.sleep(0.3) return self._run_event_driven_simulation() @@ -247,7 +249,7 @@ class ElevatorController(ABC): # 获取当前流量文件的最大tick数 self._update_traffic_info() if self.current_traffic_max_tick == 0: - print("模拟器接收到的最大tick时间为0,可能所有的测试案例已用完,请求重置...") + warning("模拟器接收到的最大tick时间为0,可能所有的测试案例已用完,请求重置...", prefix="CONTROLLER") self.api_client.next_traffic_round(full_reset=True) time.sleep(0.3) return self._run_event_driven_simulation() @@ -297,7 +299,7 @@ class ElevatorController(ABC): self._reset_and_reinit() except Exception as e: - print(f"模拟运行错误: {e}") + error(f"模拟运行错误: {e}", prefix="CONTROLLER") raise def _update_wrappers(self, state: SimulationState, init: bool = False) -> None: @@ -321,12 +323,12 @@ class ElevatorController(ABC): traffic_info = self.api_client.get_traffic_info() if traffic_info: self.current_traffic_max_tick = int(traffic_info["max_tick"]) - debug_log(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}") + debug(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}", prefix="CONTROLLER") else: - debug_log("Failed to get traffic info") + warning("Failed to get traffic info", prefix="CONTROLLER") self.current_traffic_max_tick = 0 except Exception as e: - debug_log(f"Error updating traffic info: {e}") + error(f"Error updating traffic info: {e}", prefix="CONTROLLER") self.current_traffic_max_tick = 0 def _handle_single_event(self, event: SimulationEvent) -> None: @@ -430,5 +432,5 @@ class ElevatorController(ABC): self._internal_init(self.elevators, self.floors) except Exception as e: - debug_log(f"重置失败: {e}") + error(f"重置失败: {e}", prefix="CONTROLLER") raise diff --git a/elevator_saga/server/simulator.py b/elevator_saga/server/simulator.py index 6f42487..bb46f75 100644 --- a/elevator_saga/server/simulator.py +++ b/elevator_saga/server/simulator.py @@ -2,17 +2,20 @@ """ Elevator simulation server - tick-based discrete event simulation Provides HTTP API for controlling elevators and advancing simulation time +使用Quart异步框架提供更高的并发性能 """ import argparse +import asyncio import json import os.path import threading +import uuid from dataclasses import dataclass from enum import Enum from pathlib import Path -from typing import Any, Dict, List, cast +from typing import Any, Dict, List, Optional, cast -from flask import Flask, Response, request +from quart import Quart, Response, request from elevator_saga.core.models import ( Direction, @@ -29,21 +32,236 @@ from elevator_saga.core.models import ( TrafficEntry, create_empty_simulation_state, ) - -# Global debug flag for server -_SERVER_DEBUG_MODE = False +from elevator_saga.utils.logger import LogLevel, debug, error, info, set_log_level, warning -def set_server_debug_mode(enabled: bool) -> None: - """Enable or disable server debug logging""" - global _SERVER_DEBUG_MODE - globals()["_SERVER_DEBUG_MODE"] = enabled +class ClientType(Enum): + """客户端类型""" + + ALGORITHM = "algorithm" + GUI = "gui" + UNKNOWN = "unknown" -def server_debug_log(message: str) -> None: - """Print server debug message if debug mode is enabled""" - if _SERVER_DEBUG_MODE: - print(f"[SERVER-DEBUG] {message}", flush=True) +@dataclass +class ClientInfo: + """客户端信息""" + + client_id: str + client_type: ClientType + registered_tick: int + + +class ClientManager: + """客户端管理器 - 管理多个客户端的连接和身份""" + + def __init__(self) -> None: + self.clients: Dict[str, ClientInfo] = {} + self.algorithm_client_id: Optional[str] = None + self.gui_client_id: Optional[str] = None + self.lock = threading.Lock() + # Step请求控制(使用轮询检查,不需要事件对象) + self.current_tick_processed: Dict[int, bool] = {} # tick -> 是否已被算法客户端处理 + self.tick_lock = threading.Lock() + # 事件缓存:记录算法客户端产生的events,供GUI获取 + self.tick_events: Dict[int, List[Any]] = {} # target_tick -> events + self.events_lock = threading.Lock() # Size May Change When Iter + # 严格同步:记录GUI已确认的最后tick,确保不丢失消息 + self.gui_acknowledged_tick: int = -1 # GUI已读取到的最后一个tick + self.algorithm_current_tick: int = -1 # 算法当前的tick(执行step前) + + def register_client(self, client_type_str: str, current_tick: int) -> tuple[str, bool, str]: + """ + 注册客户端 + + Args: + client_type_str: 客户端类型字符串 + current_tick: 当前tick + + Returns: + tuple[client_id, success, message] + """ + with self.lock: + # 解析客户端类型 + try: + if client_type_str.lower() == "algorithm": + client_type = ClientType.ALGORITHM + elif client_type_str.lower() == "gui": + client_type = ClientType.GUI + else: + client_type = ClientType.UNKNOWN + except (AttributeError, ValueError): + client_type = ClientType.UNKNOWN + + # 检查是否已经有相同类型的客户端 + if client_type == ClientType.ALGORITHM and self.algorithm_client_id is not None: + return "", False, "Algorithm client already registered" + elif client_type == ClientType.GUI and self.gui_client_id is not None: + return "", False, "GUI client already registered" + + # 生成新的客户端ID + client_id = str(uuid.uuid4()) + client_info = ClientInfo(client_id=client_id, client_type=client_type, registered_tick=current_tick) + + # 注册客户端 + self.clients[client_id] = client_info + if client_type == ClientType.ALGORITHM: + self.algorithm_client_id = client_id + debug(f"Algorithm client registered: {client_id}", prefix="SERVER") + elif client_type == ClientType.GUI: + self.gui_client_id = client_id + debug(f"GUI client registered: {client_id}", prefix="SERVER") + + return client_id, True, f"{client_type.value} client registered successfully" + + def get_client_info(self, client_id: str) -> Optional[ClientInfo]: + """获取客户端信息""" + with self.lock: + return self.clients.get(client_id) + + def is_algorithm_client(self, client_id: Optional[str]) -> bool: + """检查是否是算法客户端""" + if client_id is None: + return False + with self.lock: + client_info = self.clients.get(client_id) + return client_info is not None and client_info.client_type == ClientType.ALGORITHM + + def can_execute_command(self, client_id: Optional[str]) -> bool: + """检查客户端是否可以执行控制命令""" + return self.is_algorithm_client(client_id) + + async def wait_for_algorithm_step(self, client_id: Optional[str], target_tick: int, timeout: float = 30.0) -> bool: + """ + GUI客户端等待算法客户端处理完指定tick的step请求 + 使用asyncio异步等待,真正的非阻塞协程 + + 如果没有算法客户端,GUI会持续等待直到算法客户端注册并处理 + + Args: + client_id: 客户端ID + target_tick: 目标tick + timeout: 超时时间(秒) + + Returns: + True: 可以继续, False: 超时或其他原因 + """ + # 如果是算法客户端,直接返回True + if self.is_algorithm_client(client_id): + with self.tick_lock: + self.current_tick_processed[target_tick] = True + debug(f"Algorithm client processed tick {target_tick}", prefix="SERVER") + return True + + # GUI客户端需要等待 - 使用异步协程 + # 如果没有算法客户端,先等待算法客户端注册 + if self.algorithm_client_id is None: + debug("GUI client waiting for algorithm client to register...", prefix="SERVER") + + start_time = asyncio.get_event_loop().time() + check_interval = 0.1 # 每100ms检查一次 + + # 阶段1:等待算法客户端注册 + while self.algorithm_client_id is None: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= timeout: + warning("GUI client: timeout waiting for algorithm client to register", prefix="SERVER") + return False + + await asyncio.sleep(check_interval) + + # 动态调整检查间隔 + if elapsed > 5 and check_interval < 0.5: + check_interval = 0.5 + + debug(f"GUI client: algorithm client registered, now waiting for tick {target_tick}", prefix="SERVER") + + # 阶段2:等待算法客户端处理指定tick + while True: + # 检查是否已处理 + with self.tick_lock: + if self.current_tick_processed.get(target_tick, False): + debug(f"GUI client: tick {target_tick} ready, proceeding", prefix="SERVER") + return True + + # 检查是否超时 + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= timeout: + warning(f"GUI client: timeout waiting for tick {target_tick}", prefix="SERVER") + return False + + # 异步休眠,真正的协程切换 + # 不阻塞事件循环,其他协程可以运行 + await asyncio.sleep(check_interval) + + # 动态调整检查间隔(可选优化) + # 前几秒检查更频繁,之后降低频率 + if elapsed > 5 and check_interval < 0.5: + check_interval = 0.5 # 5秒后降低到500ms检查一次 + + def store_tick_events(self, target_tick: int, events: List[Any]) -> None: + """存储指定tick的events""" + with self.events_lock: + self.tick_events[target_tick] = events + debug(f"Stored {len(events)} events for tick {target_tick}", prefix="SERVER") + + def get_tick_events(self, target_tick: int) -> List[Any]: + """获取指定tick的events""" + with self.events_lock: + events = self.tick_events.get(target_tick, []) + debug(f"Retrieved {len(events)} events for tick {target_tick}", prefix="SERVER") + return events + + async def wait_for_gui_acknowledgment(self, target_tick: int, timeout: float = 30.0) -> bool: + """ + 算法客户端等待GUI确认已读取上一次step的结果 + 确保GUI不会错过任何tick的消息 + + Args: + target_tick: 算法刚刚执行完step后的tick(上一次step的结果) + timeout: 超时时间 + + Returns: + True: GUI已确认, False: 超时或没有GUI客户端 + """ + # 如果没有GUI客户端,不需要等待 + if self.gui_client_id is None: + return True + # 如果是第一个tick(target_tick=1),不需要等待(GUI还没开始) + if target_tick <= 1: + return True + debug(f"Algorithm waiting for GUI to acknowledge tick {target_tick - 1}", prefix="SERVER") + start_time = asyncio.get_event_loop().time() + while True: + # 检查GUI是否已读取到上一个tick的结果 + if self.gui_acknowledged_tick >= target_tick - 1: + debug(f"GUI acknowledged tick {target_tick - 1}, algorithm can proceed", prefix="SERVER") + return True + # 检查超时 + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= timeout: + warning(f"Timeout waiting for GUI acknowledgment of tick {target_tick - 1}", prefix="SERVER") + return False + await asyncio.sleep(0.01) # 10ms检查一次 + + def acknowledge_gui_read(self, tick: int) -> None: + """GUI确认已读取指定tick""" + self.gui_acknowledged_tick = max(self.gui_acknowledged_tick, tick) + debug(f"GUI acknowledged tick {tick}", prefix="SERVER") + + def reset(self) -> None: + """重置客户端管理器""" + with self.lock: + self.clients.clear() + self.algorithm_client_id = None + self.gui_client_id = None + with self.tick_lock: + self.current_tick_processed.clear() + with self.events_lock: + self.tick_events.clear() + self.gui_acknowledged_tick = -1 + self.algorithm_current_tick = -1 + debug("Client manager reset", prefix="SERVER") class CustomJSONEncoder(json.JSONEncoder): @@ -124,6 +342,8 @@ class ElevatorSimulation: self.current_traffic_index = 0 self.traffic_files: List[Path] = [] self.state: SimulationState = create_empty_simulation_state(2, 1, 1) + self.all_traffic_results: List[Dict[str, Any]] = [] # 存储所有traffic文件的结果 + self.start_dir = Path.cwd() # 记录启动目录 self._load_traffic_files() @property @@ -154,7 +374,7 @@ class ElevatorSimulation: self.traffic_files.append(file_path) # 按文件名排序 self.traffic_files.sort() - server_debug_log(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}") + debug(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}", prefix="SERVER") # 如果有文件,加载第一个 if self.traffic_files: self.load_current_traffic() @@ -162,20 +382,20 @@ class ElevatorSimulation: def load_current_traffic(self) -> None: """加载当前索引对应的流量文件""" if not self.traffic_files: - server_debug_log("No traffic files available") + warning("No traffic files available", prefix="SERVER") return if self.current_traffic_index >= len(self.traffic_files): - server_debug_log(f"Traffic index {self.current_traffic_index} out of range") + warning(f"Traffic index {self.current_traffic_index} out of range", prefix="SERVER") return traffic_file = self.traffic_files[self.current_traffic_index] - server_debug_log(f"Loading traffic from {traffic_file.name}") + info(f"Loading traffic from {traffic_file.name}", prefix="SERVER") try: with open(traffic_file, "r", encoding="utf-8") as f: file_data = json.load(f) building_config = file_data["building"] - server_debug_log(f"Building config: {building_config}") + debug(f"Building config: {building_config}", prefix="SERVER") self.state = create_empty_simulation_state( building_config["elevators"], building_config["floors"], building_config["elevator_capacity"] ) @@ -186,7 +406,7 @@ class ElevatorSimulation: for i, elevator in enumerate(self.state.elevators): if i < len(elevator_energy_rates): elevator.energy_rate = elevator_energy_rates[i] - server_debug_log(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}") + debug(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}", prefix="SERVER") self.max_duration_ticks = building_config["duration"] traffic_data: list[Dict[str, Any]] = file_data["traffic"] @@ -202,16 +422,108 @@ class ElevatorSimulation: self.next_passenger_id += 1 except Exception as e: - server_debug_log(f"Error loading traffic file {traffic_file}: {e}") + error(f"Error loading traffic file {traffic_file}: {e}", prefix="SERVER") + + def save_current_traffic_result(self) -> None: + """保存当前traffic文件的结果""" + if not self.traffic_files or self.current_traffic_index >= len(self.traffic_files): + return + + traffic_file = self.traffic_files[self.current_traffic_index] + metrics = self._calculate_metrics() + + result = { + "traffic_file": traffic_file.name, + "traffic_index": self.current_traffic_index, + "final_tick": self.tick, + "max_duration_ticks": self.max_duration_ticks, + "metrics": metrics.to_dict(), + } + + self.all_traffic_results.append(result) + info( + f"Saved result for {traffic_file.name}: {metrics.completed_passengers}/{metrics.total_passengers} passengers completed", + prefix="SERVER", + ) + + def save_final_results(self) -> None: + """保存所有结果到result.json""" + result_file = self.start_dir / "result.json" + + # 计算总体统计 + total_completed = sum(r["metrics"]["completed_passengers"] for r in self.all_traffic_results) + total_passengers = sum(r["metrics"]["total_passengers"] for r in self.all_traffic_results) + total_energy = sum(r["metrics"]["total_energy_consumption"] for r in self.all_traffic_results) + + # 计算平均等待时间(只统计有完成乘客的情况) + all_avg_floor_wait = [ + r["metrics"]["average_floor_wait_time"] + for r in self.all_traffic_results + if r["metrics"]["completed_passengers"] > 0 + ] + all_avg_arrival_wait = [ + r["metrics"]["average_arrival_wait_time"] + for r in self.all_traffic_results + if r["metrics"]["completed_passengers"] > 0 + ] + all_p95_floor_wait = [ + r["metrics"]["p95_floor_wait_time"] + for r in self.all_traffic_results + if r["metrics"]["completed_passengers"] > 0 + ] + all_p95_arrival_wait = [ + r["metrics"]["p95_arrival_wait_time"] + for r in self.all_traffic_results + if r["metrics"]["completed_passengers"] > 0 + ] + + completion_rate = total_completed / total_passengers if total_passengers > 0 else 0 + final_result = { + "total_traffic_files": len(self.all_traffic_results), + "summary": { + "total_completed_passengers": total_completed, + "total_passengers": total_passengers, + "completion_rate": completion_rate, + "total_energy_consumption": total_energy, + "average_floor_wait_time": ( + sum(all_avg_floor_wait) / len(all_avg_floor_wait) if all_avg_floor_wait else 0 + ), + "average_arrival_wait_time": ( + sum(all_avg_arrival_wait) / len(all_avg_arrival_wait) if all_avg_arrival_wait else 0 + ), + "p95_floor_wait_time": sum(all_p95_floor_wait) / len(all_p95_floor_wait) if all_p95_floor_wait else 0, + "p95_arrival_wait_time": ( + sum(all_p95_arrival_wait) / len(all_p95_arrival_wait) if all_p95_arrival_wait else 0 + ), + }, + "individual_results": self.all_traffic_results, + } + + with open(result_file, "w", encoding="utf-8") as f: + json.dump(final_result, f, indent=2, ensure_ascii=False) + + info(f"Final results saved to: {result_file}", prefix="SERVER") + info( + f"Summary: {total_completed}/{total_passengers} passengers completed ({completion_rate:.1%})", + prefix="SERVER", + ) + info(f"Total energy consumption: {total_energy:.2f}", prefix="SERVER") def next_traffic_round(self, full_reset: bool = False) -> bool: """切换到下一个流量文件,返回是否成功切换""" if not self.traffic_files: return False + # 在切换前保存当前traffic文件的结果 + if self.current_traffic_index >= 0 and self.current_traffic_index < len(self.traffic_files): + self.save_current_traffic_result() + # 检查是否还有下一个文件 next_index = self.current_traffic_index + 1 if next_index >= len(self.traffic_files): + # 所有任务完成,保存最终结果 + self.save_final_results() + if full_reset: self.current_traffic_index = -1 return self.next_traffic_round() @@ -226,7 +538,7 @@ class ElevatorSimulation: with open(traffic_file, "r") as f: traffic_data = json.load(f) - server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries") + debug(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries", prefix="SERVER") self.traffic_queue: List[TrafficEntry] = [] # type: ignore[reportRedeclaration] for entry in traffic_data: @@ -242,12 +554,12 @@ class ElevatorSimulation: # Sort by arrival time self.traffic_queue.sort(key=lambda p: p.tick) - server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}") + debug(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}", prefix="SERVER") def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None: """Emit an event to be sent to clients using unified data models""" self.state.add_event(event_type, data) - server_debug_log(f"Event emitted: {event_type.value} with data {data}") + debug(f"Event emitted: {event_type.value} with data {data}", prefix="SERVER") def step(self, num_ticks: int = 1) -> List[SimulationEvent]: with self.lock: @@ -263,9 +575,9 @@ class ElevatorSimulation: if self.tick >= self.max_duration_ticks: completed_count = self.force_complete_remaining_passengers() if completed_count > 0: - server_debug_log(f"模拟结束,强制完成了 {completed_count} 个乘客") + info(f"模拟结束,强制完成了 {completed_count} 个乘客", prefix="SERVER") - server_debug_log(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}") + debug(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}", prefix="SERVER") return new_events def _process_tick(self) -> List[SimulationEvent]: @@ -338,9 +650,10 @@ class ElevatorSimulation: elif elevator.run_status == ElevatorStatus.START_UP: # 从启动状态切换到匀速 elevator.run_status = ElevatorStatus.CONSTANT_SPEED - server_debug_log( + debug( f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} " - f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}" + f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}", + prefix="SERVER", ) # START_DOWN状态会在到达目标时在_move_elevators中切换为STOPPED @@ -356,7 +669,7 @@ class ElevatorSimulation: ) assert traffic_entry.origin != traffic_entry.destination, f"乘客{passenger.id}目的地和起始地{traffic_entry.origin}重复" self.passengers[passenger.id] = passenger - server_debug_log(f"乘客 {passenger.id:4}: 创建 | {passenger}") + debug(f"乘客 {passenger.id:4}: 创建 | {passenger}", prefix="SERVER") if passenger.destination > passenger.origin: self.floors[passenger.origin].up_queue.append(passenger.id) self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id}) @@ -491,19 +804,19 @@ class ElevatorSimulation: 说明电梯处于stop状态,这个tick直接采用下一个目的地运行了 """ elevator.position.target_floor = floor - server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}") + debug(f"电梯 E{elevator.id} 被设定为前往 F{floor}", prefix="SERVER") new_target_floor_should_accel = self._should_start_deceleration(elevator) if not new_target_floor_should_accel: if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了 elevator.run_status = ElevatorStatus.CONSTANT_SPEED - server_debug_log(f"电梯 E{elevator.id} 被设定为匀速") + debug(f"电梯 E{elevator.id} 被设定为匀速", prefix="SERVER") elif new_target_floor_should_accel: if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速 elevator.run_status = ElevatorStatus.START_DOWN - server_debug_log(f"电梯 E{elevator.id} 被设定为减速") + debug(f"电梯 E{elevator.id} 被设定为减速", prefix="SERVER") if elevator.current_floor != floor or elevator.position.floor_up_position != 0: old_status = elevator.run_status.value - server_debug_log(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}") + debug(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}", prefix="SERVER") def _calculate_distance_to_target(self, elevator: ElevatorState) -> float: """计算到目标楼层的距离(以floor_up_position为单位)""" @@ -542,7 +855,7 @@ class ElevatorSimulation: self._set_elevator_target_floor(elevator, floor) else: elevator.next_target_floor = floor - server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}") + debug(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}", prefix="SERVER") def get_state(self) -> SimulationStateResponse: """Get complete simulation state""" @@ -636,26 +949,59 @@ class ElevatorSimulation: self.traffic_queue: List[TrafficEntry] = [] self.max_duration_ticks = 0 self.next_passenger_id = 1 + self.all_traffic_results.clear() # 清空累积结果 -# Global simulation instance for Flask routes +# Global simulation instance for Quart routes simulation: ElevatorSimulation = ElevatorSimulation("", _init_only=True) -# Create Flask app -app = Flask(__name__) +# Global client manager instance +client_manager = ClientManager() + +# Create Quart app (异步Flask) +app = Quart(__name__) + + +def get_client_id_from_request() -> Optional[str]: + """从请求头中获取客户端ID""" + result = request.headers.get("X-Client-ID") + return result if result else None + + +def get_client_type_from_request() -> str: + """从请求头中获取客户端类型,默认为algorithm""" + result = request.headers.get("X-Client-Type", "algorithm") + return str(result) if result else "algorithm" # Configure CORS @app.after_request def after_request(response: Response) -> Response: response.headers.add("Access-Control-Allow-Origin", "*") - response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization") + response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization,X-Client-ID,X-Client-Type") response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS") return response +@app.route("/api/client/register", methods=["POST"]) +async def register_client() -> Response | tuple[Response, int]: + """客户端注册端点""" + try: + client_type = get_client_type_from_request() + current_tick = simulation.tick + + client_id, success, message = client_manager.register_client(client_type, current_tick) + + if success: + return json_response({"success": True, "client_id": client_id, "message": message}) + else: + return json_response({"success": False, "error": message}, 400) + except Exception as e: + return json_response({"error": str(e)}, 500) + + @app.route("/api/state", methods=["GET"]) -def get_state() -> Response | tuple[Response, int]: +async def get_state() -> Response | tuple[Response, int]: try: state = simulation.get_state() return json_response(state) @@ -664,14 +1010,60 @@ def get_state() -> Response | tuple[Response, int]: @app.route("/api/step", methods=["POST"]) -def step_simulation() -> Response | tuple[Response, int]: +async def step_simulation() -> Response | tuple[Response, int]: try: - data: Dict[str, Any] = request.get_json() or {} + data: Dict[str, Any] = await request.get_json() or {} ticks = data.get("ticks", 1) - # server_debug_log("") - # server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}") - events = simulation.step(ticks) - server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n") + client_current_tick = data.get("current_tick", None) + + # 获取客户端ID + client_id = get_client_id_from_request() + + # 检查客户端类型 + is_algorithm = client_manager.is_algorithm_client(client_id) + + # 如果提供了current_tick,实现优先级队列 + if client_current_tick is not None: + target_tick = client_current_tick + ticks + + # GUI客户端需要等待算法客户端先处理(异步等待) + can_proceed = await client_manager.wait_for_algorithm_step(client_id, target_tick) + + if not can_proceed: + warning(f"Client {client_id} timeout waiting for tick {target_tick}", prefix="SERVER") + return json_response({"error": "Timeout waiting for algorithm client to process this tick"}, 408) + + # 只有算法客户端才能真正推进模拟 + if is_algorithm: + # 计算target_tick + target_tick = client_current_tick + ticks if client_current_tick is not None else simulation.tick + ticks + + # 算法客户端:等待GUI确认已读取上一次的tick结果(严格同步) + gui_ready = await client_manager.wait_for_gui_acknowledgment(target_tick) + if not gui_ready: + warning("Algorithm timeout waiting for GUI acknowledgment, but continuing", prefix="SERVER") + # 继续执行,不阻塞算法 + + # 真正执行step + events = simulation.step(ticks) + debug(f"Algorithm step: tick {simulation.tick}, events: {len(events)}", prefix="SERVER") + + # 存储events供GUI获取 + if client_current_tick is not None: + client_manager.store_tick_events(target_tick, events) + else: + # GUI客户端:不推进模拟,但可以获取算法产生的events + if client_current_tick is not None: + target_tick = client_current_tick + ticks + events = client_manager.get_tick_events(target_tick) + debug(f"GUI step (retrieved): tick {simulation.tick}, events: {len(events)}", prefix="SERVER") + + # GUI确认已读取这个tick + client_manager.acknowledge_gui_read(target_tick) + else: + events = [] + debug(f"GUI step (no tick): tick {simulation.tick}", prefix="SERVER") + return json_response( { "tick": simulation.tick, @@ -683,18 +1075,37 @@ def step_simulation() -> Response | tuple[Response, int]: @app.route("/api/reset", methods=["POST"]) -def reset_simulation() -> Response | tuple[Response, int]: +async def reset_simulation() -> Response | tuple[Response, int]: try: simulation.reset() + client_manager.reset() # 同时重置客户端管理器 + info("Simulation and client manager reset", prefix="SERVER") return json_response({"success": True}) except Exception as e: return json_response({"error": str(e)}, 500) @app.route("/api/elevators//go_to_floor", methods=["POST"]) -def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]: +async def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]: try: - data: Dict[str, Any] = request.get_json() or {} + # 获取客户端ID + client_id = get_client_id_from_request() + + # 检查客户端是否有权限执行控制命令 + if not client_manager.can_execute_command(client_id): + client_type = "unknown" + if client_id: + client_info = client_manager.get_client_info(client_id) + if client_info: + client_type = client_info.client_type.value + warning( + f"Client {client_id} (type: {client_type}) attempted to execute command but was denied", prefix="SERVER" + ) + return json_response( + {"success": False, "error": "Only algorithm clients can execute control commands"}, 403 + ) + + data: Dict[str, Any] = await request.get_json() or {} floor = data["floor"] immediate = data.get("immediate", False) simulation.elevator_go_to_floor(elevator_id, floor, immediate) @@ -704,10 +1115,11 @@ def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]: @app.route("/api/traffic/next", methods=["POST"]) -def next_traffic_round() -> Response | tuple[Response, int]: +async def next_traffic_round() -> Response | tuple[Response, int]: """切换到下一个流量文件""" try: - full_reset = request.get_json()["full_reset"] + data = await request.get_json() + full_reset = data["full_reset"] success = simulation.next_traffic_round(full_reset) if success: return json_response({"success": True}) @@ -718,7 +1130,7 @@ def next_traffic_round() -> Response | tuple[Response, int]: @app.route("/api/traffic/info", methods=["GET"]) -def get_traffic_info() -> Response | tuple[Response, int]: +async def get_traffic_info() -> Response | tuple[Response, int]: """获取当前流量文件信息""" try: info = simulation.get_traffic_info() @@ -730,7 +1142,7 @@ def get_traffic_info() -> Response | tuple[Response, int]: def main() -> None: global simulation - parser = argparse.ArgumentParser(description="Elevator Simulation Server") + parser = argparse.ArgumentParser(description="Elevator Simulation Server (Async)") parser.add_argument("--host", default="127.0.0.1", help="Server host") parser.add_argument("--port", type=int, default=8000, help="Server port") parser.add_argument("--debug", default=True, action="store_true", help="Enable debug logging") @@ -739,20 +1151,24 @@ def main() -> None: # Enable debug mode if requested if args.debug: - set_server_debug_mode(True) - server_debug_log("Server debug mode enabled") + set_log_level(LogLevel.DEBUG) + debug("Server debug mode enabled", prefix="SERVER") app.config["DEBUG"] = True # Create simulation with traffic directory simulation = ElevatorSimulation(f"{os.path.join(os.path.dirname(__file__), '..', 'traffic')}") # Print traffic status - print(f"Elevator simulation server running on http://{args.host}:{args.port}") + info(f"Elevator simulation server (Async) running on http://{args.host}:{args.port}", prefix="SERVER") + info("Using Quart (async Flask) for better concurrency", prefix="SERVER") + debug_status = "enabled" if args.debug else "disabled" + info(f"Debug mode: {debug_status}", prefix="SERVER") try: - app.run(host=args.host, port=args.port, debug=args.debug, threaded=True) + # 使用Quart的run方法(底层使用hypercorn) + app.run(host=args.host, port=args.port, debug=args.debug) except KeyboardInterrupt: - print("\nShutting down server...") + info("Shutting down server...", prefix="SERVER") if __name__ == "__main__": diff --git a/elevator_saga/utils/__init__.py b/elevator_saga/utils/__init__.py index e69de29..314af55 100644 --- a/elevator_saga/utils/__init__.py +++ b/elevator_saga/utils/__init__.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +""" +Utils package for Elevator Saga +工具包 +""" + +from elevator_saga.utils.logger import LogLevel, debug, error, get_logger, info, set_log_level, warning + +__all__ = [ + # Logger functions + "debug", + "info", + "warning", + "error", + "get_logger", + "set_log_level", + "LogLevel", +] diff --git a/elevator_saga/utils/debug.py b/elevator_saga/utils/debug.py deleted file mode 100644 index 46e9d7d..0000000 --- a/elevator_saga/utils/debug.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 -""" -Debug utilities for Elevator Saga -调试工具模块 -""" - -# Global debug flag -_debug_enabled: bool = True - - -def set_debug_mode(enabled: bool) -> None: - """启用或禁用调试模式""" - global _debug_enabled - _debug_enabled = enabled - - -def debug_log(message: str) -> None: - """输出调试信息(如果启用了调试模式)""" - if _debug_enabled: - print(f"[DEBUG] {message}", flush=True) - - -def is_debug_enabled() -> bool: - """检查是否启用了调试模式""" - return _debug_enabled diff --git a/elevator_saga/utils/logger.py b/elevator_saga/utils/logger.py new file mode 100644 index 0000000..35c3dd6 --- /dev/null +++ b/elevator_saga/utils/logger.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +Unified logging system for Elevator Saga +统一的日志系统 - 支持多级别、带颜色的日志输出 +""" +import os +import sys +from enum import Enum +from typing import Optional + + +class LogLevel(Enum): + """日志级别枚举""" + + DEBUG = 0 + INFO = 1 + WARNING = 2 + ERROR = 3 + + @classmethod + def from_string(cls, level_str: str) -> "LogLevel": + """从字符串转换为日志级别""" + level_map = { + "DEBUG": cls.DEBUG, + "INFO": cls.INFO, + "WARNING": cls.WARNING, + "ERROR": cls.ERROR, + } + return level_map.get(level_str.upper(), cls.DEBUG) + + +class Color: + """ANSI颜色代码""" + + # 基础颜色 + RESET = "\033[0m" + BOLD = "\033[1m" + + # 前景色 + BLACK = "\033[30m" + RED = "\033[31m" + GREEN = "\033[32m" + YELLOW = "\033[33m" + BLUE = "\033[34m" + MAGENTA = "\033[35m" + CYAN = "\033[36m" + WHITE = "\033[37m" + + # 亮色 + BRIGHT_BLACK = "\033[90m" + BRIGHT_RED = "\033[91m" + BRIGHT_GREEN = "\033[92m" + BRIGHT_YELLOW = "\033[93m" + BRIGHT_BLUE = "\033[94m" + BRIGHT_MAGENTA = "\033[95m" + BRIGHT_CYAN = "\033[96m" + BRIGHT_WHITE = "\033[97m" + + +class Logger: + """统一日志记录器""" + + def __init__(self, name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True): + """ + 初始化日志记录器 + + Args: + name: 日志记录器名称 + min_level: 最低日志级别 + use_color: 是否使用颜色 + """ + self.name = name + self.min_level = min_level + self.use_color = use_color and sys.stdout.isatty() + + # 日志级别对应的颜色 + self.level_colors = { + LogLevel.DEBUG: Color.BRIGHT_BLACK, + LogLevel.INFO: Color.BRIGHT_CYAN, + LogLevel.WARNING: Color.BRIGHT_YELLOW, + LogLevel.ERROR: Color.BRIGHT_RED, + } + + # 日志级别对应的标签 + self.level_labels = { + LogLevel.DEBUG: "DEBUG", + LogLevel.INFO: "INFO", + LogLevel.WARNING: "WARNING", + LogLevel.ERROR: "ERROR", + } + + def _format_message(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> str: + """ + 格式化日志消息 + + Args: + level: 日志级别 + message: 消息内容 + prefix: 可选的前缀(如模块名) + + Returns: + 格式化后的消息 + """ + level_label = self.level_labels[level] + + if self.use_color: + color = self.level_colors[level] + level_str = f"{color}{level_label:8}{Color.RESET}" + else: + level_str = f"{level_label:8}" + + if prefix: + prefix_str = f"[{prefix}] " + else: + prefix_str = "" + + return f"{level_str} {prefix_str}{message}" + + def _log(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> None: + """ + 记录日志 + + Args: + level: 日志级别 + message: 消息内容 + prefix: 可选的前缀 + """ + if level.value < self.min_level.value: + return + + formatted = self._format_message(level, message, prefix) + print(formatted, flush=True) + + def debug(self, message: str, prefix: Optional[str] = None) -> None: + """记录DEBUG级别日志""" + self._log(LogLevel.DEBUG, message, prefix) + + def info(self, message: str, prefix: Optional[str] = None) -> None: + """记录INFO级别日志""" + self._log(LogLevel.INFO, message, prefix) + + def warning(self, message: str, prefix: Optional[str] = None) -> None: + """记录WARNING级别日志""" + self._log(LogLevel.WARNING, message, prefix) + + def error(self, message: str, prefix: Optional[str] = None) -> None: + """记录ERROR级别日志""" + self._log(LogLevel.ERROR, message, prefix) + + def set_level(self, level: LogLevel) -> None: + """设置最低日志级别""" + self.min_level = level + + +# 全局日志记录器实例 +_global_logger: Optional[Logger] = None + + +def _get_default_log_level() -> LogLevel: + """从环境变量获取默认日志级别,默认为DEBUG""" + env_level = os.environ.get("ELEVATOR_LOG_LEVEL", "DEBUG") + return LogLevel.from_string(env_level) + + +def get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger: + """ + 获取全局日志记录器 + + Args: + name: 日志记录器名称 + min_level: 最低日志级别,如果为None则从环境变量读取(默认DEBUG) + + Returns: + Logger实例 + """ + global _global_logger + if _global_logger is None: + if min_level is None: + min_level = _get_default_log_level() + _global_logger = Logger(name, min_level) + return _global_logger + + +def set_log_level(level: LogLevel) -> None: + """设置全局日志级别""" + logger = get_logger() + logger.set_level(level) + + +# 便捷函数 +def debug(message: str, prefix: Optional[str] = None) -> None: + """记录DEBUG日志""" + get_logger().debug(message, prefix) + + +def info(message: str, prefix: Optional[str] = None) -> None: + """记录INFO日志""" + get_logger().info(message, prefix) + + +def warning(message: str, prefix: Optional[str] = None) -> None: + """记录WARNING日志""" + get_logger().warning(message, prefix) + + +def error(message: str, prefix: Optional[str] = None) -> None: + """记录ERROR日志""" + get_logger().error(message, prefix) diff --git a/pyproject.toml b/pyproject.toml index 23f4377..8fd9453 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,8 @@ classifiers = [ dependencies = [ "numpy>=1.20.0", - "flask>=2.0.0", + "quart>=0.19.0", + "hypercorn>=0.15.0", ] [project.optional-dependencies] diff --git a/pyrightconfig.json b/pyrightconfig.json index d756a76..161eb64 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -12,8 +12,7 @@ "docs/_build", "elevatorpy.egg-info", "elevator_saga.egg-info", - ".eggs", - "MsgCenterPy" + ".eggs" ], "pythonVersion": "3.10", "typeCheckingMode": "basic",