9 Commits

Author SHA1 Message Date
Xuwznln
1031e677e1 Bump version: 0.0.6 → 0.0.7 2025-10-09 16:49:15 +08:00
Xuwznln
889d554f19 Fix: remove abstractmethod decroation for on_elevator_move 2025-10-09 16:49:07 +08:00
Xuwznln
ee3c4bab7e Bump version: 0.0.5 → 0.0.6 2025-10-09 16:41:04 +08:00
Xuwznln
99524eee3d Add: elevator move event 2025-10-09 16:40:51 +08:00
Xuwznln
b2d03b2510 Bump version: 0.0.4 → 0.0.5 2025-10-06 15:16:47 +08:00
Xuwznln
1a8063e4fd fix performance calculation. fix floor error in approaching event. fix passenger board wrongly. 2025-10-06 15:16:35 +08:00
Xuwznln
692b853101 Update bug report template 2025-10-06 14:36:53 +08:00
Xuwznln
4c86816920 Update docs 2025-10-01 18:32:03 +08:00
Xuwznln
349511e00f Bump version: 0.0.3 → 0.0.4 2025-10-01 18:07:21 +08:00
16 changed files with 276 additions and 288 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.3
current_version = 0.0.7
commit = True
tag = True
tag_name = v{new_version}

View File

@@ -0,0 +1,30 @@
---
name: Bug Report
about: Report a bug in the project
title: "[BUG] "
labels: bug
assignees: ""
---
**Version Information**
- elevator-py version: [e.g. v0.0.5]
- Python version: [e.g. 3.11]
**Bug Description**
A clear and concise description of the bug
**Steps to Reproduce**
1.
2.
3.
**Expected Behavior**
What you expected to happen
**Actual Behavior**
What actually happened
**Additional Context (Optional)**
Any other information that might help, such as error logs, screenshots, etc.

View File

@@ -30,20 +30,6 @@ Elevator Saga is a Python implementation of an elevator [simulation game](https:
pip install elevator-py
```
### With Development Dependencies
```bash
pip install elevator-py[dev]
```
### From Source
```bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
```
## Quick Start
### Running the Game

View File

@@ -176,7 +176,7 @@ Dynamic proxy for ``PassengerInfo`` that provides access to passenger informatio
if passenger.status == PassengerStatus.IN_ELEVATOR:
print(f"In elevator {passenger.elevator_id}")
print(f"Waited {passenger.wait_time} ticks")
print(f"Waited {passenger.floor_wait_time} ticks")
Read-Only Protection
~~~~~~~~~~~~~~~~~~~~
@@ -284,6 +284,7 @@ The controller provides these event handlers:
- ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights
- ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor
- ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive
- ``on_elevator_move(elevator, from_position, to_position, direction, status)``: Elevator moves
Complete Example
----------------
@@ -343,25 +344,6 @@ Benefits of Proxy Architecture
5. **Separation of Concerns**: State management handled by proxies, logic in controller
6. **Testability**: Can mock API client for unit tests
Performance Considerations
--------------------------
Each attribute access triggers an API call. For better performance:
.. code-block:: python
# ❌ Inefficient - multiple API calls
if elevator.current_floor < elevator.target_floor:
diff = elevator.target_floor - elevator.current_floor
# ✅ Better - store references
current = elevator.current_floor
target = elevator.target_floor
if current < target:
diff = target - current
However, the API client implements **caching within a single tick**, so multiple accesses during event processing are efficient.
Next Steps
----------

View File

@@ -17,9 +17,9 @@ Architecture Overview
│ api_client │ │ │
└─────────────────┘ └─────────────────┘
│ │
│ GET /api/state │
│ POST /api/step │
│ POST /api/elevators/:id/go_to_floor │
│ GET /api/state
│ POST /api/step
│ POST /api/elevators/:id/go_to_floor
│ │
└───────────────────────────────────────┘
@@ -394,37 +394,37 @@ Typical communication sequence during one tick:
.. code-block:: text
Client Server
│ │
│ 1. GET /api/state │
├────────────────────────────────────►│
│ ◄── SimulationState (cached) │
│ │
│ 2. Analyze state, make decisions │
│ │
│ 3. POST /api/elevators/0/go_to_floor
├────────────────────────────────────►│
│ ◄── {"success": true} │
│ │
│ 4. GET /api/state (from cache) │
│ No HTTP request! │
│ │
│ 5. POST /api/step │
├────────────────────────────────────►│
│ Server processes tick │
│ - Moves elevators │
│ - Boards/alights passengers │
│ - Generates events │
│ ◄── {tick: 43, events: [...]} │
│ │
│ 6. Process events │
│ Cache invalidated │
│ │
│ 7. GET /api/state (fetches fresh) │
├────────────────────────────────────►│
│ ◄── SimulationState │
│ │
└─────────────────────────────────────┘
Client Server
│ 1. GET /api/state
├────────────────────────────────────►│
│ ◄── SimulationState (cached)
│ 2. Analyze state, make decisions
│ 3. POST /api/elevators/0/go_to_floor│
├────────────────────────────────────►│
│ ◄── {"success": true}
│ 4. GET /api/state (from cache)
│ No HTTP request!
│ 5. POST /api/step
├────────────────────────────────────►│
│ Server processes tick
│ - Moves elevators
│ - Boards/alights passengers
│ - Generates events
│ ◄── {tick: 43, events: [...]}
│ 6. Process events
│ Cache invalidated
│ 7. GET /api/state (fetches fresh)
├────────────────────────────────────►│
│ ◄── SimulationState
└─────────────────────────────────────
Error Handling
--------------
@@ -476,23 +476,6 @@ The simulator uses a lock to ensure thread-safe access:
This allows Flask to handle concurrent requests safely.
Performance Considerations
--------------------------
**Minimize HTTP Calls**:
.. code-block:: python
# ❌ Bad - 3 HTTP calls
for i in range(3):
state = api_client.get_state()
print(state.tick)
# ✅ Good - 1 HTTP call (cached)
state = api_client.get_state()
for i in range(3):
print(state.tick)
**Batch Commands**:
.. code-block:: python

View File

@@ -8,21 +8,21 @@ Simulation Overview
.. code-block:: text
┌─────────────────────────────────────────────────────────┐
┌─────────────────────────────────────────────────────────
│ Simulation Loop │
│ │
│ Tick N │
│ 1. Update elevator status (START_UP → CONSTANT_SPEED)│
│ 1. Update elevator status (START_UP → CONSTANT_SPEED)
│ 2. Process arrivals (new passengers) │
│ 3. Move elevators (physics simulation) │
│ 4. Process stops (boarding/alighting) │
│ 5. Generate events │
│ │
│ Events sent to client → Client processes → Commands │
│ Events sent to client → Client processes → Commands
│ │
│ Tick N+1 │
│ (repeat...) │
└─────────────────────────────────────────────────────────┘
└─────────────────────────────────────────────────────────
Tick-Based Execution
--------------------
@@ -202,7 +202,7 @@ Event System
Event Types
~~~~~~~~~~~
The simulation generates 8 types of events defined in ``EventType`` enum:
The simulation generates 9 types of events defined in ``EventType`` enum:
.. code-block:: python
@@ -215,6 +215,7 @@ The simulation generates 8 types of events defined in ``EventType`` enum:
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move"
Event Generation
~~~~~~~~~~~~~~~~
@@ -260,6 +261,19 @@ Events are generated during tick processing:
for elevator in self.elevators:
# ... movement logic ...
# Elevator moves
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
}
)
# Passing a floor
if old_floor != new_floor and new_floor != target_floor:
self._emit_event(
@@ -277,7 +291,7 @@ Events are generated during tick processing:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value
}
)
@@ -363,6 +377,14 @@ The ``ElevatorController`` base class automatically routes events to handler met
elevator = self.elevators[event.data["elevator"]]
self.on_elevator_idle(elevator)
elif event.type == EventType.ELEVATOR_MOVE:
elevator = self.elevators[event.data["elevator"]]
from_position = event.data["from_position"]
to_position = event.data["to_position"]
direction = event.data["direction"]
status = event.data["status"]
self.on_elevator_move(elevator, from_position, to_position, direction, status)
# ... other event types ...
Control Flow: Bus Example
@@ -463,27 +485,6 @@ Here's what happens in a typical tick:
Key Timing Concepts
-------------------
Command vs. Execution
~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
# Tick 42: Controller sends command
elevator.go_to_floor(5, immediate=False)
# ← Command queued in elevator.next_target_floor
# Tick 43: Server processes
# ← _update_elevator_status() assigns target
# ← Elevator starts moving
# Tick 44-46: Elevator in motion
# ← Events: PASSING_FLOOR
# Tick 47: Elevator arrives
# ← Event: STOPPED_AT_FLOOR
There's a **one-tick delay** between command and execution (unless ``immediate=True``).
Immediate vs. Queued
~~~~~~~~~~~~~~~~~~~~
@@ -508,69 +509,39 @@ Metrics are calculated from passenger data:
.. code-block:: python
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
completed = [p for p in self.state.passengers.values()
if p.status == PassengerStatus.COMPLETED]
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in completed]
arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
return MetricsResponse(
done=len(completed),
total=len(self.state.passengers),
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=len(self.state.passengers),
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
)
Key metrics:
- **Wait time**: ``pickup_tick - arrive_tick`` (how long passenger waited)
- **System time**: ``dropoff_tick - arrive_tick`` (total time in system)
- **P95**: 95th percentile (worst-case for most passengers)
Best Practices
--------------
1. **React to Events**: Don't poll state - implement event handlers
2. **Use Queued Commands**: Default ``immediate=False`` is safer
3. **Track Passengers**: Monitor ``on_passenger_call`` to know demand
4. **Optimize for Wait Time**: Reduce time between arrival and pickup
5. **Consider Load**: Check ``elevator.is_full`` before dispatching
6. **Handle Idle**: Always give idle elevators something to do (even if it's "go to floor 0")
Example: Efficient Dispatch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
"""Dispatch nearest suitable elevator"""
best_elevator = None
best_cost = float('inf')
for elevator in self.elevators:
# Skip if full
if elevator.is_full:
continue
# Calculate cost (distance + current load)
distance = abs(elevator.current_floor - floor.floor)
load_penalty = elevator.load_factor * 10
cost = distance + load_penalty
# Check if going in right direction
if elevator.target_floor_direction.value == direction:
cost *= 0.5 # Prefer elevators already going that way
if cost < best_cost:
best_cost = cost
best_elevator = elevator
if best_elevator:
best_elevator.go_to_floor(floor.floor)
- **Floor wait time**: ``pickup_tick - arrive_tick`` (在楼层等待的时间,从到达到上电梯)
- **Arrival wait time**: ``dropoff_tick - arrive_tick`` (总等待时间,从到达到下电梯)
- **P95 metrics**: 排除掉最长的5%时间后计算剩余95%的平均值
Summary
-------

View File

@@ -42,22 +42,6 @@ Basic Installation
pip install elevator-py
With Development Dependencies
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
pip install elevator-py[dev]
From Source
~~~~~~~~~~~
.. code-block:: bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
Quick Start
-----------

View File

@@ -257,10 +257,10 @@ Tracks simulation performance:
class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0 # 95th percentile
average_system_time: float = 0.0
p95_system_time: float = 0.0 # 95th percentile
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0 # 95th percentile
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0 # 95th percentile
Properties:

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing.
"""
__version__ = "0.0.3"
__version__ = "0.0.7"
__author__ = "ZGCA Team"

View File

@@ -6,7 +6,7 @@ Unified API Client for Elevator Saga
import json
import urllib.error
import urllib.request
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional
from elevator_saga.core.models import (
ElevatorState,
@@ -63,16 +63,8 @@ class ElevatorAPIClient:
# 使用服务端返回的metrics数据
metrics_data = response_data.get("metrics", {})
if metrics_data:
# 转换为PerformanceMetrics格式
metrics = PerformanceMetrics(
completed_passengers=metrics_data.get("done", 0),
total_passengers=metrics_data.get("total", 0),
average_wait_time=metrics_data.get("avg_wait", 0),
p95_wait_time=metrics_data.get("p95_wait", 0),
average_system_time=metrics_data.get("avg_system", 0),
p95_system_time=metrics_data.get("p95_system", 0),
# total_energy_consumption=metrics_data.get("energy_total", 0),
)
# 直接从字典创建PerformanceMetrics对象
metrics = PerformanceMetrics.from_dict(metrics_data)
else:
metrics = PerformanceMetrics()
@@ -131,7 +123,7 @@ class ElevatorAPIClient:
else:
raise RuntimeError(f"Step failed: {response_data.get('error')}")
def send_elevator_command(self, command: Union[GoToFloorCommand]) -> bool:
def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令"""
endpoint = self._get_elevator_endpoint(command)
debug_log(
@@ -156,7 +148,7 @@ class ElevatorAPIClient:
debug_log(f"Go to floor failed: {e}")
return False
def _get_elevator_endpoint(self, command: Union[GoToFloorCommand]) -> str:
def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
"""获取电梯命令端点"""
base = f"/api/elevators/{command.elevator_id}"

View File

@@ -7,7 +7,7 @@ import os
import time
from abc import ABC, abstractmethod
from pprint import pprint
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from elevator_saga.client.api_client import ElevatorAPIClient
from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger
@@ -171,6 +171,22 @@ class ElevatorController(ABC):
"""
pass
# @abstractmethod 为了兼容性暂不强制要求elevator_move必须实现
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调 - 可选实现
Args:
elevator: 电梯代理对象
from_position: 起始位置(浮点数表示楼层)
to_position: 目标位置(浮点数表示楼层)
direction: 移动方向
status: 电梯运行状态
"""
pass
def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None:
"""内部初始化方法"""
self.elevators = elevators
@@ -218,7 +234,7 @@ class ElevatorController(ABC):
# 获取初始状态并初始化默认从0开始
try:
state = self.api_client.get_state()
except ConnectionResetError as ex:
except ConnectionResetError as _: # noqa: F841
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
os._exit(1)
if state.tick > 0:
@@ -352,9 +368,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.ELEVATOR_APPROACHING:
elevator_id = event.data.get("elevator")
@@ -363,9 +377,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.PASSENGER_BOARD:
elevator_id = event.data.get("elevator")
@@ -385,6 +397,22 @@ class ElevatorController(ABC):
floor_proxy = ProxyFloor(floor_id, self.api_client)
self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy)
elif event.type == EventType.ELEVATOR_MOVE:
elevator_id = event.data.get("elevator")
from_position = event.data.get("from_position")
to_position = event.data.get("to_position")
direction = event.data.get("direction")
status = event.data.get("status")
if (
elevator_id is not None
and from_position is not None
and to_position is not None
and direction is not None
and status is not None
):
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
self.on_elevator_move(elevator_proxy, from_position, to_position, direction, status)
def _reset_and_reinit(self) -> None:
"""重置并重新初始化"""
try:

View File

@@ -41,6 +41,7 @@ class ElevatorBusExampleController(ElevatorController):
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
self.all_passengers.append(passenger)
print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})")
pass
def on_elevator_idle(self, elevator: ProxyElevator) -> None:
@@ -60,10 +61,10 @@ class ElevatorBusExampleController(ElevatorController):
elevator.go_to_floor(elevator.current_floor - 1)
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}")
def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}")
def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
@@ -71,6 +72,11 @@ class ElevatorBusExampleController(ElevatorController):
def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
pass
if __name__ == "__main__":
algorithm = ElevatorBusExampleController()

View File

@@ -139,6 +139,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -8,7 +8,7 @@ import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
# 类型变量
T = TypeVar("T", bound="SerializableModel")
@@ -55,6 +55,7 @@ class EventType(Enum):
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move" # 电梯移动事件
class SerializableModel:
@@ -113,7 +114,7 @@ class Position(SerializableModel):
@property
def current_floor_float(self) -> float:
return self.current_floor + self.floor_up_position / 10
return round(self.current_floor + self.floor_up_position / 10, 1)
def floor_up_position_add(self, num: int) -> int:
self.floor_up_position += num
@@ -174,13 +175,13 @@ class PassengerInfo(SerializableModel):
return PassengerStatus.WAITING
@property
def wait_time(self) -> int:
"""等待时间"""
def floor_wait_time(self) -> int:
"""在楼层等待时间(从到达到上电梯)"""
return self.pickup_tick - self.arrive_tick
@property
def system_time(self) -> int:
"""系统时间(总时间"""
def arrival_wait_time(self) -> int:
"""总等待时间(从到达到下电梯"""
return self.dropoff_tick - self.arrive_tick
@property
@@ -331,10 +332,10 @@ class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0
average_system_time: float = 0.0
p95_system_time: float = 0.0
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0
# total_energy_consumption: float = 0.0
@property

View File

@@ -138,6 +138,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -10,7 +10,7 @@ import threading
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, cast
from flask import Flask, Response, request
@@ -22,6 +22,7 @@ from elevator_saga.core.models import (
FloorState,
PassengerInfo,
PassengerStatus,
PerformanceMetrics,
SerializableModel,
SimulationEvent,
SimulationState,
@@ -89,19 +90,6 @@ def json_response(data: Any, status: int = 200) -> Response | tuple[Response, in
return response, status
@dataclass
class MetricsResponse(SerializableModel):
"""性能指标响应"""
done: int
total: int
avg_wait: float
p95_wait: float
avg_system: float
p95_system: float
energy_total: float
@dataclass
class PassengerSummary(SerializableModel):
"""乘客摘要"""
@@ -120,7 +108,7 @@ class SimulationStateResponse(SerializableModel):
elevators: List[ElevatorState]
floors: List[FloorState]
passengers: Dict[int, PassengerInfo]
metrics: MetricsResponse
metrics: PerformanceMetrics
class ElevatorSimulation:
@@ -286,44 +274,42 @@ class ElevatorSimulation:
# 2. Move elevators
self._move_elevators()
# 3. Process elevator stops and passenger boarding/alighting
# 3. Process elevator stops and passenger alighting
self._process_elevator_stops()
# Return events generated this tick
return self.state.events[events_start:]
def _process_passenger_in(self) -> None:
for elevator in self.elevators:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
def _process_passenger_in(self, elevator: ElevatorState) -> None:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
def _update_elevator_status(self) -> None:
"""更新电梯运行状态"""
for elevator in self.elevators:
current_floor = elevator.position.current_floor
target_floor = elevator.target_floor
old_status = elevator.run_status.value
# 没有移动方向,说明电梯已经到达目标楼层
@@ -331,7 +317,7 @@ class ElevatorSimulation:
if elevator.next_target_floor is not None:
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
self._process_passenger_in()
self._process_passenger_in(elevator)
elevator.next_target_floor = None
else:
continue
@@ -391,6 +377,7 @@ class ElevatorSimulation:
# 根据状态和方向调整移动距离
elevator.last_tick_direction = elevator.target_floor_direction
old_position = elevator.position.current_floor_float
if elevator.target_floor_direction == Direction.UP:
new_floor = elevator.position.floor_up_position_add(movement_speed)
elif elevator.target_floor_direction == Direction.DOWN:
@@ -399,6 +386,19 @@ class ElevatorSimulation:
# 之前的状态已经是到站了,清空上一次到站的方向
pass
# 发送电梯移动事件
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
},
)
# 移动后检测是否即将到站,从匀速状态切换到减速
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
@@ -411,7 +411,7 @@ class ElevatorSimulation:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value,
},
)
@@ -478,7 +478,6 @@ class ElevatorSimulation:
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1
说明电梯处于stop状态这个tick直接采用下一个目的地运行了
"""
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
new_target_floor_should_accel = self._should_start_deceleration(elevator)
@@ -547,41 +546,45 @@ class ElevatorSimulation:
metrics=metrics,
)
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
# 直接从state中筛选已完成的乘客
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
total_passengers = len(self.state.passengers)
if not completed:
return MetricsResponse(
done=0,
total=total_passengers,
avg_wait=0,
p95_wait=0,
avg_system=0,
p95_system=0,
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=0,
total_passengers=total_passengers,
average_floor_wait_time=0,
p95_floor_wait_time=0,
average_arrival_wait_time=0,
p95_arrival_wait_time=0,
)
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in completed]
arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
def percentile(data: List[float], p: int) -> float:
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
index = int(len(sorted_data) * p / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
# 计算要保留的数据数量(排除掉最长的 exclude_percent
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
# 只保留前 keep_count 个数据,排除最长的部分
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return MetricsResponse(
done=len(completed),
total=total_passengers,
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=total_passengers,
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
)
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]: