2 Commits

Author SHA1 Message Date
Xuwznln
8ae77f6b2a Bump version: 0.0.9 → 0.0.10 2025-10-19 22:35:15 +08:00
Xuwznln
996a23832e Full support for gui & algorithm 2025-10-19 22:35:02 +08:00
14 changed files with 1160 additions and 121 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.9
current_version = 0.0.10
commit = True
tag = True
tag_name = v{new_version}

2
.gitignore vendored
View File

@@ -2,6 +2,8 @@
# Python-related files
# ================================
elevator_saga/traffic/test_cases.py
elevator_saga/traffic/test_cases
result.json
# Compiled Python files
__pycache__/

View File

@@ -75,7 +75,7 @@ repos:
hooks:
- id: prettier
types_or: [yaml, json, markdown]
exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|elevator_saga\\.egg-info/)"
exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|\\.egg-info/)"
# Global settings
default_stages: [pre-commit, pre-push]

View File

@@ -83,6 +83,7 @@ Contents
client
communication
events
logging
.. toctree::
:maxdepth: 1

325
docs/logging.rst Normal file
View File

@@ -0,0 +1,325 @@
Logging System
==============
Overview
--------
Elevator Saga uses a unified logging system with colored output and multiple log levels. The logging system provides consistent, filterable output across all components.
Log Levels
----------
The logger supports four log levels with distinct colors:
* **DEBUG** - Gray/Bright Black - Detailed debugging information
* **INFO** - Cyan - General informational messages
* **WARNING** - Yellow - Warning messages
* **ERROR** - Red - Error messages
Configuration
-------------
Environment Variable
~~~~~~~~~~~~~~~~~~~~
The default log level is controlled by the ``ELEVATOR_LOG_LEVEL`` environment variable:
.. code-block:: bash
# Set log level to DEBUG (default)
export ELEVATOR_LOG_LEVEL=DEBUG
# Set log level to INFO (less verbose)
export ELEVATOR_LOG_LEVEL=INFO
# Set log level to WARNING (only warnings and errors)
export ELEVATOR_LOG_LEVEL=WARNING
# Set log level to ERROR (only errors)
export ELEVATOR_LOG_LEVEL=ERROR
If not set, the default is **DEBUG** mode.
Programmatic Control
~~~~~~~~~~~~~~~~~~~~
You can also control the log level programmatically:
.. code-block:: python
from elevator_saga.utils.logger import LogLevel, set_log_level
# Set to INFO level
set_log_level(LogLevel.INFO)
# Set to DEBUG level
set_log_level(LogLevel.DEBUG)
Basic Usage
-----------
Simple Logging
~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import debug, info, warning, error
# Simple messages
info("Server started successfully")
warning("Connection timeout")
error("Failed to load configuration")
debug("Processing tick 42")
With Prefix
~~~~~~~~~~~
Add a prefix to identify the source of the log message:
.. code-block:: python
# Server logs
info("Client registered", prefix="SERVER")
debug("Algorithm client processed tick 42", prefix="SERVER")
# Client logs
info("API Client initialized", prefix="CLIENT")
warning("Command ignored", prefix="CLIENT")
# Controller logs
info("启动 MyController 算法", prefix="CONTROLLER")
error("模拟运行错误", prefix="CONTROLLER")
Advanced Usage
--------------
Custom Logger
~~~~~~~~~~~~~
Create a custom logger instance with specific settings:
.. code-block:: python
from elevator_saga.utils.logger import get_logger, LogLevel
# Get a custom logger
logger = get_logger("MyComponent", min_level=LogLevel.WARNING)
logger.info("This will not appear (level too low)")
logger.warning("This will appear")
logger.error("This will appear")
Color Output
~~~~~~~~~~~~
The logger automatically detects if output is to a TTY (terminal) and enables colors. When redirecting to files or pipes, colors are automatically disabled for clean output.
Log Format
----------
All log messages follow a consistent format::
LEVEL [PREFIX] message
Examples:
.. code-block:: text
DEBUG [SERVER] Algorithm client registered: abc-123
INFO [SERVER] Loading traffic from test_case_01.json
WARNING [SERVER] GUI client: timeout waiting for tick 42
ERROR [CLIENT] Reset failed: Connection refused
INFO [CONTROLLER] 启动 MyController 算法
Component Prefixes
------------------
Standard prefixes used throughout the system:
* **SERVER** - Simulator server logs
* **CLIENT** - API client logs
* **CONTROLLER** - Controller/algorithm logs
You can use any prefix that makes sense for your component.
API Reference
-------------
Functions
~~~~~~~~~
.. py:function:: debug(message: str, prefix: Optional[str] = None) -> None
Log a DEBUG level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: info(message: str, prefix: Optional[str] = None) -> None
Log an INFO level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: warning(message: str, prefix: Optional[str] = None) -> None
Log a WARNING level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: error(message: str, prefix: Optional[str] = None) -> None
Log an ERROR level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: set_log_level(level: LogLevel) -> None
Set the global log level.
:param level: The minimum log level to display
.. py:function:: get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger
Get or create the global logger instance.
:param name: Name of the logger
:param min_level: Minimum log level (defaults to ELEVATOR_LOG_LEVEL or DEBUG)
:return: Logger instance
Classes
~~~~~~~
.. py:class:: LogLevel
Enumeration of available log levels.
.. py:attribute:: DEBUG
:value: 0
Debug level - most verbose
.. py:attribute:: INFO
:value: 1
Info level - general information
.. py:attribute:: WARNING
:value: 2
Warning level - warnings only
.. py:attribute:: ERROR
:value: 3
Error level - errors only
.. py:method:: from_string(level_str: str) -> LogLevel
:classmethod:
Convert a string to a LogLevel.
:param level_str: String representation (case-insensitive)
:return: Corresponding LogLevel (defaults to DEBUG if invalid)
.. py:class:: Logger
The main logger class.
.. py:method:: __init__(name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True)
Initialize a logger instance.
:param name: Logger name
:param min_level: Minimum level to log
:param use_color: Whether to use colored output
.. py:method:: debug(message: str, prefix: Optional[str] = None) -> None
Log a DEBUG message.
.. py:method:: info(message: str, prefix: Optional[str] = None) -> None
Log an INFO message.
.. py:method:: warning(message: str, prefix: Optional[str] = None) -> None
Log a WARNING message.
.. py:method:: error(message: str, prefix: Optional[str] = None) -> None
Log an ERROR message.
.. py:method:: set_level(level: LogLevel) -> None
Change the minimum log level.
Best Practices
--------------
1. **Use appropriate levels**:
* DEBUG for detailed state changes and internal operations
* INFO for significant events (startup, completion, etc.)
* WARNING for unexpected but recoverable situations
* ERROR for failures and exceptions
2. **Use prefixes consistently**:
* Always use the same prefix for the same component
* Use uppercase for standard prefixes (SERVER, CLIENT, CONTROLLER)
3. **Keep messages concise**:
* One log message per event
* Include relevant context (IDs, values, etc.)
* Avoid multi-line messages
4. **Set appropriate default level**:
* Use DEBUG for development
* Use INFO for production
* Use WARNING for minimal logging
5. **Avoid logging in tight loops**:
* Excessive logging can impact performance
* Consider conditional logging or sampling
Examples
--------
Server Startup
~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, debug
info("Elevator simulation server (Async) running on http://127.0.0.1:8000", prefix="SERVER")
info("Using Quart (async Flask) for better concurrency", prefix="SERVER")
debug("Found 5 traffic files: ['test01.json', 'test02.json', ...]", prefix="SERVER")
Client Operations
~~~~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, warning, error
info("Client registered successfully with ID: xyz-789", prefix="CLIENT")
warning("Client type 'gui' cannot send control commands", prefix="CLIENT")
error("Reset failed: Connection refused", prefix="CLIENT")
Controller Logic
~~~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, debug
info("启动 MyController 算法", prefix="CONTROLLER")
debug("Updated traffic info - max_tick: 1000", prefix="CONTROLLER")
info("停止 MyController 算法", prefix="CONTROLLER")

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing.
"""
__version__ = "0.0.9"
__version__ = "0.0.10"
__author__ = "ZGCA Team"

View File

@@ -4,6 +4,7 @@ Unified API Client for Elevator Saga
使用统一数据模型的客户端API封装
"""
import json
import os
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
@@ -18,19 +19,25 @@ from elevator_saga.core.models import (
SimulationState,
StepResponse,
)
from elevator_saga.utils.debug import debug_log
from elevator_saga.utils.logger import debug, error, info, warning
class ElevatorAPIClient:
"""统一的电梯API客户端"""
def __init__(self, base_url: str):
def __init__(self, base_url: str, client_type: str = "algorithm"):
self.base_url = base_url.rstrip("/")
# 客户端身份相关
self.client_type = client_type
self.client_id: Optional[str] = None
# 缓存相关字段
self._cached_state: Optional[SimulationState] = None
self._cached_tick: int = -1
self._tick_processed: bool = False # 标记当前tick是否已处理完成
debug_log(f"API Client initialized for {self.base_url}")
debug(f"API Client initialized for {self.base_url} with type {self.client_type}", prefix="CLIENT")
# 尝试自动注册
self._auto_register()
def get_state(self, force_reload: bool = False) -> SimulationState:
"""获取模拟状态
@@ -92,7 +99,16 @@ class ElevatorAPIClient:
def step(self, ticks: int = 1) -> StepResponse:
"""执行步进"""
response_data = self._send_post_request("/api/step", {"ticks": ticks})
# 携带当前tick信息用于优先级队列控制
# 如果没有缓存的state先获取一次
if self._cached_state is None:
self.get_state(force_reload=True)
request_data = {"ticks": ticks}
if self._cached_state is not None:
request_data["current_tick"] = self._cached_state.tick
response_data = self._send_post_request("/api/step", request_data)
if "error" not in response_data:
# 使用服务端返回的真实数据
@@ -108,7 +124,7 @@ class ElevatorAPIClient:
event_dict["type"] = EventType(event_dict["type"])
except ValueError:
debug_log(f"Unknown event type: {event_dict['type']}")
warning(f"Unknown event type: {event_dict['type']}", prefix="CLIENT")
continue
events.append(SimulationEvent.from_dict(event_dict))
@@ -118,6 +134,10 @@ class ElevatorAPIClient:
events=events,
)
# 更新缓存的tick保持其他状态不变只更新tick
if self._cached_state is not None:
self._cached_state.tick = step_response.tick
# debug_log(f"Step response: tick={step_response.tick}, events={len(events)}")
return step_response
else:
@@ -125,9 +145,20 @@ class ElevatorAPIClient:
def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令"""
# 客户端拦截:检查是否有权限发送控制命令
if not self._can_send_command():
warning(
f"Client type '{self.client_type}' cannot send control commands. "
f"Command ignored: {command.command_type} elevator {command.elevator_id} to floor {command.floor}",
prefix="CLIENT",
)
# 不抛出错误直接返回True但实际未执行
return True
endpoint = self._get_elevator_endpoint(command)
debug_log(
f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}"
debug(
f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}",
prefix="CLIENT",
)
response_data = self._send_post_request(endpoint, command.parameters)
@@ -145,7 +176,7 @@ class ElevatorAPIClient:
response = self.send_elevator_command(command)
return response
except Exception as e:
debug_log(f"Go to floor failed: {e}")
error(f"Go to floor failed: {e}", prefix="CLIENT")
return False
def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
@@ -155,13 +186,69 @@ class ElevatorAPIClient:
if isinstance(command, GoToFloorCommand):
return f"{base}/go_to_floor"
def _auto_register(self) -> None:
"""自动注册客户端"""
try:
# 从环境变量读取客户端类型(如果有的话)
env_client_type = os.environ.get("ELEVATOR_CLIENT_TYPE")
if env_client_type:
self.client_type = env_client_type
debug(f"Client type from environment: {self.client_type}", prefix="CLIENT")
# 直接发送注册请求不使用_send_post_request以避免循环依赖
url = f"{self.base_url}/api/client/register"
request_body = json.dumps({}).encode("utf-8")
headers = {"Content-Type": "application/json", "X-Client-Type": self.client_type}
req = urllib.request.Request(url, data=request_body, headers=headers)
with urllib.request.urlopen(req, timeout=60) as response:
response_data = json.loads(response.read().decode("utf-8"))
if response_data.get("success"):
self.client_id = response_data.get("client_id")
info(f"Client registered successfully with ID: {self.client_id}", prefix="CLIENT")
else:
warning(f"Client registration failed: {response_data.get('error')}", prefix="CLIENT")
except Exception as e:
error(f"Auto registration failed: {e}", prefix="CLIENT")
def _can_send_command(self) -> bool:
"""检查客户端是否可以发送控制命令
Returns:
True: 如果是算法客户端或未注册客户端
False: 如果是GUI客户端
"""
# 算法客户端可以发送命令
if self.client_type.lower() == "algorithm":
return True
# 未注册的客户端也可以发送命令(向后兼容)
if self.client_id is None:
return True
# GUI客户端不能发送命令
if self.client_type.lower() == "gui":
return False
# 其他未知类型,默认允许(向后兼容)
return True
def _get_request_headers(self) -> Dict[str, str]:
"""获取请求头,包含客户端身份信息"""
headers = {"Content-Type": "application/json"}
if self.client_id:
headers["X-Client-ID"] = self.client_id
headers["X-Client-Type"] = self.client_type
return headers
def _send_get_request(self, endpoint: str) -> Dict[str, Any]:
"""发送GET请求"""
url = f"{self.base_url}{endpoint}"
# todo: 全部更改为post
# debug_log(f"GET {url}")
try:
with urllib.request.urlopen(url, timeout=60) as response:
headers = self._get_request_headers()
# 对于GET请求只添加客户端标识头
req = urllib.request.Request(url, headers={k: v for k, v in headers.items() if k != "Content-Type"})
with urllib.request.urlopen(req, timeout=60) as response:
data: Dict[str, Any] = json.loads(response.read().decode("utf-8"))
# debug_log(f"GET {url} -> {response.status}")
return data
@@ -169,7 +256,7 @@ class ElevatorAPIClient:
raise RuntimeError(f"GET {url} failed: {e}")
def reset(self) -> bool:
"""重置模拟"""
"""重置模拟并重新注册客户端"""
try:
response_data = self._send_post_request("/api/reset", {})
success = bool(response_data.get("success", False))
@@ -178,10 +265,14 @@ class ElevatorAPIClient:
self._cached_state = None
self._cached_tick = -1
self._tick_processed = False
debug_log("Cache cleared after reset")
debug("Cache cleared after reset", prefix="CLIENT")
# 重新注册客户端(因为服务器已清除客户端记录)
self._auto_register()
debug("Client re-registered after reset", prefix="CLIENT")
return success
except Exception as e:
debug_log(f"Reset failed: {e}")
error(f"Reset failed: {e}", prefix="CLIENT")
return False
def next_traffic_round(self, full_reset: bool = False) -> bool:
@@ -194,10 +285,10 @@ class ElevatorAPIClient:
self._cached_state = None
self._cached_tick = -1
self._tick_processed = False
debug_log("Cache cleared after traffic round switch")
debug("Cache cleared after traffic round switch", prefix="CLIENT")
return success
except Exception as e:
debug_log(f"Next traffic round failed: {e}")
error(f"Next traffic round failed: {e}", prefix="CLIENT")
return False
def get_traffic_info(self) -> Optional[Dict[str, Any]]:
@@ -207,10 +298,10 @@ class ElevatorAPIClient:
if "error" not in response_data:
return response_data
else:
debug_log(f"Get traffic info failed: {response_data.get('error')}")
warning(f"Get traffic info failed: {response_data.get('error')}", prefix="CLIENT")
return None
except Exception as e:
debug_log(f"Get traffic info failed: {e}")
error(f"Get traffic info failed: {e}", prefix="CLIENT")
return None
def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
@@ -220,7 +311,8 @@ class ElevatorAPIClient:
# debug_log(f"POST {url} with data: {data}")
req = urllib.request.Request(url, data=request_body, headers={"Content-Type": "application/json"})
headers = self._get_request_headers()
req = urllib.request.Request(url, data=request_body, headers=headers)
try:
with urllib.request.urlopen(req, timeout=600) as response:

View File

@@ -14,7 +14,7 @@ from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPa
from elevator_saga.core.models import EventType, SimulationEvent, SimulationState
# 避免循环导入,使用运行时导入
from elevator_saga.utils.debug import debug_log
from elevator_saga.utils.logger import debug, error, info, warning
class ElevatorController(ABC):
@@ -24,13 +24,14 @@ class ElevatorController(ABC):
用户通过继承此类并实现 abstract 方法来创建自己的调度算法
"""
def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False):
def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False, client_type: str = "algorithm"):
"""
初始化控制器
Args:
server_url: 服务器URL
debug: 是否启用debug模式
client_type: 客户端类型 ("algorithm""gui")
"""
self.server_url = server_url
self.debug = debug
@@ -39,9 +40,10 @@ class ElevatorController(ABC):
self.current_tick = 0
self.is_running = False
self.current_traffic_max_tick: int = 0
self.client_type = client_type
# 初始化API客户端
self.api_client = ElevatorAPIClient(server_url)
# 初始化API客户端,传递客户端类型
self.api_client = ElevatorAPIClient(server_url, client_type=client_type)
@abstractmethod
def on_init(self, elevators: List[Any], floors: List[Any]) -> None:
@@ -84,13 +86,13 @@ class ElevatorController(ABC):
"""
算法启动前的回调 - 可选实现
"""
print(f"启动 {self.__class__.__name__} 算法")
info(f"启动 {self.__class__.__name__} 算法", prefix="CONTROLLER")
def on_stop(self) -> None:
"""
算法停止后的回调 - 可选实现
"""
print(f"停止 {self.__class__.__name__} 算法")
info(f"停止 {self.__class__.__name__} 算法", prefix="CONTROLLER")
@abstractmethod
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
@@ -206,9 +208,9 @@ class ElevatorController(ABC):
try:
self._run_event_driven_simulation()
except KeyboardInterrupt:
print("\n用户中断了算法运行")
info("用户中断了算法运行", prefix="CONTROLLER")
except Exception as e:
print(f"算法运行出错: {e}")
error(f"算法运行出错: {e}", prefix="CONTROLLER")
raise
finally:
self.is_running = False
@@ -217,7 +219,7 @@ class ElevatorController(ABC):
def stop(self) -> None:
"""停止控制器"""
self.is_running = False
print(f"停止 {self.__class__.__name__}")
info(f"停止 {self.__class__.__name__}", prefix="CONTROLLER")
def on_simulation_complete(self, final_state: Dict[str, Any]) -> None:
"""
@@ -235,10 +237,10 @@ class ElevatorController(ABC):
try:
state = self.api_client.get_state()
except ConnectionResetError as _: # noqa: F841
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
error(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}", prefix="CONTROLLER")
os._exit(1)
if state.tick > 0:
print("模拟器可能已经开始了一次模拟,执行重置...")
warning("模拟器可能已经开始了一次模拟,执行重置...", prefix="CONTROLLER")
self.api_client.reset()
time.sleep(0.3)
return self._run_event_driven_simulation()
@@ -247,7 +249,7 @@ class ElevatorController(ABC):
# 获取当前流量文件的最大tick数
self._update_traffic_info()
if self.current_traffic_max_tick == 0:
print("模拟器接收到的最大tick时间为0可能所有的测试案例已用完请求重置...")
warning("模拟器接收到的最大tick时间为0可能所有的测试案例已用完请求重置...", prefix="CONTROLLER")
self.api_client.next_traffic_round(full_reset=True)
time.sleep(0.3)
return self._run_event_driven_simulation()
@@ -297,7 +299,7 @@ class ElevatorController(ABC):
self._reset_and_reinit()
except Exception as e:
print(f"模拟运行错误: {e}")
error(f"模拟运行错误: {e}", prefix="CONTROLLER")
raise
def _update_wrappers(self, state: SimulationState, init: bool = False) -> None:
@@ -321,12 +323,12 @@ class ElevatorController(ABC):
traffic_info = self.api_client.get_traffic_info()
if traffic_info:
self.current_traffic_max_tick = int(traffic_info["max_tick"])
debug_log(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}")
debug(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}", prefix="CONTROLLER")
else:
debug_log("Failed to get traffic info")
warning("Failed to get traffic info", prefix="CONTROLLER")
self.current_traffic_max_tick = 0
except Exception as e:
debug_log(f"Error updating traffic info: {e}")
error(f"Error updating traffic info: {e}", prefix="CONTROLLER")
self.current_traffic_max_tick = 0
def _handle_single_event(self, event: SimulationEvent) -> None:
@@ -430,5 +432,5 @@ class ElevatorController(ABC):
self._internal_init(self.elevators, self.floors)
except Exception as e:
debug_log(f"重置失败: {e}")
error(f"重置失败: {e}", prefix="CONTROLLER")
raise

View File

@@ -2,17 +2,20 @@
"""
Elevator simulation server - tick-based discrete event simulation
Provides HTTP API for controlling elevators and advancing simulation time
使用Quart异步框架提供更高的并发性能
"""
import argparse
import asyncio
import json
import os.path
import threading
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, cast
from typing import Any, Dict, List, Optional, cast
from flask import Flask, Response, request
from quart import Quart, Response, request
from elevator_saga.core.models import (
Direction,
@@ -29,21 +32,236 @@ from elevator_saga.core.models import (
TrafficEntry,
create_empty_simulation_state,
)
# Global debug flag for server
_SERVER_DEBUG_MODE = False
from elevator_saga.utils.logger import LogLevel, debug, error, info, set_log_level, warning
def set_server_debug_mode(enabled: bool) -> None:
"""Enable or disable server debug logging"""
global _SERVER_DEBUG_MODE
globals()["_SERVER_DEBUG_MODE"] = enabled
class ClientType(Enum):
"""客户端类型"""
ALGORITHM = "algorithm"
GUI = "gui"
UNKNOWN = "unknown"
def server_debug_log(message: str) -> None:
"""Print server debug message if debug mode is enabled"""
if _SERVER_DEBUG_MODE:
print(f"[SERVER-DEBUG] {message}", flush=True)
@dataclass
class ClientInfo:
"""客户端信息"""
client_id: str
client_type: ClientType
registered_tick: int
class ClientManager:
"""客户端管理器 - 管理多个客户端的连接和身份"""
def __init__(self) -> None:
self.clients: Dict[str, ClientInfo] = {}
self.algorithm_client_id: Optional[str] = None
self.gui_client_id: Optional[str] = None
self.lock = threading.Lock()
# Step请求控制使用轮询检查不需要事件对象
self.current_tick_processed: Dict[int, bool] = {} # tick -> 是否已被算法客户端处理
self.tick_lock = threading.Lock()
# 事件缓存记录算法客户端产生的events供GUI获取
self.tick_events: Dict[int, List[Any]] = {} # target_tick -> events
self.events_lock = threading.Lock() # Size May Change When Iter
# 严格同步记录GUI已确认的最后tick确保不丢失消息
self.gui_acknowledged_tick: int = -1 # GUI已读取到的最后一个tick
self.algorithm_current_tick: int = -1 # 算法当前的tick执行step前
def register_client(self, client_type_str: str, current_tick: int) -> tuple[str, bool, str]:
"""
注册客户端
Args:
client_type_str: 客户端类型字符串
current_tick: 当前tick
Returns:
tuple[client_id, success, message]
"""
with self.lock:
# 解析客户端类型
try:
if client_type_str.lower() == "algorithm":
client_type = ClientType.ALGORITHM
elif client_type_str.lower() == "gui":
client_type = ClientType.GUI
else:
client_type = ClientType.UNKNOWN
except (AttributeError, ValueError):
client_type = ClientType.UNKNOWN
# 检查是否已经有相同类型的客户端
if client_type == ClientType.ALGORITHM and self.algorithm_client_id is not None:
return "", False, "Algorithm client already registered"
elif client_type == ClientType.GUI and self.gui_client_id is not None:
return "", False, "GUI client already registered"
# 生成新的客户端ID
client_id = str(uuid.uuid4())
client_info = ClientInfo(client_id=client_id, client_type=client_type, registered_tick=current_tick)
# 注册客户端
self.clients[client_id] = client_info
if client_type == ClientType.ALGORITHM:
self.algorithm_client_id = client_id
debug(f"Algorithm client registered: {client_id}", prefix="SERVER")
elif client_type == ClientType.GUI:
self.gui_client_id = client_id
debug(f"GUI client registered: {client_id}", prefix="SERVER")
return client_id, True, f"{client_type.value} client registered successfully"
def get_client_info(self, client_id: str) -> Optional[ClientInfo]:
"""获取客户端信息"""
with self.lock:
return self.clients.get(client_id)
def is_algorithm_client(self, client_id: Optional[str]) -> bool:
"""检查是否是算法客户端"""
if client_id is None:
return False
with self.lock:
client_info = self.clients.get(client_id)
return client_info is not None and client_info.client_type == ClientType.ALGORITHM
def can_execute_command(self, client_id: Optional[str]) -> bool:
"""检查客户端是否可以执行控制命令"""
return self.is_algorithm_client(client_id)
async def wait_for_algorithm_step(self, client_id: Optional[str], target_tick: int, timeout: float = 30.0) -> bool:
"""
GUI客户端等待算法客户端处理完指定tick的step请求
使用asyncio异步等待真正的非阻塞协程
如果没有算法客户端GUI会持续等待直到算法客户端注册并处理
Args:
client_id: 客户端ID
target_tick: 目标tick
timeout: 超时时间(秒)
Returns:
True: 可以继续, False: 超时或其他原因
"""
# 如果是算法客户端直接返回True
if self.is_algorithm_client(client_id):
with self.tick_lock:
self.current_tick_processed[target_tick] = True
debug(f"Algorithm client processed tick {target_tick}", prefix="SERVER")
return True
# GUI客户端需要等待 - 使用异步协程
# 如果没有算法客户端,先等待算法客户端注册
if self.algorithm_client_id is None:
debug("GUI client waiting for algorithm client to register...", prefix="SERVER")
start_time = asyncio.get_event_loop().time()
check_interval = 0.1 # 每100ms检查一次
# 阶段1等待算法客户端注册
while self.algorithm_client_id is None:
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning("GUI client: timeout waiting for algorithm client to register", prefix="SERVER")
return False
await asyncio.sleep(check_interval)
# 动态调整检查间隔
if elapsed > 5 and check_interval < 0.5:
check_interval = 0.5
debug(f"GUI client: algorithm client registered, now waiting for tick {target_tick}", prefix="SERVER")
# 阶段2等待算法客户端处理指定tick
while True:
# 检查是否已处理
with self.tick_lock:
if self.current_tick_processed.get(target_tick, False):
debug(f"GUI client: tick {target_tick} ready, proceeding", prefix="SERVER")
return True
# 检查是否超时
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning(f"GUI client: timeout waiting for tick {target_tick}", prefix="SERVER")
return False
# 异步休眠,真正的协程切换
# 不阻塞事件循环,其他协程可以运行
await asyncio.sleep(check_interval)
# 动态调整检查间隔(可选优化)
# 前几秒检查更频繁,之后降低频率
if elapsed > 5 and check_interval < 0.5:
check_interval = 0.5 # 5秒后降低到500ms检查一次
def store_tick_events(self, target_tick: int, events: List[Any]) -> None:
"""存储指定tick的events"""
with self.events_lock:
self.tick_events[target_tick] = events
debug(f"Stored {len(events)} events for tick {target_tick}", prefix="SERVER")
def get_tick_events(self, target_tick: int) -> List[Any]:
"""获取指定tick的events"""
with self.events_lock:
events = self.tick_events.get(target_tick, [])
debug(f"Retrieved {len(events)} events for tick {target_tick}", prefix="SERVER")
return events
async def wait_for_gui_acknowledgment(self, target_tick: int, timeout: float = 30.0) -> bool:
"""
算法客户端等待GUI确认已读取上一次step的结果
确保GUI不会错过任何tick的消息
Args:
target_tick: 算法刚刚执行完step后的tick上一次step的结果
timeout: 超时时间
Returns:
True: GUI已确认, False: 超时或没有GUI客户端
"""
# 如果没有GUI客户端不需要等待
if self.gui_client_id is None:
return True
# 如果是第一个ticktarget_tick=1不需要等待GUI还没开始
if target_tick <= 1:
return True
debug(f"Algorithm waiting for GUI to acknowledge tick {target_tick - 1}", prefix="SERVER")
start_time = asyncio.get_event_loop().time()
while True:
# 检查GUI是否已读取到上一个tick的结果
if self.gui_acknowledged_tick >= target_tick - 1:
debug(f"GUI acknowledged tick {target_tick - 1}, algorithm can proceed", prefix="SERVER")
return True
# 检查超时
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning(f"Timeout waiting for GUI acknowledgment of tick {target_tick - 1}", prefix="SERVER")
return False
await asyncio.sleep(0.01) # 10ms检查一次
def acknowledge_gui_read(self, tick: int) -> None:
"""GUI确认已读取指定tick"""
self.gui_acknowledged_tick = max(self.gui_acknowledged_tick, tick)
debug(f"GUI acknowledged tick {tick}", prefix="SERVER")
def reset(self) -> None:
"""重置客户端管理器"""
with self.lock:
self.clients.clear()
self.algorithm_client_id = None
self.gui_client_id = None
with self.tick_lock:
self.current_tick_processed.clear()
with self.events_lock:
self.tick_events.clear()
self.gui_acknowledged_tick = -1
self.algorithm_current_tick = -1
debug("Client manager reset", prefix="SERVER")
class CustomJSONEncoder(json.JSONEncoder):
@@ -124,6 +342,8 @@ class ElevatorSimulation:
self.current_traffic_index = 0
self.traffic_files: List[Path] = []
self.state: SimulationState = create_empty_simulation_state(2, 1, 1)
self.all_traffic_results: List[Dict[str, Any]] = [] # 存储所有traffic文件的结果
self.start_dir = Path.cwd() # 记录启动目录
self._load_traffic_files()
@property
@@ -154,7 +374,7 @@ class ElevatorSimulation:
self.traffic_files.append(file_path)
# 按文件名排序
self.traffic_files.sort()
server_debug_log(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}")
debug(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}", prefix="SERVER")
# 如果有文件,加载第一个
if self.traffic_files:
self.load_current_traffic()
@@ -162,20 +382,20 @@ class ElevatorSimulation:
def load_current_traffic(self) -> None:
"""加载当前索引对应的流量文件"""
if not self.traffic_files:
server_debug_log("No traffic files available")
warning("No traffic files available", prefix="SERVER")
return
if self.current_traffic_index >= len(self.traffic_files):
server_debug_log(f"Traffic index {self.current_traffic_index} out of range")
warning(f"Traffic index {self.current_traffic_index} out of range", prefix="SERVER")
return
traffic_file = self.traffic_files[self.current_traffic_index]
server_debug_log(f"Loading traffic from {traffic_file.name}")
info(f"Loading traffic from {traffic_file.name}", prefix="SERVER")
try:
with open(traffic_file, "r", encoding="utf-8") as f:
file_data = json.load(f)
building_config = file_data["building"]
server_debug_log(f"Building config: {building_config}")
debug(f"Building config: {building_config}", prefix="SERVER")
self.state = create_empty_simulation_state(
building_config["elevators"], building_config["floors"], building_config["elevator_capacity"]
)
@@ -186,7 +406,7 @@ class ElevatorSimulation:
for i, elevator in enumerate(self.state.elevators):
if i < len(elevator_energy_rates):
elevator.energy_rate = elevator_energy_rates[i]
server_debug_log(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}")
debug(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}", prefix="SERVER")
self.max_duration_ticks = building_config["duration"]
traffic_data: list[Dict[str, Any]] = file_data["traffic"]
@@ -202,16 +422,108 @@ class ElevatorSimulation:
self.next_passenger_id += 1
except Exception as e:
server_debug_log(f"Error loading traffic file {traffic_file}: {e}")
error(f"Error loading traffic file {traffic_file}: {e}", prefix="SERVER")
def save_current_traffic_result(self) -> None:
"""保存当前traffic文件的结果"""
if not self.traffic_files or self.current_traffic_index >= len(self.traffic_files):
return
traffic_file = self.traffic_files[self.current_traffic_index]
metrics = self._calculate_metrics()
result = {
"traffic_file": traffic_file.name,
"traffic_index": self.current_traffic_index,
"final_tick": self.tick,
"max_duration_ticks": self.max_duration_ticks,
"metrics": metrics.to_dict(),
}
self.all_traffic_results.append(result)
info(
f"Saved result for {traffic_file.name}: {metrics.completed_passengers}/{metrics.total_passengers} passengers completed",
prefix="SERVER",
)
def save_final_results(self) -> None:
"""保存所有结果到result.json"""
result_file = self.start_dir / "result.json"
# 计算总体统计
total_completed = sum(r["metrics"]["completed_passengers"] for r in self.all_traffic_results)
total_passengers = sum(r["metrics"]["total_passengers"] for r in self.all_traffic_results)
total_energy = sum(r["metrics"]["total_energy_consumption"] for r in self.all_traffic_results)
# 计算平均等待时间(只统计有完成乘客的情况)
all_avg_floor_wait = [
r["metrics"]["average_floor_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_avg_arrival_wait = [
r["metrics"]["average_arrival_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_p95_floor_wait = [
r["metrics"]["p95_floor_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_p95_arrival_wait = [
r["metrics"]["p95_arrival_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
completion_rate = total_completed / total_passengers if total_passengers > 0 else 0
final_result = {
"total_traffic_files": len(self.all_traffic_results),
"summary": {
"total_completed_passengers": total_completed,
"total_passengers": total_passengers,
"completion_rate": completion_rate,
"total_energy_consumption": total_energy,
"average_floor_wait_time": (
sum(all_avg_floor_wait) / len(all_avg_floor_wait) if all_avg_floor_wait else 0
),
"average_arrival_wait_time": (
sum(all_avg_arrival_wait) / len(all_avg_arrival_wait) if all_avg_arrival_wait else 0
),
"p95_floor_wait_time": sum(all_p95_floor_wait) / len(all_p95_floor_wait) if all_p95_floor_wait else 0,
"p95_arrival_wait_time": (
sum(all_p95_arrival_wait) / len(all_p95_arrival_wait) if all_p95_arrival_wait else 0
),
},
"individual_results": self.all_traffic_results,
}
with open(result_file, "w", encoding="utf-8") as f:
json.dump(final_result, f, indent=2, ensure_ascii=False)
info(f"Final results saved to: {result_file}", prefix="SERVER")
info(
f"Summary: {total_completed}/{total_passengers} passengers completed ({completion_rate:.1%})",
prefix="SERVER",
)
info(f"Total energy consumption: {total_energy:.2f}", prefix="SERVER")
def next_traffic_round(self, full_reset: bool = False) -> bool:
"""切换到下一个流量文件,返回是否成功切换"""
if not self.traffic_files:
return False
# 在切换前保存当前traffic文件的结果
if self.current_traffic_index >= 0 and self.current_traffic_index < len(self.traffic_files):
self.save_current_traffic_result()
# 检查是否还有下一个文件
next_index = self.current_traffic_index + 1
if next_index >= len(self.traffic_files):
# 所有任务完成,保存最终结果
self.save_final_results()
if full_reset:
self.current_traffic_index = -1
return self.next_traffic_round()
@@ -226,7 +538,7 @@ class ElevatorSimulation:
with open(traffic_file, "r") as f:
traffic_data = json.load(f)
server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries")
debug(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries", prefix="SERVER")
self.traffic_queue: List[TrafficEntry] = [] # type: ignore[reportRedeclaration]
for entry in traffic_data:
@@ -242,12 +554,12 @@ class ElevatorSimulation:
# Sort by arrival time
self.traffic_queue.sort(key=lambda p: p.tick)
server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}")
debug(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}", prefix="SERVER")
def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None:
"""Emit an event to be sent to clients using unified data models"""
self.state.add_event(event_type, data)
server_debug_log(f"Event emitted: {event_type.value} with data {data}")
debug(f"Event emitted: {event_type.value} with data {data}", prefix="SERVER")
def step(self, num_ticks: int = 1) -> List[SimulationEvent]:
with self.lock:
@@ -263,9 +575,9 @@ class ElevatorSimulation:
if self.tick >= self.max_duration_ticks:
completed_count = self.force_complete_remaining_passengers()
if completed_count > 0:
server_debug_log(f"模拟结束,强制完成了 {completed_count} 个乘客")
info(f"模拟结束,强制完成了 {completed_count} 个乘客", prefix="SERVER")
server_debug_log(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}")
debug(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}", prefix="SERVER")
return new_events
def _process_tick(self) -> List[SimulationEvent]:
@@ -338,9 +650,10 @@ class ElevatorSimulation:
elif elevator.run_status == ElevatorStatus.START_UP:
# 从启动状态切换到匀速
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
server_debug_log(
debug(
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} "
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}"
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}",
prefix="SERVER",
)
# START_DOWN状态会在到达目标时在_move_elevators中切换为STOPPED
@@ -356,7 +669,7 @@ class ElevatorSimulation:
)
assert traffic_entry.origin != traffic_entry.destination, f"乘客{passenger.id}目的地和起始地{traffic_entry.origin}重复"
self.passengers[passenger.id] = passenger
server_debug_log(f"乘客 {passenger.id:4} 创建 | {passenger}")
debug(f"乘客 {passenger.id:4} 创建 | {passenger}", prefix="SERVER")
if passenger.destination > passenger.origin:
self.floors[passenger.origin].up_queue.append(passenger.id)
self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id})
@@ -491,19 +804,19 @@ class ElevatorSimulation:
说明电梯处于stop状态这个tick直接采用下一个目的地运行了
"""
elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
debug(f"电梯 E{elevator.id} 被设定为前往 F{floor}", prefix="SERVER")
new_target_floor_should_accel = self._should_start_deceleration(elevator)
if not new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
server_debug_log(f"电梯 E{elevator.id} 被设定为匀速")
debug(f"电梯 E{elevator.id} 被设定为匀速", prefix="SERVER")
elif new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速
elevator.run_status = ElevatorStatus.START_DOWN
server_debug_log(f"电梯 E{elevator.id} 被设定为减速")
debug(f"电梯 E{elevator.id} 被设定为减速", prefix="SERVER")
if elevator.current_floor != floor or elevator.position.floor_up_position != 0:
old_status = elevator.run_status.value
server_debug_log(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}")
debug(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}", prefix="SERVER")
def _calculate_distance_to_target(self, elevator: ElevatorState) -> float:
"""计算到目标楼层的距离以floor_up_position为单位"""
@@ -542,7 +855,7 @@ class ElevatorSimulation:
self._set_elevator_target_floor(elevator, floor)
else:
elevator.next_target_floor = floor
server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}")
debug(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}", prefix="SERVER")
def get_state(self) -> SimulationStateResponse:
"""Get complete simulation state"""
@@ -636,26 +949,59 @@ class ElevatorSimulation:
self.traffic_queue: List[TrafficEntry] = []
self.max_duration_ticks = 0
self.next_passenger_id = 1
self.all_traffic_results.clear() # 清空累积结果
# Global simulation instance for Flask routes
# Global simulation instance for Quart routes
simulation: ElevatorSimulation = ElevatorSimulation("", _init_only=True)
# Create Flask app
app = Flask(__name__)
# Global client manager instance
client_manager = ClientManager()
# Create Quart app (异步Flask)
app = Quart(__name__)
def get_client_id_from_request() -> Optional[str]:
"""从请求头中获取客户端ID"""
result = request.headers.get("X-Client-ID")
return result if result else None
def get_client_type_from_request() -> str:
"""从请求头中获取客户端类型默认为algorithm"""
result = request.headers.get("X-Client-Type", "algorithm")
return str(result) if result else "algorithm"
# Configure CORS
@app.after_request
def after_request(response: Response) -> Response:
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization,X-Client-ID,X-Client-Type")
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS")
return response
@app.route("/api/client/register", methods=["POST"])
async def register_client() -> Response | tuple[Response, int]:
"""客户端注册端点"""
try:
client_type = get_client_type_from_request()
current_tick = simulation.tick
client_id, success, message = client_manager.register_client(client_type, current_tick)
if success:
return json_response({"success": True, "client_id": client_id, "message": message})
else:
return json_response({"success": False, "error": message}, 400)
except Exception as e:
return json_response({"error": str(e)}, 500)
@app.route("/api/state", methods=["GET"])
def get_state() -> Response | tuple[Response, int]:
async def get_state() -> Response | tuple[Response, int]:
try:
state = simulation.get_state()
return json_response(state)
@@ -664,14 +1010,60 @@ def get_state() -> Response | tuple[Response, int]:
@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response | tuple[Response, int]:
async def step_simulation() -> Response | tuple[Response, int]:
try:
data: Dict[str, Any] = request.get_json() or {}
data: Dict[str, Any] = await request.get_json() or {}
ticks = data.get("ticks", 1)
# server_debug_log("")
# server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}")
events = simulation.step(ticks)
server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n")
client_current_tick = data.get("current_tick", None)
# 获取客户端ID
client_id = get_client_id_from_request()
# 检查客户端类型
is_algorithm = client_manager.is_algorithm_client(client_id)
# 如果提供了current_tick实现优先级队列
if client_current_tick is not None:
target_tick = client_current_tick + ticks
# GUI客户端需要等待算法客户端先处理异步等待
can_proceed = await client_manager.wait_for_algorithm_step(client_id, target_tick)
if not can_proceed:
warning(f"Client {client_id} timeout waiting for tick {target_tick}", prefix="SERVER")
return json_response({"error": "Timeout waiting for algorithm client to process this tick"}, 408)
# 只有算法客户端才能真正推进模拟
if is_algorithm:
# 计算target_tick
target_tick = client_current_tick + ticks if client_current_tick is not None else simulation.tick + ticks
# 算法客户端等待GUI确认已读取上一次的tick结果严格同步
gui_ready = await client_manager.wait_for_gui_acknowledgment(target_tick)
if not gui_ready:
warning("Algorithm timeout waiting for GUI acknowledgment, but continuing", prefix="SERVER")
# 继续执行,不阻塞算法
# 真正执行step
events = simulation.step(ticks)
debug(f"Algorithm step: tick {simulation.tick}, events: {len(events)}", prefix="SERVER")
# 存储events供GUI获取
if client_current_tick is not None:
client_manager.store_tick_events(target_tick, events)
else:
# GUI客户端不推进模拟但可以获取算法产生的events
if client_current_tick is not None:
target_tick = client_current_tick + ticks
events = client_manager.get_tick_events(target_tick)
debug(f"GUI step (retrieved): tick {simulation.tick}, events: {len(events)}", prefix="SERVER")
# GUI确认已读取这个tick
client_manager.acknowledge_gui_read(target_tick)
else:
events = []
debug(f"GUI step (no tick): tick {simulation.tick}", prefix="SERVER")
return json_response(
{
"tick": simulation.tick,
@@ -683,18 +1075,37 @@ def step_simulation() -> Response | tuple[Response, int]:
@app.route("/api/reset", methods=["POST"])
def reset_simulation() -> Response | tuple[Response, int]:
async def reset_simulation() -> Response | tuple[Response, int]:
try:
simulation.reset()
client_manager.reset() # 同时重置客户端管理器
info("Simulation and client manager reset", prefix="SERVER")
return json_response({"success": True})
except Exception as e:
return json_response({"error": str(e)}, 500)
@app.route("/api/elevators/<int:elevator_id>/go_to_floor", methods=["POST"])
def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
async def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
try:
data: Dict[str, Any] = request.get_json() or {}
# 获取客户端ID
client_id = get_client_id_from_request()
# 检查客户端是否有权限执行控制命令
if not client_manager.can_execute_command(client_id):
client_type = "unknown"
if client_id:
client_info = client_manager.get_client_info(client_id)
if client_info:
client_type = client_info.client_type.value
warning(
f"Client {client_id} (type: {client_type}) attempted to execute command but was denied", prefix="SERVER"
)
return json_response(
{"success": False, "error": "Only algorithm clients can execute control commands"}, 403
)
data: Dict[str, Any] = await request.get_json() or {}
floor = data["floor"]
immediate = data.get("immediate", False)
simulation.elevator_go_to_floor(elevator_id, floor, immediate)
@@ -704,10 +1115,11 @@ def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
@app.route("/api/traffic/next", methods=["POST"])
def next_traffic_round() -> Response | tuple[Response, int]:
async def next_traffic_round() -> Response | tuple[Response, int]:
"""切换到下一个流量文件"""
try:
full_reset = request.get_json()["full_reset"]
data = await request.get_json()
full_reset = data["full_reset"]
success = simulation.next_traffic_round(full_reset)
if success:
return json_response({"success": True})
@@ -718,7 +1130,7 @@ def next_traffic_round() -> Response | tuple[Response, int]:
@app.route("/api/traffic/info", methods=["GET"])
def get_traffic_info() -> Response | tuple[Response, int]:
async def get_traffic_info() -> Response | tuple[Response, int]:
"""获取当前流量文件信息"""
try:
info = simulation.get_traffic_info()
@@ -730,7 +1142,7 @@ def get_traffic_info() -> Response | tuple[Response, int]:
def main() -> None:
global simulation
parser = argparse.ArgumentParser(description="Elevator Simulation Server")
parser = argparse.ArgumentParser(description="Elevator Simulation Server (Async)")
parser.add_argument("--host", default="127.0.0.1", help="Server host")
parser.add_argument("--port", type=int, default=8000, help="Server port")
parser.add_argument("--debug", default=True, action="store_true", help="Enable debug logging")
@@ -739,20 +1151,24 @@ def main() -> None:
# Enable debug mode if requested
if args.debug:
set_server_debug_mode(True)
server_debug_log("Server debug mode enabled")
set_log_level(LogLevel.DEBUG)
debug("Server debug mode enabled", prefix="SERVER")
app.config["DEBUG"] = True
# Create simulation with traffic directory
simulation = ElevatorSimulation(f"{os.path.join(os.path.dirname(__file__), '..', 'traffic')}")
# Print traffic status
print(f"Elevator simulation server running on http://{args.host}:{args.port}")
info(f"Elevator simulation server (Async) running on http://{args.host}:{args.port}", prefix="SERVER")
info("Using Quart (async Flask) for better concurrency", prefix="SERVER")
debug_status = "enabled" if args.debug else "disabled"
info(f"Debug mode: {debug_status}", prefix="SERVER")
try:
app.run(host=args.host, port=args.port, debug=args.debug, threaded=True)
# 使用Quart的run方法底层使用hypercorn
app.run(host=args.host, port=args.port, debug=args.debug)
except KeyboardInterrupt:
print("\nShutting down server...")
info("Shutting down server...", prefix="SERVER")
if __name__ == "__main__":

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
Utils package for Elevator Saga
工具包
"""
from elevator_saga.utils.logger import LogLevel, debug, error, get_logger, info, set_log_level, warning
__all__ = [
# Logger functions
"debug",
"info",
"warning",
"error",
"get_logger",
"set_log_level",
"LogLevel",
]

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python3
"""
Debug utilities for Elevator Saga
调试工具模块
"""
# Global debug flag
_debug_enabled: bool = True
def set_debug_mode(enabled: bool) -> None:
"""启用或禁用调试模式"""
global _debug_enabled
_debug_enabled = enabled
def debug_log(message: str) -> None:
"""输出调试信息(如果启用了调试模式)"""
if _debug_enabled:
print(f"[DEBUG] {message}", flush=True)
def is_debug_enabled() -> bool:
"""检查是否启用了调试模式"""
return _debug_enabled

View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
Unified logging system for Elevator Saga
统一的日志系统 - 支持多级别、带颜色的日志输出
"""
import os
import sys
from enum import Enum
from typing import Optional
class LogLevel(Enum):
"""日志级别枚举"""
DEBUG = 0
INFO = 1
WARNING = 2
ERROR = 3
@classmethod
def from_string(cls, level_str: str) -> "LogLevel":
"""从字符串转换为日志级别"""
level_map = {
"DEBUG": cls.DEBUG,
"INFO": cls.INFO,
"WARNING": cls.WARNING,
"ERROR": cls.ERROR,
}
return level_map.get(level_str.upper(), cls.DEBUG)
class Color:
"""ANSI颜色代码"""
# 基础颜色
RESET = "\033[0m"
BOLD = "\033[1m"
# 前景色
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# 亮色
BRIGHT_BLACK = "\033[90m"
BRIGHT_RED = "\033[91m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_BLUE = "\033[94m"
BRIGHT_MAGENTA = "\033[95m"
BRIGHT_CYAN = "\033[96m"
BRIGHT_WHITE = "\033[97m"
class Logger:
"""统一日志记录器"""
def __init__(self, name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True):
"""
初始化日志记录器
Args:
name: 日志记录器名称
min_level: 最低日志级别
use_color: 是否使用颜色
"""
self.name = name
self.min_level = min_level
self.use_color = use_color and sys.stdout.isatty()
# 日志级别对应的颜色
self.level_colors = {
LogLevel.DEBUG: Color.BRIGHT_BLACK,
LogLevel.INFO: Color.BRIGHT_CYAN,
LogLevel.WARNING: Color.BRIGHT_YELLOW,
LogLevel.ERROR: Color.BRIGHT_RED,
}
# 日志级别对应的标签
self.level_labels = {
LogLevel.DEBUG: "DEBUG",
LogLevel.INFO: "INFO",
LogLevel.WARNING: "WARNING",
LogLevel.ERROR: "ERROR",
}
def _format_message(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> str:
"""
格式化日志消息
Args:
level: 日志级别
message: 消息内容
prefix: 可选的前缀(如模块名)
Returns:
格式化后的消息
"""
level_label = self.level_labels[level]
if self.use_color:
color = self.level_colors[level]
level_str = f"{color}{level_label:8}{Color.RESET}"
else:
level_str = f"{level_label:8}"
if prefix:
prefix_str = f"[{prefix}] "
else:
prefix_str = ""
return f"{level_str} {prefix_str}{message}"
def _log(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> None:
"""
记录日志
Args:
level: 日志级别
message: 消息内容
prefix: 可选的前缀
"""
if level.value < self.min_level.value:
return
formatted = self._format_message(level, message, prefix)
print(formatted, flush=True)
def debug(self, message: str, prefix: Optional[str] = None) -> None:
"""记录DEBUG级别日志"""
self._log(LogLevel.DEBUG, message, prefix)
def info(self, message: str, prefix: Optional[str] = None) -> None:
"""记录INFO级别日志"""
self._log(LogLevel.INFO, message, prefix)
def warning(self, message: str, prefix: Optional[str] = None) -> None:
"""记录WARNING级别日志"""
self._log(LogLevel.WARNING, message, prefix)
def error(self, message: str, prefix: Optional[str] = None) -> None:
"""记录ERROR级别日志"""
self._log(LogLevel.ERROR, message, prefix)
def set_level(self, level: LogLevel) -> None:
"""设置最低日志级别"""
self.min_level = level
# 全局日志记录器实例
_global_logger: Optional[Logger] = None
def _get_default_log_level() -> LogLevel:
"""从环境变量获取默认日志级别默认为DEBUG"""
env_level = os.environ.get("ELEVATOR_LOG_LEVEL", "DEBUG")
return LogLevel.from_string(env_level)
def get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger:
"""
获取全局日志记录器
Args:
name: 日志记录器名称
min_level: 最低日志级别如果为None则从环境变量读取默认DEBUG
Returns:
Logger实例
"""
global _global_logger
if _global_logger is None:
if min_level is None:
min_level = _get_default_log_level()
_global_logger = Logger(name, min_level)
return _global_logger
def set_log_level(level: LogLevel) -> None:
"""设置全局日志级别"""
logger = get_logger()
logger.set_level(level)
# 便捷函数
def debug(message: str, prefix: Optional[str] = None) -> None:
"""记录DEBUG日志"""
get_logger().debug(message, prefix)
def info(message: str, prefix: Optional[str] = None) -> None:
"""记录INFO日志"""
get_logger().info(message, prefix)
def warning(message: str, prefix: Optional[str] = None) -> None:
"""记录WARNING日志"""
get_logger().warning(message, prefix)
def error(message: str, prefix: Optional[str] = None) -> None:
"""记录ERROR日志"""
get_logger().error(message, prefix)

View File

@@ -30,7 +30,8 @@ classifiers = [
dependencies = [
"numpy>=1.20.0",
"flask>=2.0.0",
"quart>=0.19.0",
"hypercorn>=0.15.0",
]
[project.optional-dependencies]

View File

@@ -12,8 +12,7 @@
"docs/_build",
"elevatorpy.egg-info",
"elevator_saga.egg-info",
".eggs",
"MsgCenterPy"
".eggs"
],
"pythonVersion": "3.10",
"typeCheckingMode": "basic",