17 Commits

Author SHA1 Message Date
Xuwznln
14a2aea166 Bump version: 0.0.10 → 0.0.11 2025-10-24 00:26:21 +08:00
Xuwznln
a009dac5ab Update gitignore 2025-10-24 00:26:15 +08:00
Xuwznln
371b3edf70 Correct calculating passengers arrival time. 2025-10-23 21:30:10 +08:00
Xuwznln
8ae77f6b2a Bump version: 0.0.9 → 0.0.10 2025-10-19 22:35:15 +08:00
Xuwznln
996a23832e Full support for gui & algorithm 2025-10-19 22:35:02 +08:00
Xuwznln
83459923e8 Update docs (Energy) 2025-10-16 01:26:28 +08:00
Xuwznln
b4b99daead Bump version: 0.0.8 → 0.0.9 2025-10-16 01:20:39 +08:00
Xuwznln
d44ba8b6cd Update gitignore 2025-10-16 01:20:11 +08:00
Xuwznln
71e8f2a451 Feat: add energy rate for elevators 2025-10-15 20:46:47 +08:00
Xuwznln
4b60359894 Bump version: 0.0.7 → 0.0.8 2025-10-12 02:14:27 +08:00
Xuwznln
0157496e6f Fix: client completed_passengers calculation error 2025-10-12 02:14:18 +08:00
Xuwznln
1031e677e1 Bump version: 0.0.6 → 0.0.7 2025-10-09 16:49:15 +08:00
Xuwznln
889d554f19 Fix: remove abstractmethod decroation for on_elevator_move 2025-10-09 16:49:07 +08:00
Xuwznln
ee3c4bab7e Bump version: 0.0.5 → 0.0.6 2025-10-09 16:41:04 +08:00
Xuwznln
99524eee3d Add: elevator move event 2025-10-09 16:40:51 +08:00
Xuwznln
b2d03b2510 Bump version: 0.0.4 → 0.0.5 2025-10-06 15:16:47 +08:00
Xuwznln
1a8063e4fd fix performance calculation. fix floor error in approaching event. fix passenger board wrongly. 2025-10-06 15:16:35 +08:00
23 changed files with 1485 additions and 253 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.4
current_version = 0.0.11
commit = True
tag = True
tag_name = v{new_version}

6
.gitignore vendored
View File

@@ -1,6 +1,12 @@
# ================================
# Python-related files
# ================================
elevator_saga/traffic/test_cases.py
elevator_saga/traffic/test_cases
result.json
batch_grading.py
students.csv
ElevatorTest
# Compiled Python files
__pycache__/

View File

@@ -75,7 +75,7 @@ repos:
hooks:
- id: prettier
types_or: [yaml, json, markdown]
exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|elevator_saga\\.egg-info/)"
exclude: "^(build/|dist/|__pycache__/|\\.mypy_cache/|\\.pytest_cache/|htmlcov/|\\.idea/|\\.vscode/|docs/_build/|elevatorpy\\.egg-info/|\\.egg-info/)"
# Global settings
default_stages: [pre-commit, pre-push]

View File

@@ -176,7 +176,7 @@ Dynamic proxy for ``PassengerInfo`` that provides access to passenger informatio
if passenger.status == PassengerStatus.IN_ELEVATOR:
print(f"In elevator {passenger.elevator_id}")
print(f"Waited {passenger.wait_time} ticks")
print(f"Waited {passenger.floor_wait_time} ticks")
Read-Only Protection
~~~~~~~~~~~~~~~~~~~~
@@ -284,6 +284,7 @@ The controller provides these event handlers:
- ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights
- ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor
- ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive
- ``on_elevator_move(elevator, from_position, to_position, direction, status)``: Elevator moves
Complete Example
----------------

View File

@@ -65,6 +65,8 @@ Response format:
"passengers": [101, 102],
"max_capacity": 10,
"run_status": "constant_speed",
"energy_consumed": 38.5,
"energy_rate": 1.0,
"..."
}
],
@@ -81,7 +83,8 @@ Response format:
"avg_wait": 15.2,
"p95_wait": 30.0,
"avg_system": 25.5,
"p95_system": 45.0
"p95_system": 45.0,
"total_energy_consumption": 156.0
}
}

View File

@@ -202,7 +202,7 @@ Event System
Event Types
~~~~~~~~~~~
The simulation generates 8 types of events defined in ``EventType`` enum:
The simulation generates 9 types of events defined in ``EventType`` enum:
.. code-block:: python
@@ -215,6 +215,7 @@ The simulation generates 8 types of events defined in ``EventType`` enum:
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move"
Event Generation
~~~~~~~~~~~~~~~~
@@ -260,6 +261,19 @@ Events are generated during tick processing:
for elevator in self.elevators:
# ... movement logic ...
# Elevator moves
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
}
)
# Passing a floor
if old_floor != new_floor and new_floor != target_floor:
self._emit_event(
@@ -277,7 +291,7 @@ Events are generated during tick processing:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value
}
)
@@ -363,6 +377,14 @@ The ``ElevatorController`` base class automatically routes events to handler met
elevator = self.elevators[event.data["elevator"]]
self.on_elevator_idle(elevator)
elif event.type == EventType.ELEVATOR_MOVE:
elevator = self.elevators[event.data["elevator"]]
from_position = event.data["from_position"]
to_position = event.data["to_position"]
direction = event.data["direction"]
status = event.data["status"]
self.on_elevator_move(elevator, from_position, to_position, direction, status)
# ... other event types ...
Control Flow: Bus Example
@@ -487,28 +509,39 @@ Metrics are calculated from passenger data:
.. code-block:: python
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
completed = [p for p in self.state.passengers.values()
if p.status == PassengerStatus.COMPLETED]
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in completed]
arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
return MetricsResponse(
done=len(completed),
total=len(self.state.passengers),
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=len(self.state.passengers),
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
)
Key metrics:
- **Wait time**: ``pickup_tick - arrive_tick`` (how long passenger waited)
- **System time**: ``dropoff_tick - arrive_tick`` (total time in system)
- **P95**: 95th percentile (worst-case for most passengers)
- **Floor wait time**: ``pickup_tick - arrive_tick`` (在楼层等待的时间,从到达到上电梯)
- **Arrival wait time**: ``dropoff_tick - arrive_tick`` (总等待时间,从到达到下电梯)
- **P95 metrics**: 排除掉最长的5%时间后计算剩余95%的平均值
Summary
-------

View File

@@ -28,7 +28,9 @@ Features
🔌 **Client-Server Model**: Separate simulation server from control logic for clean architecture
📊 **Performance Metrics**: Track wait times, system times, and completion rates
📊 **Performance Metrics**: Track wait times, system times, completion rates, and energy consumption
**Energy Tracking**: Monitor and optimize energy consumption with configurable per-elevator energy rates
🎯 **Flexible Control**: Implement your own algorithms using a simple controller interface
@@ -81,6 +83,7 @@ Contents
client
communication
events
logging
.. toctree::
:maxdepth: 1

325
docs/logging.rst Normal file
View File

@@ -0,0 +1,325 @@
Logging System
==============
Overview
--------
Elevator Saga uses a unified logging system with colored output and multiple log levels. The logging system provides consistent, filterable output across all components.
Log Levels
----------
The logger supports four log levels with distinct colors:
* **DEBUG** - Gray/Bright Black - Detailed debugging information
* **INFO** - Cyan - General informational messages
* **WARNING** - Yellow - Warning messages
* **ERROR** - Red - Error messages
Configuration
-------------
Environment Variable
~~~~~~~~~~~~~~~~~~~~
The default log level is controlled by the ``ELEVATOR_LOG_LEVEL`` environment variable:
.. code-block:: bash
# Set log level to DEBUG (default)
export ELEVATOR_LOG_LEVEL=DEBUG
# Set log level to INFO (less verbose)
export ELEVATOR_LOG_LEVEL=INFO
# Set log level to WARNING (only warnings and errors)
export ELEVATOR_LOG_LEVEL=WARNING
# Set log level to ERROR (only errors)
export ELEVATOR_LOG_LEVEL=ERROR
If not set, the default is **DEBUG** mode.
Programmatic Control
~~~~~~~~~~~~~~~~~~~~
You can also control the log level programmatically:
.. code-block:: python
from elevator_saga.utils.logger import LogLevel, set_log_level
# Set to INFO level
set_log_level(LogLevel.INFO)
# Set to DEBUG level
set_log_level(LogLevel.DEBUG)
Basic Usage
-----------
Simple Logging
~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import debug, info, warning, error
# Simple messages
info("Server started successfully")
warning("Connection timeout")
error("Failed to load configuration")
debug("Processing tick 42")
With Prefix
~~~~~~~~~~~
Add a prefix to identify the source of the log message:
.. code-block:: python
# Server logs
info("Client registered", prefix="SERVER")
debug("Algorithm client processed tick 42", prefix="SERVER")
# Client logs
info("API Client initialized", prefix="CLIENT")
warning("Command ignored", prefix="CLIENT")
# Controller logs
info("启动 MyController 算法", prefix="CONTROLLER")
error("模拟运行错误", prefix="CONTROLLER")
Advanced Usage
--------------
Custom Logger
~~~~~~~~~~~~~
Create a custom logger instance with specific settings:
.. code-block:: python
from elevator_saga.utils.logger import get_logger, LogLevel
# Get a custom logger
logger = get_logger("MyComponent", min_level=LogLevel.WARNING)
logger.info("This will not appear (level too low)")
logger.warning("This will appear")
logger.error("This will appear")
Color Output
~~~~~~~~~~~~
The logger automatically detects if output is to a TTY (terminal) and enables colors. When redirecting to files or pipes, colors are automatically disabled for clean output.
Log Format
----------
All log messages follow a consistent format::
LEVEL [PREFIX] message
Examples:
.. code-block:: text
DEBUG [SERVER] Algorithm client registered: abc-123
INFO [SERVER] Loading traffic from test_case_01.json
WARNING [SERVER] GUI client: timeout waiting for tick 42
ERROR [CLIENT] Reset failed: Connection refused
INFO [CONTROLLER] 启动 MyController 算法
Component Prefixes
------------------
Standard prefixes used throughout the system:
* **SERVER** - Simulator server logs
* **CLIENT** - API client logs
* **CONTROLLER** - Controller/algorithm logs
You can use any prefix that makes sense for your component.
API Reference
-------------
Functions
~~~~~~~~~
.. py:function:: debug(message: str, prefix: Optional[str] = None) -> None
Log a DEBUG level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: info(message: str, prefix: Optional[str] = None) -> None
Log an INFO level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: warning(message: str, prefix: Optional[str] = None) -> None
Log a WARNING level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: error(message: str, prefix: Optional[str] = None) -> None
Log an ERROR level message.
:param message: The message to log
:param prefix: Optional prefix to identify the source
.. py:function:: set_log_level(level: LogLevel) -> None
Set the global log level.
:param level: The minimum log level to display
.. py:function:: get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger
Get or create the global logger instance.
:param name: Name of the logger
:param min_level: Minimum log level (defaults to ELEVATOR_LOG_LEVEL or DEBUG)
:return: Logger instance
Classes
~~~~~~~
.. py:class:: LogLevel
Enumeration of available log levels.
.. py:attribute:: DEBUG
:value: 0
Debug level - most verbose
.. py:attribute:: INFO
:value: 1
Info level - general information
.. py:attribute:: WARNING
:value: 2
Warning level - warnings only
.. py:attribute:: ERROR
:value: 3
Error level - errors only
.. py:method:: from_string(level_str: str) -> LogLevel
:classmethod:
Convert a string to a LogLevel.
:param level_str: String representation (case-insensitive)
:return: Corresponding LogLevel (defaults to DEBUG if invalid)
.. py:class:: Logger
The main logger class.
.. py:method:: __init__(name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True)
Initialize a logger instance.
:param name: Logger name
:param min_level: Minimum level to log
:param use_color: Whether to use colored output
.. py:method:: debug(message: str, prefix: Optional[str] = None) -> None
Log a DEBUG message.
.. py:method:: info(message: str, prefix: Optional[str] = None) -> None
Log an INFO message.
.. py:method:: warning(message: str, prefix: Optional[str] = None) -> None
Log a WARNING message.
.. py:method:: error(message: str, prefix: Optional[str] = None) -> None
Log an ERROR message.
.. py:method:: set_level(level: LogLevel) -> None
Change the minimum log level.
Best Practices
--------------
1. **Use appropriate levels**:
* DEBUG for detailed state changes and internal operations
* INFO for significant events (startup, completion, etc.)
* WARNING for unexpected but recoverable situations
* ERROR for failures and exceptions
2. **Use prefixes consistently**:
* Always use the same prefix for the same component
* Use uppercase for standard prefixes (SERVER, CLIENT, CONTROLLER)
3. **Keep messages concise**:
* One log message per event
* Include relevant context (IDs, values, etc.)
* Avoid multi-line messages
4. **Set appropriate default level**:
* Use DEBUG for development
* Use INFO for production
* Use WARNING for minimal logging
5. **Avoid logging in tight loops**:
* Excessive logging can impact performance
* Consider conditional logging or sampling
Examples
--------
Server Startup
~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, debug
info("Elevator simulation server (Async) running on http://127.0.0.1:8000", prefix="SERVER")
info("Using Quart (async Flask) for better concurrency", prefix="SERVER")
debug("Found 5 traffic files: ['test01.json', 'test02.json', ...]", prefix="SERVER")
Client Operations
~~~~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, warning, error
info("Client registered successfully with ID: xyz-789", prefix="CLIENT")
warning("Client type 'gui' cannot send control commands", prefix="CLIENT")
error("Reset failed: Connection refused", prefix="CLIENT")
Controller Logic
~~~~~~~~~~~~~~~~
.. code-block:: python
from elevator_saga.utils.logger import info, debug
info("启动 MyController 算法", prefix="CONTROLLER")
debug("Updated traffic info - max_tick: 1000", prefix="CONTROLLER")
info("停止 MyController 算法", prefix="CONTROLLER")

View File

@@ -128,6 +128,7 @@ Complete state information for an elevator:
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
passenger_destinations: Dict[int, int] = {} # passenger_id -> floor
energy_consumed: float = 0.0
energy_rate: float = 1.0 # Energy consumption rate per tick
last_update_tick: int = 0
Key Properties:
@@ -142,6 +143,11 @@ Key Properties:
- ``pressed_floors``: List of destination floors for current passengers
- ``load_factor``: Current load as fraction of capacity (0.0 to 1.0)
Energy Tracking:
- ``energy_consumed``: Total energy consumed by this elevator during the simulation
- ``energy_rate``: Energy consumption rate per tick when moving (default: 1.0). Can be customized in traffic configuration files to simulate different elevator types (e.g., older elevators with higher rates, newer energy-efficient elevators with lower rates)
FloorState
~~~~~~~~~~
@@ -257,15 +263,20 @@ Tracks simulation performance:
class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0 # 95th percentile
average_system_time: float = 0.0
p95_system_time: float = 0.0 # 95th percentile
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0 # 95th percentile
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0 # 95th percentile
total_energy_consumption: float = 0.0 # Total energy consumed by all elevators
Properties:
- ``completion_rate``: Fraction of passengers completed (0.0 to 1.0)
Energy Metrics:
- ``total_energy_consumption``: Sum of energy consumed by all elevators in the system. Each elevator consumes ``energy_rate`` units of energy per tick when moving.
API Models
----------
@@ -345,3 +356,75 @@ All models support JSON serialization:
restored = ElevatorState.from_dict(data)
This enables seamless transmission over HTTP between client and server.
Energy System
-------------
Overview
~~~~~~~~
The energy system tracks energy consumption of elevators to help optimize control algorithms for both passenger service and energy efficiency.
How Energy Works
~~~~~~~~~~~~~~~~
**Energy Consumption:**
- Each elevator has an ``energy_rate`` attribute (default: 1.0)
- When an elevator moves (any tick where it's not stopped), it consumes energy equal to its ``energy_rate``
- Energy consumption is independent of speed, direction, or load
- Total system energy is the sum of all individual elevator energy consumption
**Configuration:**
Energy rates are configured in traffic JSON files via the ``elevator_energy_rates`` field:
.. code-block:: json
{
"building": {
"floors": 10,
"elevators": 3,
"elevator_capacity": 10,
"elevator_energy_rates": [1.0, 1.0, 1.2],
"scenario": "custom_scenario",
"duration": 600
},
"traffic": []
}
In this example, elevators 0 and 1 have standard energy rates (1.0), while elevator 2 consumes 20% more energy (1.2), perhaps representing an older or less efficient unit.
**Use Cases:**
1. **Algorithm Optimization**: Balance passenger wait times against energy consumption
2. **Heterogeneous Fleets**: Model buildings with elevators of different ages/efficiencies
3. **Cost Analysis**: Evaluate the energy cost of different control strategies
4. **Green Building Simulation**: Optimize for minimal energy while maintaining service quality
Example Usage
~~~~~~~~~~~~~
.. code-block:: python
# Get current state
state = api_client.get_state()
# Check individual elevator energy
for elevator in state.elevators:
print(f"Elevator {elevator.id}: {elevator.energy_consumed} units consumed")
print(f" Energy rate: {elevator.energy_rate} units/tick")
# Check total system energy
metrics = state.metrics
print(f"Total system energy: {metrics.total_energy_consumption} units")
print(f"Completed passengers: {metrics.completed_passengers}")
# Calculate energy per passenger
if metrics.completed_passengers > 0:
energy_per_passenger = metrics.total_energy_consumption / metrics.completed_passengers
print(f"Energy per passenger: {energy_per_passenger:.2f} units")
**Default Behavior:**
If ``elevator_energy_rates`` is not specified in the traffic file, all elevators default to an energy rate of 1.0, ensuring backward compatibility with existing traffic files.

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing.
"""
__version__ = "0.0.4"
__version__ = "0.0.11"
__author__ = "ZGCA Team"

View File

@@ -4,9 +4,10 @@ Unified API Client for Elevator Saga
使用统一数据模型的客户端API封装
"""
import json
import os
import urllib.error
import urllib.request
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional
from elevator_saga.core.models import (
ElevatorState,
@@ -18,19 +19,25 @@ from elevator_saga.core.models import (
SimulationState,
StepResponse,
)
from elevator_saga.utils.debug import debug_log
from elevator_saga.utils.logger import debug, error, info, warning
class ElevatorAPIClient:
"""统一的电梯API客户端"""
def __init__(self, base_url: str):
def __init__(self, base_url: str, client_type: str = "algorithm"):
self.base_url = base_url.rstrip("/")
# 客户端身份相关
self.client_type = client_type
self.client_id: Optional[str] = None
# 缓存相关字段
self._cached_state: Optional[SimulationState] = None
self._cached_tick: int = -1
self._tick_processed: bool = False # 标记当前tick是否已处理完成
debug_log(f"API Client initialized for {self.base_url}")
debug(f"API Client initialized for {self.base_url} with type {self.client_type}", prefix="CLIENT")
# 尝试自动注册
self._auto_register()
def get_state(self, force_reload: bool = False) -> SimulationState:
"""获取模拟状态
@@ -63,16 +70,8 @@ class ElevatorAPIClient:
# 使用服务端返回的metrics数据
metrics_data = response_data.get("metrics", {})
if metrics_data:
# 转换为PerformanceMetrics格式
metrics = PerformanceMetrics(
completed_passengers=metrics_data.get("done", 0),
total_passengers=metrics_data.get("total", 0),
average_wait_time=metrics_data.get("avg_wait", 0),
p95_wait_time=metrics_data.get("p95_wait", 0),
average_system_time=metrics_data.get("avg_system", 0),
p95_system_time=metrics_data.get("p95_system", 0),
# total_energy_consumption=metrics_data.get("energy_total", 0),
)
# 直接从字典创建PerformanceMetrics对象
metrics = PerformanceMetrics.from_dict(metrics_data)
else:
metrics = PerformanceMetrics()
@@ -100,7 +99,16 @@ class ElevatorAPIClient:
def step(self, ticks: int = 1) -> StepResponse:
"""执行步进"""
response_data = self._send_post_request("/api/step", {"ticks": ticks})
# 携带当前tick信息用于优先级队列控制
# 如果没有缓存的state先获取一次
if self._cached_state is None:
self.get_state(force_reload=True)
request_data = {"ticks": ticks}
if self._cached_state is not None:
request_data["current_tick"] = self._cached_state.tick
response_data = self._send_post_request("/api/step", request_data)
if "error" not in response_data:
# 使用服务端返回的真实数据
@@ -116,7 +124,7 @@ class ElevatorAPIClient:
event_dict["type"] = EventType(event_dict["type"])
except ValueError:
debug_log(f"Unknown event type: {event_dict['type']}")
warning(f"Unknown event type: {event_dict['type']}", prefix="CLIENT")
continue
events.append(SimulationEvent.from_dict(event_dict))
@@ -126,16 +134,31 @@ class ElevatorAPIClient:
events=events,
)
# 更新缓存的tick保持其他状态不变只更新tick
if self._cached_state is not None:
self._cached_state.tick = step_response.tick
# debug_log(f"Step response: tick={step_response.tick}, events={len(events)}")
return step_response
else:
raise RuntimeError(f"Step failed: {response_data.get('error')}")
def send_elevator_command(self, command: Union[GoToFloorCommand]) -> bool:
def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令"""
# 客户端拦截:检查是否有权限发送控制命令
if not self._can_send_command():
warning(
f"Client type '{self.client_type}' cannot send control commands. "
f"Command ignored: {command.command_type} elevator {command.elevator_id} to floor {command.floor}",
prefix="CLIENT",
)
# 不抛出错误直接返回True但实际未执行
return True
endpoint = self._get_elevator_endpoint(command)
debug_log(
f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}"
debug(
f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}",
prefix="CLIENT",
)
response_data = self._send_post_request(endpoint, command.parameters)
@@ -153,23 +176,79 @@ class ElevatorAPIClient:
response = self.send_elevator_command(command)
return response
except Exception as e:
debug_log(f"Go to floor failed: {e}")
error(f"Go to floor failed: {e}", prefix="CLIENT")
return False
def _get_elevator_endpoint(self, command: Union[GoToFloorCommand]) -> str:
def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
"""获取电梯命令端点"""
base = f"/api/elevators/{command.elevator_id}"
if isinstance(command, GoToFloorCommand):
return f"{base}/go_to_floor"
def _auto_register(self) -> None:
"""自动注册客户端"""
try:
# 从环境变量读取客户端类型(如果有的话)
env_client_type = os.environ.get("ELEVATOR_CLIENT_TYPE")
if env_client_type:
self.client_type = env_client_type
debug(f"Client type from environment: {self.client_type}", prefix="CLIENT")
# 直接发送注册请求不使用_send_post_request以避免循环依赖
url = f"{self.base_url}/api/client/register"
request_body = json.dumps({}).encode("utf-8")
headers = {"Content-Type": "application/json", "X-Client-Type": self.client_type}
req = urllib.request.Request(url, data=request_body, headers=headers)
with urllib.request.urlopen(req, timeout=60) as response:
response_data = json.loads(response.read().decode("utf-8"))
if response_data.get("success"):
self.client_id = response_data.get("client_id")
info(f"Client registered successfully with ID: {self.client_id}", prefix="CLIENT")
else:
warning(f"Client registration failed: {response_data.get('error')}", prefix="CLIENT")
except Exception as e:
error(f"Auto registration failed: {e}", prefix="CLIENT")
def _can_send_command(self) -> bool:
"""检查客户端是否可以发送控制命令
Returns:
True: 如果是算法客户端或未注册客户端
False: 如果是GUI客户端
"""
# 算法客户端可以发送命令
if self.client_type.lower() == "algorithm":
return True
# 未注册的客户端也可以发送命令(向后兼容)
if self.client_id is None:
return True
# GUI客户端不能发送命令
if self.client_type.lower() == "gui":
return False
# 其他未知类型,默认允许(向后兼容)
return True
def _get_request_headers(self) -> Dict[str, str]:
"""获取请求头,包含客户端身份信息"""
headers = {"Content-Type": "application/json"}
if self.client_id:
headers["X-Client-ID"] = self.client_id
headers["X-Client-Type"] = self.client_type
return headers
def _send_get_request(self, endpoint: str) -> Dict[str, Any]:
"""发送GET请求"""
url = f"{self.base_url}{endpoint}"
# todo: 全部更改为post
# debug_log(f"GET {url}")
try:
with urllib.request.urlopen(url, timeout=60) as response:
headers = self._get_request_headers()
# 对于GET请求只添加客户端标识头
req = urllib.request.Request(url, headers={k: v for k, v in headers.items() if k != "Content-Type"})
with urllib.request.urlopen(req, timeout=60) as response:
data: Dict[str, Any] = json.loads(response.read().decode("utf-8"))
# debug_log(f"GET {url} -> {response.status}")
return data
@@ -177,7 +256,7 @@ class ElevatorAPIClient:
raise RuntimeError(f"GET {url} failed: {e}")
def reset(self) -> bool:
"""重置模拟"""
"""重置模拟并重新注册客户端"""
try:
response_data = self._send_post_request("/api/reset", {})
success = bool(response_data.get("success", False))
@@ -186,10 +265,14 @@ class ElevatorAPIClient:
self._cached_state = None
self._cached_tick = -1
self._tick_processed = False
debug_log("Cache cleared after reset")
debug("Cache cleared after reset", prefix="CLIENT")
# 重新注册客户端(因为服务器已清除客户端记录)
self._auto_register()
debug("Client re-registered after reset", prefix="CLIENT")
return success
except Exception as e:
debug_log(f"Reset failed: {e}")
error(f"Reset failed: {e}", prefix="CLIENT")
return False
def next_traffic_round(self, full_reset: bool = False) -> bool:
@@ -202,10 +285,10 @@ class ElevatorAPIClient:
self._cached_state = None
self._cached_tick = -1
self._tick_processed = False
debug_log("Cache cleared after traffic round switch")
debug("Cache cleared after traffic round switch", prefix="CLIENT")
return success
except Exception as e:
debug_log(f"Next traffic round failed: {e}")
error(f"Next traffic round failed: {e}", prefix="CLIENT")
return False
def get_traffic_info(self) -> Optional[Dict[str, Any]]:
@@ -215,10 +298,10 @@ class ElevatorAPIClient:
if "error" not in response_data:
return response_data
else:
debug_log(f"Get traffic info failed: {response_data.get('error')}")
warning(f"Get traffic info failed: {response_data.get('error')}", prefix="CLIENT")
return None
except Exception as e:
debug_log(f"Get traffic info failed: {e}")
error(f"Get traffic info failed: {e}", prefix="CLIENT")
return None
def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
@@ -228,7 +311,8 @@ class ElevatorAPIClient:
# debug_log(f"POST {url} with data: {data}")
req = urllib.request.Request(url, data=request_body, headers={"Content-Type": "application/json"})
headers = self._get_request_headers()
req = urllib.request.Request(url, data=request_body, headers=headers)
try:
with urllib.request.urlopen(req, timeout=600) as response:

View File

@@ -7,14 +7,14 @@ import os
import time
from abc import ABC, abstractmethod
from pprint import pprint
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from elevator_saga.client.api_client import ElevatorAPIClient
from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger
from elevator_saga.core.models import EventType, SimulationEvent, SimulationState
# 避免循环导入,使用运行时导入
from elevator_saga.utils.debug import debug_log
from elevator_saga.utils.logger import debug, error, info, warning
class ElevatorController(ABC):
@@ -24,13 +24,14 @@ class ElevatorController(ABC):
用户通过继承此类并实现 abstract 方法来创建自己的调度算法
"""
def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False):
def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False, client_type: str = "algorithm"):
"""
初始化控制器
Args:
server_url: 服务器URL
debug: 是否启用debug模式
client_type: 客户端类型 ("algorithm""gui")
"""
self.server_url = server_url
self.debug = debug
@@ -39,9 +40,10 @@ class ElevatorController(ABC):
self.current_tick = 0
self.is_running = False
self.current_traffic_max_tick: int = 0
self.client_type = client_type
# 初始化API客户端
self.api_client = ElevatorAPIClient(server_url)
# 初始化API客户端,传递客户端类型
self.api_client = ElevatorAPIClient(server_url, client_type=client_type)
@abstractmethod
def on_init(self, elevators: List[Any], floors: List[Any]) -> None:
@@ -84,13 +86,13 @@ class ElevatorController(ABC):
"""
算法启动前的回调 - 可选实现
"""
print(f"启动 {self.__class__.__name__} 算法")
info(f"启动 {self.__class__.__name__} 算法", prefix="CONTROLLER")
def on_stop(self) -> None:
"""
算法停止后的回调 - 可选实现
"""
print(f"停止 {self.__class__.__name__} 算法")
info(f"停止 {self.__class__.__name__} 算法", prefix="CONTROLLER")
@abstractmethod
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
@@ -171,6 +173,22 @@ class ElevatorController(ABC):
"""
pass
# @abstractmethod 为了兼容性暂不强制要求elevator_move必须实现
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调 - 可选实现
Args:
elevator: 电梯代理对象
from_position: 起始位置(浮点数表示楼层)
to_position: 目标位置(浮点数表示楼层)
direction: 移动方向
status: 电梯运行状态
"""
pass
def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None:
"""内部初始化方法"""
self.elevators = elevators
@@ -190,9 +208,9 @@ class ElevatorController(ABC):
try:
self._run_event_driven_simulation()
except KeyboardInterrupt:
print("\n用户中断了算法运行")
info("用户中断了算法运行", prefix="CONTROLLER")
except Exception as e:
print(f"算法运行出错: {e}")
error(f"算法运行出错: {e}", prefix="CONTROLLER")
raise
finally:
self.is_running = False
@@ -201,7 +219,7 @@ class ElevatorController(ABC):
def stop(self) -> None:
"""停止控制器"""
self.is_running = False
print(f"停止 {self.__class__.__name__}")
info(f"停止 {self.__class__.__name__}", prefix="CONTROLLER")
def on_simulation_complete(self, final_state: Dict[str, Any]) -> None:
"""
@@ -218,11 +236,11 @@ class ElevatorController(ABC):
# 获取初始状态并初始化默认从0开始
try:
state = self.api_client.get_state()
except ConnectionResetError as ex:
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
except ConnectionResetError as _: # noqa: F841
error(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}", prefix="CONTROLLER")
os._exit(1)
if state.tick > 0:
print("模拟器可能已经开始了一次模拟,执行重置...")
warning("模拟器可能已经开始了一次模拟,执行重置...", prefix="CONTROLLER")
self.api_client.reset()
time.sleep(0.3)
return self._run_event_driven_simulation()
@@ -231,7 +249,7 @@ class ElevatorController(ABC):
# 获取当前流量文件的最大tick数
self._update_traffic_info()
if self.current_traffic_max_tick == 0:
print("模拟器接收到的最大tick时间为0可能所有的测试案例已用完请求重置...")
warning("模拟器接收到的最大tick时间为0可能所有的测试案例已用完请求重置...", prefix="CONTROLLER")
self.api_client.next_traffic_round(full_reset=True)
time.sleep(0.3)
return self._run_event_driven_simulation()
@@ -281,7 +299,7 @@ class ElevatorController(ABC):
self._reset_and_reinit()
except Exception as e:
print(f"模拟运行错误: {e}")
error(f"模拟运行错误: {e}", prefix="CONTROLLER")
raise
def _update_wrappers(self, state: SimulationState, init: bool = False) -> None:
@@ -305,12 +323,12 @@ class ElevatorController(ABC):
traffic_info = self.api_client.get_traffic_info()
if traffic_info:
self.current_traffic_max_tick = int(traffic_info["max_tick"])
debug_log(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}")
debug(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}", prefix="CONTROLLER")
else:
debug_log("Failed to get traffic info")
warning("Failed to get traffic info", prefix="CONTROLLER")
self.current_traffic_max_tick = 0
except Exception as e:
debug_log(f"Error updating traffic info: {e}")
error(f"Error updating traffic info: {e}", prefix="CONTROLLER")
self.current_traffic_max_tick = 0
def _handle_single_event(self, event: SimulationEvent) -> None:
@@ -352,9 +370,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.ELEVATOR_APPROACHING:
elevator_id = event.data.get("elevator")
@@ -363,9 +379,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.PASSENGER_BOARD:
elevator_id = event.data.get("elevator")
@@ -385,6 +399,22 @@ class ElevatorController(ABC):
floor_proxy = ProxyFloor(floor_id, self.api_client)
self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy)
elif event.type == EventType.ELEVATOR_MOVE:
elevator_id = event.data.get("elevator")
from_position = event.data.get("from_position")
to_position = event.data.get("to_position")
direction = event.data.get("direction")
status = event.data.get("status")
if (
elevator_id is not None
and from_position is not None
and to_position is not None
and direction is not None
and status is not None
):
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
self.on_elevator_move(elevator_proxy, from_position, to_position, direction, status)
def _reset_and_reinit(self) -> None:
"""重置并重新初始化"""
try:
@@ -402,5 +432,5 @@ class ElevatorController(ABC):
self._internal_init(self.elevators, self.floors)
except Exception as e:
debug_log(f"重置失败: {e}")
error(f"重置失败: {e}", prefix="CONTROLLER")
raise

View File

@@ -41,6 +41,7 @@ class ElevatorBusExampleController(ElevatorController):
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
self.all_passengers.append(passenger)
print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})")
pass
def on_elevator_idle(self, elevator: ProxyElevator) -> None:
@@ -60,10 +61,10 @@ class ElevatorBusExampleController(ElevatorController):
elevator.go_to_floor(elevator.current_floor - 1)
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}")
def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}")
def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
@@ -71,6 +72,11 @@ class ElevatorBusExampleController(ElevatorController):
def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
pass
if __name__ == "__main__":
algorithm = ElevatorBusExampleController()

View File

@@ -139,6 +139,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -8,7 +8,7 @@ import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
# 类型变量
T = TypeVar("T", bound="SerializableModel")
@@ -55,6 +55,7 @@ class EventType(Enum):
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move" # 电梯移动事件
class SerializableModel:
@@ -113,7 +114,7 @@ class Position(SerializableModel):
@property
def current_floor_float(self) -> float:
return self.current_floor + self.floor_up_position / 10
return round(self.current_floor + self.floor_up_position / 10, 1)
def floor_up_position_add(self, num: int) -> int:
self.floor_up_position += num
@@ -161,12 +162,13 @@ class PassengerInfo(SerializableModel):
arrive_tick: int
pickup_tick: int = 0
dropoff_tick: int = 0
arrived: bool = False
elevator_id: Optional[int] = None
@property
def status(self) -> PassengerStatus:
"""乘客状态"""
if self.dropoff_tick > 0:
if self.arrived:
return PassengerStatus.COMPLETED
elif self.pickup_tick > 0:
return PassengerStatus.IN_ELEVATOR
@@ -174,13 +176,13 @@ class PassengerInfo(SerializableModel):
return PassengerStatus.WAITING
@property
def wait_time(self) -> int:
"""等待时间"""
def floor_wait_time(self) -> int:
"""在楼层等待时间(从到达到上电梯)"""
return self.pickup_tick - self.arrive_tick
@property
def system_time(self) -> int:
"""系统时间(总时间"""
def arrival_wait_time(self) -> int:
"""总等待时间(从到达到下电梯"""
return self.dropoff_tick - self.arrive_tick
@property
@@ -209,6 +211,7 @@ class ElevatorState(SerializableModel):
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
passenger_destinations: Dict[int, int] = field(default_factory=dict) # 乘客ID -> 目的地楼层映射
energy_consumed: float = 0.0
energy_rate: float = 1.0 # 能耗率每tick消耗的能量单位
last_update_tick: int = 0
@property
@@ -331,11 +334,11 @@ class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0
average_system_time: float = 0.0
p95_system_time: float = 0.0
# total_energy_consumption: float = 0.0
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0
total_energy_consumption: float = 0.0
@property
def completion_rate(self) -> float:
@@ -344,13 +347,6 @@ class PerformanceMetrics(SerializableModel):
return 0.0
return self.completed_passengers / self.total_passengers
# @property
# def energy_per_passenger(self) -> float:
# """每位乘客能耗"""
# if self.completed_passengers == 0:
# return 0.0
# return self.total_energy_consumption / self.completed_passengers
@dataclass
class SimulationState(SerializableModel):

View File

@@ -138,6 +138,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -2,17 +2,20 @@
"""
Elevator simulation server - tick-based discrete event simulation
Provides HTTP API for controlling elevators and advancing simulation time
使用Quart异步框架提供更高的并发性能
"""
import argparse
import asyncio
import json
import os.path
import threading
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from flask import Flask, Response, request
from quart import Quart, Response, request
from elevator_saga.core.models import (
Direction,
@@ -22,27 +25,243 @@ from elevator_saga.core.models import (
FloorState,
PassengerInfo,
PassengerStatus,
PerformanceMetrics,
SerializableModel,
SimulationEvent,
SimulationState,
TrafficEntry,
create_empty_simulation_state,
)
# Global debug flag for server
_SERVER_DEBUG_MODE = False
from elevator_saga.utils.logger import LogLevel, debug, error, info, set_log_level, warning
def set_server_debug_mode(enabled: bool) -> None:
"""Enable or disable server debug logging"""
global _SERVER_DEBUG_MODE
globals()["_SERVER_DEBUG_MODE"] = enabled
class ClientType(Enum):
"""客户端类型"""
ALGORITHM = "algorithm"
GUI = "gui"
UNKNOWN = "unknown"
def server_debug_log(message: str) -> None:
"""Print server debug message if debug mode is enabled"""
if _SERVER_DEBUG_MODE:
print(f"[SERVER-DEBUG] {message}", flush=True)
@dataclass
class ClientInfo:
"""客户端信息"""
client_id: str
client_type: ClientType
registered_tick: int
class ClientManager:
"""客户端管理器 - 管理多个客户端的连接和身份"""
def __init__(self) -> None:
self.clients: Dict[str, ClientInfo] = {}
self.algorithm_client_id: Optional[str] = None
self.gui_client_id: Optional[str] = None
self.lock = threading.Lock()
# Step请求控制使用轮询检查不需要事件对象
self.current_tick_processed: Dict[int, bool] = {} # tick -> 是否已被算法客户端处理
self.tick_lock = threading.Lock()
# 事件缓存记录算法客户端产生的events供GUI获取
self.tick_events: Dict[int, List[Any]] = {} # target_tick -> events
self.events_lock = threading.Lock() # Size May Change When Iter
# 严格同步记录GUI已确认的最后tick确保不丢失消息
self.gui_acknowledged_tick: int = -1 # GUI已读取到的最后一个tick
self.algorithm_current_tick: int = -1 # 算法当前的tick执行step前
def register_client(self, client_type_str: str, current_tick: int) -> tuple[str, bool, str]:
"""
注册客户端
Args:
client_type_str: 客户端类型字符串
current_tick: 当前tick
Returns:
tuple[client_id, success, message]
"""
with self.lock:
# 解析客户端类型
try:
if client_type_str.lower() == "algorithm":
client_type = ClientType.ALGORITHM
elif client_type_str.lower() == "gui":
client_type = ClientType.GUI
else:
client_type = ClientType.UNKNOWN
except (AttributeError, ValueError):
client_type = ClientType.UNKNOWN
# 检查是否已经有相同类型的客户端
if client_type == ClientType.ALGORITHM and self.algorithm_client_id is not None:
return "", False, "Algorithm client already registered"
elif client_type == ClientType.GUI and self.gui_client_id is not None:
return "", False, "GUI client already registered"
# 生成新的客户端ID
client_id = str(uuid.uuid4())
client_info = ClientInfo(client_id=client_id, client_type=client_type, registered_tick=current_tick)
# 注册客户端
self.clients[client_id] = client_info
if client_type == ClientType.ALGORITHM:
self.algorithm_client_id = client_id
debug(f"Algorithm client registered: {client_id}", prefix="SERVER")
elif client_type == ClientType.GUI:
self.gui_client_id = client_id
debug(f"GUI client registered: {client_id}", prefix="SERVER")
return client_id, True, f"{client_type.value} client registered successfully"
def get_client_info(self, client_id: str) -> Optional[ClientInfo]:
"""获取客户端信息"""
with self.lock:
return self.clients.get(client_id)
def is_algorithm_client(self, client_id: Optional[str]) -> bool:
"""检查是否是算法客户端"""
if client_id is None:
return False
with self.lock:
client_info = self.clients.get(client_id)
return client_info is not None and client_info.client_type == ClientType.ALGORITHM
def can_execute_command(self, client_id: Optional[str]) -> bool:
"""检查客户端是否可以执行控制命令"""
return self.is_algorithm_client(client_id)
async def wait_for_algorithm_step(self, client_id: Optional[str], target_tick: int, timeout: float = 30.0) -> bool:
"""
GUI客户端等待算法客户端处理完指定tick的step请求
使用asyncio异步等待真正的非阻塞协程
如果没有算法客户端GUI会持续等待直到算法客户端注册并处理
Args:
client_id: 客户端ID
target_tick: 目标tick
timeout: 超时时间(秒)
Returns:
True: 可以继续, False: 超时或其他原因
"""
# 如果是算法客户端直接返回True
if self.is_algorithm_client(client_id):
with self.tick_lock:
self.current_tick_processed[target_tick] = True
debug(f"Algorithm client processed tick {target_tick}", prefix="SERVER")
return True
# GUI客户端需要等待 - 使用异步协程
# 如果没有算法客户端,先等待算法客户端注册
if self.algorithm_client_id is None:
debug("GUI client waiting for algorithm client to register...", prefix="SERVER")
start_time = asyncio.get_event_loop().time()
check_interval = 0.1 # 每100ms检查一次
# 阶段1等待算法客户端注册
while self.algorithm_client_id is None:
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning("GUI client: timeout waiting for algorithm client to register", prefix="SERVER")
return False
await asyncio.sleep(check_interval)
# 动态调整检查间隔
if elapsed > 5 and check_interval < 0.5:
check_interval = 0.5
debug(f"GUI client: algorithm client registered, now waiting for tick {target_tick}", prefix="SERVER")
# 阶段2等待算法客户端处理指定tick
while True:
# 检查是否已处理
with self.tick_lock:
if self.current_tick_processed.get(target_tick, False):
debug(f"GUI client: tick {target_tick} ready, proceeding", prefix="SERVER")
return True
# 检查是否超时
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning(f"GUI client: timeout waiting for tick {target_tick}", prefix="SERVER")
return False
# 异步休眠,真正的协程切换
# 不阻塞事件循环,其他协程可以运行
await asyncio.sleep(check_interval)
# 动态调整检查间隔(可选优化)
# 前几秒检查更频繁,之后降低频率
if elapsed > 5 and check_interval < 0.5:
check_interval = 0.5 # 5秒后降低到500ms检查一次
def store_tick_events(self, target_tick: int, events: List[Any]) -> None:
"""存储指定tick的events"""
with self.events_lock:
self.tick_events[target_tick] = events
debug(f"Stored {len(events)} events for tick {target_tick}", prefix="SERVER")
def get_tick_events(self, target_tick: int) -> List[Any]:
"""获取指定tick的events"""
with self.events_lock:
events = self.tick_events.get(target_tick, [])
debug(f"Retrieved {len(events)} events for tick {target_tick}", prefix="SERVER")
return events
async def wait_for_gui_acknowledgment(self, target_tick: int, timeout: float = 30.0) -> bool:
"""
算法客户端等待GUI确认已读取上一次step的结果
确保GUI不会错过任何tick的消息
Args:
target_tick: 算法刚刚执行完step后的tick上一次step的结果
timeout: 超时时间
Returns:
True: GUI已确认, False: 超时或没有GUI客户端
"""
# 如果没有GUI客户端不需要等待
if self.gui_client_id is None:
return True
# 如果是第一个ticktarget_tick=1不需要等待GUI还没开始
if target_tick <= 1:
return True
debug(f"Algorithm waiting for GUI to acknowledge tick {target_tick - 1}", prefix="SERVER")
start_time = asyncio.get_event_loop().time()
while True:
# 检查GUI是否已读取到上一个tick的结果
if self.gui_acknowledged_tick >= target_tick - 1:
debug(f"GUI acknowledged tick {target_tick - 1}, algorithm can proceed", prefix="SERVER")
return True
# 检查超时
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= timeout:
warning(f"Timeout waiting for GUI acknowledgment of tick {target_tick - 1}", prefix="SERVER")
return False
await asyncio.sleep(0.01) # 10ms检查一次
def acknowledge_gui_read(self, tick: int) -> None:
"""GUI确认已读取指定tick"""
self.gui_acknowledged_tick = max(self.gui_acknowledged_tick, tick)
debug(f"GUI acknowledged tick {tick}", prefix="SERVER")
def reset(self) -> None:
"""重置客户端管理器"""
with self.lock:
self.clients.clear()
self.algorithm_client_id = None
self.gui_client_id = None
with self.tick_lock:
self.current_tick_processed.clear()
with self.events_lock:
self.tick_events.clear()
self.gui_acknowledged_tick = -1
self.algorithm_current_tick = -1
debug("Client manager reset", prefix="SERVER")
class CustomJSONEncoder(json.JSONEncoder):
@@ -89,19 +308,6 @@ def json_response(data: Any, status: int = 200) -> Response | tuple[Response, in
return response, status
@dataclass
class MetricsResponse(SerializableModel):
"""性能指标响应"""
done: int
total: int
avg_wait: float
p95_wait: float
avg_system: float
p95_system: float
energy_total: float
@dataclass
class PassengerSummary(SerializableModel):
"""乘客摘要"""
@@ -120,7 +326,7 @@ class SimulationStateResponse(SerializableModel):
elevators: List[ElevatorState]
floors: List[FloorState]
passengers: Dict[int, PassengerInfo]
metrics: MetricsResponse
metrics: PerformanceMetrics
class ElevatorSimulation:
@@ -136,6 +342,8 @@ class ElevatorSimulation:
self.current_traffic_index = 0
self.traffic_files: List[Path] = []
self.state: SimulationState = create_empty_simulation_state(2, 1, 1)
self.all_traffic_results: List[Dict[str, Any]] = [] # 存储所有traffic文件的结果
self.start_dir = Path.cwd() # 记录启动目录
self._load_traffic_files()
@property
@@ -166,7 +374,7 @@ class ElevatorSimulation:
self.traffic_files.append(file_path)
# 按文件名排序
self.traffic_files.sort()
server_debug_log(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}")
debug(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}", prefix="SERVER")
# 如果有文件,加载第一个
if self.traffic_files:
self.load_current_traffic()
@@ -174,24 +382,32 @@ class ElevatorSimulation:
def load_current_traffic(self) -> None:
"""加载当前索引对应的流量文件"""
if not self.traffic_files:
server_debug_log("No traffic files available")
warning("No traffic files available", prefix="SERVER")
return
if self.current_traffic_index >= len(self.traffic_files):
server_debug_log(f"Traffic index {self.current_traffic_index} out of range")
warning(f"Traffic index {self.current_traffic_index} out of range", prefix="SERVER")
return
traffic_file = self.traffic_files[self.current_traffic_index]
server_debug_log(f"Loading traffic from {traffic_file.name}")
info(f"Loading traffic from {traffic_file.name}", prefix="SERVER")
try:
with open(traffic_file, "r", encoding="utf-8") as f:
file_data = json.load(f)
building_config = file_data["building"]
server_debug_log(f"Building config: {building_config}")
debug(f"Building config: {building_config}", prefix="SERVER")
self.state = create_empty_simulation_state(
building_config["elevators"], building_config["floors"], building_config["elevator_capacity"]
)
self.reset()
# 设置电梯能耗率
elevator_energy_rates = building_config.get("elevator_energy_rates", [1.0] * building_config["elevators"])
for i, elevator in enumerate(self.state.elevators):
if i < len(elevator_energy_rates):
elevator.energy_rate = elevator_energy_rates[i]
debug(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}", prefix="SERVER")
self.max_duration_ticks = building_config["duration"]
traffic_data: list[Dict[str, Any]] = file_data["traffic"]
traffic_data.sort(key=lambda t: cast(int, t["tick"]))
@@ -206,16 +422,108 @@ class ElevatorSimulation:
self.next_passenger_id += 1
except Exception as e:
server_debug_log(f"Error loading traffic file {traffic_file}: {e}")
error(f"Error loading traffic file {traffic_file}: {e}", prefix="SERVER")
def save_current_traffic_result(self) -> None:
"""保存当前traffic文件的结果"""
if not self.traffic_files or self.current_traffic_index >= len(self.traffic_files):
return
traffic_file = self.traffic_files[self.current_traffic_index]
metrics = self._calculate_metrics()
result = {
"traffic_file": traffic_file.name,
"traffic_index": self.current_traffic_index,
"final_tick": self.tick,
"max_duration_ticks": self.max_duration_ticks,
"metrics": metrics.to_dict(),
}
self.all_traffic_results.append(result)
info(
f"Saved result for {traffic_file.name}: {metrics.completed_passengers}/{metrics.total_passengers} passengers completed",
prefix="SERVER",
)
def save_final_results(self) -> None:
"""保存所有结果到result.json"""
result_file = self.start_dir / "result.json"
# 计算总体统计
total_completed = sum(r["metrics"]["completed_passengers"] for r in self.all_traffic_results)
total_passengers = sum(r["metrics"]["total_passengers"] for r in self.all_traffic_results)
total_energy = sum(r["metrics"]["total_energy_consumption"] for r in self.all_traffic_results)
# 计算平均等待时间(只统计有完成乘客的情况)
all_avg_floor_wait = [
r["metrics"]["average_floor_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_avg_arrival_wait = [
r["metrics"]["average_arrival_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_p95_floor_wait = [
r["metrics"]["p95_floor_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
all_p95_arrival_wait = [
r["metrics"]["p95_arrival_wait_time"]
for r in self.all_traffic_results
if r["metrics"]["completed_passengers"] > 0
]
completion_rate = total_completed / total_passengers if total_passengers > 0 else 0
final_result = {
"total_traffic_files": len(self.all_traffic_results),
"summary": {
"total_completed_passengers": total_completed,
"total_passengers": total_passengers,
"completion_rate": completion_rate,
"total_energy_consumption": total_energy,
"average_floor_wait_time": (
sum(all_avg_floor_wait) / len(all_avg_floor_wait) if all_avg_floor_wait else 0
),
"average_arrival_wait_time": (
sum(all_avg_arrival_wait) / len(all_avg_arrival_wait) if all_avg_arrival_wait else 0
),
"p95_floor_wait_time": sum(all_p95_floor_wait) / len(all_p95_floor_wait) if all_p95_floor_wait else 0,
"p95_arrival_wait_time": (
sum(all_p95_arrival_wait) / len(all_p95_arrival_wait) if all_p95_arrival_wait else 0
),
},
"individual_results": self.all_traffic_results,
}
with open(result_file, "w", encoding="utf-8") as f:
json.dump(final_result, f, indent=2, ensure_ascii=False)
info(f"Final results saved to: {result_file}", prefix="SERVER")
info(
f"Summary: {total_completed}/{total_passengers} passengers completed ({completion_rate:.1%})",
prefix="SERVER",
)
info(f"Total energy consumption: {total_energy:.2f}", prefix="SERVER")
def next_traffic_round(self, full_reset: bool = False) -> bool:
"""切换到下一个流量文件,返回是否成功切换"""
if not self.traffic_files:
return False
# 在切换前保存当前traffic文件的结果
if self.current_traffic_index >= 0 and self.current_traffic_index < len(self.traffic_files):
self.save_current_traffic_result()
# 检查是否还有下一个文件
next_index = self.current_traffic_index + 1
if next_index >= len(self.traffic_files):
# 所有任务完成,保存最终结果
self.save_final_results()
if full_reset:
self.current_traffic_index = -1
return self.next_traffic_round()
@@ -230,7 +538,7 @@ class ElevatorSimulation:
with open(traffic_file, "r") as f:
traffic_data = json.load(f)
server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries")
debug(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries", prefix="SERVER")
self.traffic_queue: List[TrafficEntry] = [] # type: ignore[reportRedeclaration]
for entry in traffic_data:
@@ -246,12 +554,12 @@ class ElevatorSimulation:
# Sort by arrival time
self.traffic_queue.sort(key=lambda p: p.tick)
server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}")
debug(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}", prefix="SERVER")
def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None:
"""Emit an event to be sent to clients using unified data models"""
self.state.add_event(event_type, data)
server_debug_log(f"Event emitted: {event_type.value} with data {data}")
debug(f"Event emitted: {event_type.value} with data {data}", prefix="SERVER")
def step(self, num_ticks: int = 1) -> List[SimulationEvent]:
with self.lock:
@@ -267,9 +575,9 @@ class ElevatorSimulation:
if self.tick >= self.max_duration_ticks:
completed_count = self.force_complete_remaining_passengers()
if completed_count > 0:
server_debug_log(f"模拟结束,强制完成了 {completed_count} 个乘客")
info(f"模拟结束,强制完成了 {completed_count} 个乘客", prefix="SERVER")
server_debug_log(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}")
debug(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}", prefix="SERVER")
return new_events
def _process_tick(self) -> List[SimulationEvent]:
@@ -286,44 +594,42 @@ class ElevatorSimulation:
# 2. Move elevators
self._move_elevators()
# 3. Process elevator stops and passenger boarding/alighting
# 3. Process elevator stops and passenger alighting
self._process_elevator_stops()
# Return events generated this tick
return self.state.events[events_start:]
def _process_passenger_in(self) -> None:
for elevator in self.elevators:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
def _process_passenger_in(self, elevator: ElevatorState) -> None:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
def _update_elevator_status(self) -> None:
"""更新电梯运行状态"""
for elevator in self.elevators:
current_floor = elevator.position.current_floor
target_floor = elevator.target_floor
old_status = elevator.run_status.value
# 没有移动方向,说明电梯已经到达目标楼层
@@ -331,7 +637,7 @@ class ElevatorSimulation:
if elevator.next_target_floor is not None:
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
self._process_passenger_in()
self._process_passenger_in(elevator)
elevator.next_target_floor = None
else:
continue
@@ -344,9 +650,10 @@ class ElevatorSimulation:
elif elevator.run_status == ElevatorStatus.START_UP:
# 从启动状态切换到匀速
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
server_debug_log(
debug(
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} "
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}"
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}",
prefix="SERVER",
)
# START_DOWN状态会在到达目标时在_move_elevators中切换为STOPPED
@@ -362,7 +669,7 @@ class ElevatorSimulation:
)
assert traffic_entry.origin != traffic_entry.destination, f"乘客{passenger.id}目的地和起始地{traffic_entry.origin}重复"
self.passengers[passenger.id] = passenger
server_debug_log(f"乘客 {passenger.id:4} 创建 | {passenger}")
debug(f"乘客 {passenger.id:4} 创建 | {passenger}", prefix="SERVER")
if passenger.destination > passenger.origin:
self.floors[passenger.origin].up_queue.append(passenger.id)
self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id})
@@ -391,14 +698,32 @@ class ElevatorSimulation:
# 根据状态和方向调整移动距离
elevator.last_tick_direction = elevator.target_floor_direction
old_position = elevator.position.current_floor_float
if elevator.target_floor_direction == Direction.UP:
new_floor = elevator.position.floor_up_position_add(movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
elif elevator.target_floor_direction == Direction.DOWN:
new_floor = elevator.position.floor_up_position_add(-movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
else:
# 之前的状态已经是到站了,清空上一次到站的方向
pass
# 发送电梯移动事件
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
},
)
# 移动后检测是否即将到站,从匀速状态切换到减速
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
@@ -411,7 +736,7 @@ class ElevatorSimulation:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value,
},
)
@@ -435,7 +760,6 @@ class ElevatorSimulation:
self._emit_event(
EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"}
)
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
def _process_elevator_stops(self) -> None:
"""
@@ -457,6 +781,7 @@ class ElevatorSimulation:
passenger = self.passengers[passenger_id]
if passenger.destination == current_floor:
passenger.dropoff_tick = self.tick
passenger.arrived = True
passengers_to_remove.append(passenger_id)
# Remove passengers who alighted
@@ -478,21 +803,20 @@ class ElevatorSimulation:
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1
说明电梯处于stop状态这个tick直接采用下一个目的地运行了
"""
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
debug(f"电梯 E{elevator.id} 被设定为前往 F{floor}", prefix="SERVER")
new_target_floor_should_accel = self._should_start_deceleration(elevator)
if not new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
server_debug_log(f"电梯 E{elevator.id} 被设定为匀速")
debug(f"电梯 E{elevator.id} 被设定为匀速", prefix="SERVER")
elif new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速
elevator.run_status = ElevatorStatus.START_DOWN
server_debug_log(f"电梯 E{elevator.id} 被设定为减速")
debug(f"电梯 E{elevator.id} 被设定为减速", prefix="SERVER")
if elevator.current_floor != floor or elevator.position.floor_up_position != 0:
old_status = elevator.run_status.value
server_debug_log(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}")
debug(f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}", prefix="SERVER")
def _calculate_distance_to_target(self, elevator: ElevatorState) -> float:
"""计算到目标楼层的距离以floor_up_position为单位"""
@@ -531,7 +855,7 @@ class ElevatorSimulation:
self._set_elevator_target_floor(elevator, floor)
else:
elevator.next_target_floor = floor
server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}")
debug(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}", prefix="SERVER")
def get_state(self) -> SimulationStateResponse:
"""Get complete simulation state"""
@@ -547,41 +871,51 @@ class ElevatorSimulation:
metrics=metrics,
)
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
# 直接从state中筛选已完成的乘客
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
total_passengers = len(self.state.passengers)
# 计算总能耗
total_energy = sum(elevator.energy_consumed for elevator in self.state.elevators)
if not completed:
return MetricsResponse(
done=0,
total=total_passengers,
avg_wait=0,
p95_wait=0,
avg_system=0,
p95_system=0,
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=0,
total_passengers=total_passengers,
average_floor_wait_time=0,
p95_floor_wait_time=0,
average_arrival_wait_time=0,
p95_arrival_wait_time=0,
total_energy_consumption=total_energy,
)
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in self.state.passengers.values()]
arrival_wait_times = [float(p.arrival_wait_time) for p in self.state.passengers.values()]
def percentile(data: List[float], p: int) -> float:
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
index = int(len(sorted_data) * p / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
# 计算要保留的数据数量(排除掉最长的 exclude_percent
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
# 只保留前 keep_count 个数据,排除最长的部分
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return MetricsResponse(
done=len(completed),
total=total_passengers,
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=total_passengers,
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
total_energy_consumption=total_energy,
)
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]:
@@ -615,26 +949,59 @@ class ElevatorSimulation:
self.traffic_queue: List[TrafficEntry] = []
self.max_duration_ticks = 0
self.next_passenger_id = 1
self.all_traffic_results.clear() # 清空累积结果
# Global simulation instance for Flask routes
# Global simulation instance for Quart routes
simulation: ElevatorSimulation = ElevatorSimulation("", _init_only=True)
# Create Flask app
app = Flask(__name__)
# Global client manager instance
client_manager = ClientManager()
# Create Quart app (异步Flask)
app = Quart(__name__)
def get_client_id_from_request() -> Optional[str]:
"""从请求头中获取客户端ID"""
result = request.headers.get("X-Client-ID")
return result if result else None
def get_client_type_from_request() -> str:
"""从请求头中获取客户端类型默认为algorithm"""
result = request.headers.get("X-Client-Type", "algorithm")
return str(result) if result else "algorithm"
# Configure CORS
@app.after_request
def after_request(response: Response) -> Response:
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization,X-Client-ID,X-Client-Type")
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS")
return response
@app.route("/api/client/register", methods=["POST"])
async def register_client() -> Response | tuple[Response, int]:
"""客户端注册端点"""
try:
client_type = get_client_type_from_request()
current_tick = simulation.tick
client_id, success, message = client_manager.register_client(client_type, current_tick)
if success:
return json_response({"success": True, "client_id": client_id, "message": message})
else:
return json_response({"success": False, "error": message}, 400)
except Exception as e:
return json_response({"error": str(e)}, 500)
@app.route("/api/state", methods=["GET"])
def get_state() -> Response | tuple[Response, int]:
async def get_state() -> Response | tuple[Response, int]:
try:
state = simulation.get_state()
return json_response(state)
@@ -643,14 +1010,60 @@ def get_state() -> Response | tuple[Response, int]:
@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response | tuple[Response, int]:
async def step_simulation() -> Response | tuple[Response, int]:
try:
data: Dict[str, Any] = request.get_json() or {}
data: Dict[str, Any] = await request.get_json() or {}
ticks = data.get("ticks", 1)
# server_debug_log("")
# server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}")
events = simulation.step(ticks)
server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n")
client_current_tick = data.get("current_tick", None)
# 获取客户端ID
client_id = get_client_id_from_request()
# 检查客户端类型
is_algorithm = client_manager.is_algorithm_client(client_id)
# 如果提供了current_tick实现优先级队列
if client_current_tick is not None:
target_tick = client_current_tick + ticks
# GUI客户端需要等待算法客户端先处理异步等待
can_proceed = await client_manager.wait_for_algorithm_step(client_id, target_tick)
if not can_proceed:
warning(f"Client {client_id} timeout waiting for tick {target_tick}", prefix="SERVER")
return json_response({"error": "Timeout waiting for algorithm client to process this tick"}, 408)
# 只有算法客户端才能真正推进模拟
if is_algorithm:
# 计算target_tick
target_tick = client_current_tick + ticks if client_current_tick is not None else simulation.tick + ticks
# 算法客户端等待GUI确认已读取上一次的tick结果严格同步
gui_ready = await client_manager.wait_for_gui_acknowledgment(target_tick)
if not gui_ready:
warning("Algorithm timeout waiting for GUI acknowledgment, but continuing", prefix="SERVER")
# 继续执行,不阻塞算法
# 真正执行step
events = simulation.step(ticks)
debug(f"Algorithm step: tick {simulation.tick}, events: {len(events)}", prefix="SERVER")
# 存储events供GUI获取
if client_current_tick is not None:
client_manager.store_tick_events(target_tick, events)
else:
# GUI客户端不推进模拟但可以获取算法产生的events
if client_current_tick is not None:
target_tick = client_current_tick + ticks
events = client_manager.get_tick_events(target_tick)
debug(f"GUI step (retrieved): tick {simulation.tick}, events: {len(events)}", prefix="SERVER")
# GUI确认已读取这个tick
client_manager.acknowledge_gui_read(target_tick)
else:
events = []
debug(f"GUI step (no tick): tick {simulation.tick}", prefix="SERVER")
return json_response(
{
"tick": simulation.tick,
@@ -662,18 +1075,37 @@ def step_simulation() -> Response | tuple[Response, int]:
@app.route("/api/reset", methods=["POST"])
def reset_simulation() -> Response | tuple[Response, int]:
async def reset_simulation() -> Response | tuple[Response, int]:
try:
simulation.reset()
client_manager.reset() # 同时重置客户端管理器
info("Simulation and client manager reset", prefix="SERVER")
return json_response({"success": True})
except Exception as e:
return json_response({"error": str(e)}, 500)
@app.route("/api/elevators/<int:elevator_id>/go_to_floor", methods=["POST"])
def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
async def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
try:
data: Dict[str, Any] = request.get_json() or {}
# 获取客户端ID
client_id = get_client_id_from_request()
# 检查客户端是否有权限执行控制命令
if not client_manager.can_execute_command(client_id):
client_type = "unknown"
if client_id:
client_info = client_manager.get_client_info(client_id)
if client_info:
client_type = client_info.client_type.value
warning(
f"Client {client_id} (type: {client_type}) attempted to execute command but was denied", prefix="SERVER"
)
return json_response(
{"success": False, "error": "Only algorithm clients can execute control commands"}, 403
)
data: Dict[str, Any] = await request.get_json() or {}
floor = data["floor"]
immediate = data.get("immediate", False)
simulation.elevator_go_to_floor(elevator_id, floor, immediate)
@@ -683,10 +1115,11 @@ def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
@app.route("/api/traffic/next", methods=["POST"])
def next_traffic_round() -> Response | tuple[Response, int]:
async def next_traffic_round() -> Response | tuple[Response, int]:
"""切换到下一个流量文件"""
try:
full_reset = request.get_json()["full_reset"]
data = await request.get_json()
full_reset = data["full_reset"]
success = simulation.next_traffic_round(full_reset)
if success:
return json_response({"success": True})
@@ -697,7 +1130,7 @@ def next_traffic_round() -> Response | tuple[Response, int]:
@app.route("/api/traffic/info", methods=["GET"])
def get_traffic_info() -> Response | tuple[Response, int]:
async def get_traffic_info() -> Response | tuple[Response, int]:
"""获取当前流量文件信息"""
try:
info = simulation.get_traffic_info()
@@ -709,7 +1142,7 @@ def get_traffic_info() -> Response | tuple[Response, int]:
def main() -> None:
global simulation
parser = argparse.ArgumentParser(description="Elevator Simulation Server")
parser = argparse.ArgumentParser(description="Elevator Simulation Server (Async)")
parser.add_argument("--host", default="127.0.0.1", help="Server host")
parser.add_argument("--port", type=int, default=8000, help="Server port")
parser.add_argument("--debug", default=True, action="store_true", help="Enable debug logging")
@@ -718,20 +1151,24 @@ def main() -> None:
# Enable debug mode if requested
if args.debug:
set_server_debug_mode(True)
server_debug_log("Server debug mode enabled")
set_log_level(LogLevel.DEBUG)
debug("Server debug mode enabled", prefix="SERVER")
app.config["DEBUG"] = True
# Create simulation with traffic directory
simulation = ElevatorSimulation(f"{os.path.join(os.path.dirname(__file__), '..', 'traffic')}")
# Print traffic status
print(f"Elevator simulation server running on http://{args.host}:{args.port}")
info(f"Elevator simulation server (Async) running on http://{args.host}:{args.port}", prefix="SERVER")
info("Using Quart (async Flask) for better concurrency", prefix="SERVER")
debug_status = "enabled" if args.debug else "disabled"
info(f"Debug mode: {debug_status}", prefix="SERVER")
try:
app.run(host=args.host, port=args.port, debug=args.debug, threaded=True)
# 使用Quart的run方法底层使用hypercorn
app.run(host=args.host, port=args.port, debug=args.debug)
except KeyboardInterrupt:
print("\nShutting down server...")
info("Shutting down server...", prefix="SERVER")
if __name__ == "__main__":

View File

@@ -290,7 +290,7 @@ def generate_fire_evacuation_traffic(
for floor in range(1, floors):
# 每层随机数量的人需要疏散
num_people = random.randint(people_per_floor[0], people_per_floor[1])
for i in range(num_people):
for _ in range(num_people):
# 在10个tick内陆续到达模拟疏散的紧急性
arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1))
if arrival_tick < duration:
@@ -791,10 +791,12 @@ def generate_traffic_file(scenario: str, output_file: str, scale: Optional[str]
traffic_data = generator_func(**generator_params)
# 准备building配置
num_elevators = params["elevators"]
building_config = {
"floors": params["floors"],
"elevators": params["elevators"],
"elevators": num_elevators,
"elevator_capacity": params["elevator_capacity"],
"elevator_energy_rates": [1.0] * num_elevators, # 每台电梯的能耗率默认为1.0
"scenario": scenario,
"scale": scale,
"description": f"{config['description']} ({scale}规模)",
@@ -835,7 +837,7 @@ def generate_scaled_traffic_files(
if custom_building:
floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0])
elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0])
elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
_elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
# 重新确定规模
detected_scale = determine_building_scale(floors, elevators)

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
Utils package for Elevator Saga
工具包
"""
from elevator_saga.utils.logger import LogLevel, debug, error, get_logger, info, set_log_level, warning
__all__ = [
# Logger functions
"debug",
"info",
"warning",
"error",
"get_logger",
"set_log_level",
"LogLevel",
]

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python3
"""
Debug utilities for Elevator Saga
调试工具模块
"""
# Global debug flag
_debug_enabled: bool = True
def set_debug_mode(enabled: bool) -> None:
"""启用或禁用调试模式"""
global _debug_enabled
_debug_enabled = enabled
def debug_log(message: str) -> None:
"""输出调试信息(如果启用了调试模式)"""
if _debug_enabled:
print(f"[DEBUG] {message}", flush=True)
def is_debug_enabled() -> bool:
"""检查是否启用了调试模式"""
return _debug_enabled

View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
Unified logging system for Elevator Saga
统一的日志系统 - 支持多级别、带颜色的日志输出
"""
import os
import sys
from enum import Enum
from typing import Optional
class LogLevel(Enum):
"""日志级别枚举"""
DEBUG = 0
INFO = 1
WARNING = 2
ERROR = 3
@classmethod
def from_string(cls, level_str: str) -> "LogLevel":
"""从字符串转换为日志级别"""
level_map = {
"DEBUG": cls.DEBUG,
"INFO": cls.INFO,
"WARNING": cls.WARNING,
"ERROR": cls.ERROR,
}
return level_map.get(level_str.upper(), cls.DEBUG)
class Color:
"""ANSI颜色代码"""
# 基础颜色
RESET = "\033[0m"
BOLD = "\033[1m"
# 前景色
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# 亮色
BRIGHT_BLACK = "\033[90m"
BRIGHT_RED = "\033[91m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_BLUE = "\033[94m"
BRIGHT_MAGENTA = "\033[95m"
BRIGHT_CYAN = "\033[96m"
BRIGHT_WHITE = "\033[97m"
class Logger:
"""统一日志记录器"""
def __init__(self, name: str = "ElevatorSaga", min_level: LogLevel = LogLevel.INFO, use_color: bool = True):
"""
初始化日志记录器
Args:
name: 日志记录器名称
min_level: 最低日志级别
use_color: 是否使用颜色
"""
self.name = name
self.min_level = min_level
self.use_color = use_color and sys.stdout.isatty()
# 日志级别对应的颜色
self.level_colors = {
LogLevel.DEBUG: Color.BRIGHT_BLACK,
LogLevel.INFO: Color.BRIGHT_CYAN,
LogLevel.WARNING: Color.BRIGHT_YELLOW,
LogLevel.ERROR: Color.BRIGHT_RED,
}
# 日志级别对应的标签
self.level_labels = {
LogLevel.DEBUG: "DEBUG",
LogLevel.INFO: "INFO",
LogLevel.WARNING: "WARNING",
LogLevel.ERROR: "ERROR",
}
def _format_message(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> str:
"""
格式化日志消息
Args:
level: 日志级别
message: 消息内容
prefix: 可选的前缀(如模块名)
Returns:
格式化后的消息
"""
level_label = self.level_labels[level]
if self.use_color:
color = self.level_colors[level]
level_str = f"{color}{level_label:8}{Color.RESET}"
else:
level_str = f"{level_label:8}"
if prefix:
prefix_str = f"[{prefix}] "
else:
prefix_str = ""
return f"{level_str} {prefix_str}{message}"
def _log(self, level: LogLevel, message: str, prefix: Optional[str] = None) -> None:
"""
记录日志
Args:
level: 日志级别
message: 消息内容
prefix: 可选的前缀
"""
if level.value < self.min_level.value:
return
formatted = self._format_message(level, message, prefix)
print(formatted, flush=True)
def debug(self, message: str, prefix: Optional[str] = None) -> None:
"""记录DEBUG级别日志"""
self._log(LogLevel.DEBUG, message, prefix)
def info(self, message: str, prefix: Optional[str] = None) -> None:
"""记录INFO级别日志"""
self._log(LogLevel.INFO, message, prefix)
def warning(self, message: str, prefix: Optional[str] = None) -> None:
"""记录WARNING级别日志"""
self._log(LogLevel.WARNING, message, prefix)
def error(self, message: str, prefix: Optional[str] = None) -> None:
"""记录ERROR级别日志"""
self._log(LogLevel.ERROR, message, prefix)
def set_level(self, level: LogLevel) -> None:
"""设置最低日志级别"""
self.min_level = level
# 全局日志记录器实例
_global_logger: Optional[Logger] = None
def _get_default_log_level() -> LogLevel:
"""从环境变量获取默认日志级别默认为DEBUG"""
env_level = os.environ.get("ELEVATOR_LOG_LEVEL", "DEBUG")
return LogLevel.from_string(env_level)
def get_logger(name: str = "ElevatorSaga", min_level: Optional[LogLevel] = None) -> Logger:
"""
获取全局日志记录器
Args:
name: 日志记录器名称
min_level: 最低日志级别如果为None则从环境变量读取默认DEBUG
Returns:
Logger实例
"""
global _global_logger
if _global_logger is None:
if min_level is None:
min_level = _get_default_log_level()
_global_logger = Logger(name, min_level)
return _global_logger
def set_log_level(level: LogLevel) -> None:
"""设置全局日志级别"""
logger = get_logger()
logger.set_level(level)
# 便捷函数
def debug(message: str, prefix: Optional[str] = None) -> None:
"""记录DEBUG日志"""
get_logger().debug(message, prefix)
def info(message: str, prefix: Optional[str] = None) -> None:
"""记录INFO日志"""
get_logger().info(message, prefix)
def warning(message: str, prefix: Optional[str] = None) -> None:
"""记录WARNING日志"""
get_logger().warning(message, prefix)
def error(message: str, prefix: Optional[str] = None) -> None:
"""记录ERROR日志"""
get_logger().error(message, prefix)

View File

@@ -30,7 +30,8 @@ classifiers = [
dependencies = [
"numpy>=1.20.0",
"flask>=2.0.0",
"quart>=0.19.0",
"hypercorn>=0.15.0",
]
[project.optional-dependencies]

View File

@@ -12,8 +12,7 @@
"docs/_build",
"elevatorpy.egg-info",
"elevator_saga.egg-info",
".eggs",
"MsgCenterPy"
".eggs"
],
"pythonVersion": "3.10",
"typeCheckingMode": "basic",