13 Commits
0.0.4 ... 0.0.9

Author SHA1 Message Date
Xuwznln
b4b99daead Bump version: 0.0.8 → 0.0.9 2025-10-16 01:20:39 +08:00
Xuwznln
d44ba8b6cd Update gitignore 2025-10-16 01:20:11 +08:00
Xuwznln
71e8f2a451 Feat: add energy rate for elevators 2025-10-15 20:46:47 +08:00
Xuwznln
4b60359894 Bump version: 0.0.7 → 0.0.8 2025-10-12 02:14:27 +08:00
Xuwznln
0157496e6f Fix: client completed_passengers calculation error 2025-10-12 02:14:18 +08:00
Xuwznln
1031e677e1 Bump version: 0.0.6 → 0.0.7 2025-10-09 16:49:15 +08:00
Xuwznln
889d554f19 Fix: remove abstractmethod decroation for on_elevator_move 2025-10-09 16:49:07 +08:00
Xuwznln
ee3c4bab7e Bump version: 0.0.5 → 0.0.6 2025-10-09 16:41:04 +08:00
Xuwznln
99524eee3d Add: elevator move event 2025-10-09 16:40:51 +08:00
Xuwznln
b2d03b2510 Bump version: 0.0.4 → 0.0.5 2025-10-06 15:16:47 +08:00
Xuwznln
1a8063e4fd fix performance calculation. fix floor error in approaching event. fix passenger board wrongly. 2025-10-06 15:16:35 +08:00
Xuwznln
692b853101 Update bug report template 2025-10-06 14:36:53 +08:00
Xuwznln
4c86816920 Update docs 2025-10-01 18:32:03 +08:00
18 changed files with 305 additions and 301 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 0.0.4 current_version = 0.0.9
commit = True commit = True
tag = True tag = True
tag_name = v{new_version} tag_name = v{new_version}

View File

@@ -0,0 +1,30 @@
---
name: Bug Report
about: Report a bug in the project
title: "[BUG] "
labels: bug
assignees: ""
---
**Version Information**
- elevator-py version: [e.g. v0.0.5]
- Python version: [e.g. 3.11]
**Bug Description**
A clear and concise description of the bug
**Steps to Reproduce**
1.
2.
3.
**Expected Behavior**
What you expected to happen
**Actual Behavior**
What actually happened
**Additional Context (Optional)**
Any other information that might help, such as error logs, screenshots, etc.

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# ================================ # ================================
# Python-related files # Python-related files
# ================================ # ================================
elevator_saga/traffic/test_cases.py
# Compiled Python files # Compiled Python files
__pycache__/ __pycache__/

View File

@@ -30,20 +30,6 @@ Elevator Saga is a Python implementation of an elevator [simulation game](https:
pip install elevator-py pip install elevator-py
``` ```
### With Development Dependencies
```bash
pip install elevator-py[dev]
```
### From Source
```bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
```
## Quick Start ## Quick Start
### Running the Game ### Running the Game

View File

@@ -176,7 +176,7 @@ Dynamic proxy for ``PassengerInfo`` that provides access to passenger informatio
if passenger.status == PassengerStatus.IN_ELEVATOR: if passenger.status == PassengerStatus.IN_ELEVATOR:
print(f"In elevator {passenger.elevator_id}") print(f"In elevator {passenger.elevator_id}")
print(f"Waited {passenger.wait_time} ticks") print(f"Waited {passenger.floor_wait_time} ticks")
Read-Only Protection Read-Only Protection
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
@@ -284,6 +284,7 @@ The controller provides these event handlers:
- ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights - ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights
- ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor - ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor
- ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive - ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive
- ``on_elevator_move(elevator, from_position, to_position, direction, status)``: Elevator moves
Complete Example Complete Example
---------------- ----------------
@@ -343,25 +344,6 @@ Benefits of Proxy Architecture
5. **Separation of Concerns**: State management handled by proxies, logic in controller 5. **Separation of Concerns**: State management handled by proxies, logic in controller
6. **Testability**: Can mock API client for unit tests 6. **Testability**: Can mock API client for unit tests
Performance Considerations
--------------------------
Each attribute access triggers an API call. For better performance:
.. code-block:: python
# ❌ Inefficient - multiple API calls
if elevator.current_floor < elevator.target_floor:
diff = elevator.target_floor - elevator.current_floor
# ✅ Better - store references
current = elevator.current_floor
target = elevator.target_floor
if current < target:
diff = target - current
However, the API client implements **caching within a single tick**, so multiple accesses during event processing are efficient.
Next Steps Next Steps
---------- ----------

View File

@@ -397,20 +397,20 @@ Typical communication sequence during one tick:
Client Server Client Server
│ │ │ │
│ 1. GET /api/state │ │ 1. GET /api/state │
├────────────────────────────────────►│ ├────────────────────────────────────►│
│ ◄── SimulationState (cached) │ │ ◄── SimulationState (cached) │
│ │ │ │
│ 2. Analyze state, make decisions │ │ 2. Analyze state, make decisions │
│ │ │ │
│ 3. POST /api/elevators/0/go_to_floor│ │ 3. POST /api/elevators/0/go_to_floor│
├────────────────────────────────────►│ ├────────────────────────────────────►│
│ ◄── {"success": true} │ │ ◄── {"success": true} │
│ │ │ │
│ 4. GET /api/state (from cache) │ │ 4. GET /api/state (from cache) │
│ No HTTP request! │ │ No HTTP request! │
│ │ │ │
│ 5. POST /api/step │ │ 5. POST /api/step │
├────────────────────────────────────►│ ├────────────────────────────────────►│
│ Server processes tick │ │ Server processes tick │
│ - Moves elevators │ │ - Moves elevators │
│ - Boards/alights passengers │ │ - Boards/alights passengers │
@@ -421,10 +421,10 @@ Typical communication sequence during one tick:
│ Cache invalidated │ │ Cache invalidated │
│ │ │ │
│ 7. GET /api/state (fetches fresh) │ │ 7. GET /api/state (fetches fresh) │
├────────────────────────────────────►│ ├────────────────────────────────────►│
│ ◄── SimulationState │ │ ◄── SimulationState │
│ │ │ │
└─────────────────────────────────────┘ └─────────────────────────────────────
Error Handling Error Handling
-------------- --------------
@@ -476,23 +476,6 @@ The simulator uses a lock to ensure thread-safe access:
This allows Flask to handle concurrent requests safely. This allows Flask to handle concurrent requests safely.
Performance Considerations
--------------------------
**Minimize HTTP Calls**:
.. code-block:: python
# ❌ Bad - 3 HTTP calls
for i in range(3):
state = api_client.get_state()
print(state.tick)
# ✅ Good - 1 HTTP call (cached)
state = api_client.get_state()
for i in range(3):
print(state.tick)
**Batch Commands**: **Batch Commands**:
.. code-block:: python .. code-block:: python

View File

@@ -8,7 +8,7 @@ Simulation Overview
.. code-block:: text .. code-block:: text
┌─────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────
│ Simulation Loop │ │ Simulation Loop │
│ │ │ │
│ Tick N │ │ Tick N │
@@ -22,7 +22,7 @@ Simulation Overview
│ │ │ │
│ Tick N+1 │ │ Tick N+1 │
│ (repeat...) │ │ (repeat...) │
└─────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────
Tick-Based Execution Tick-Based Execution
-------------------- --------------------
@@ -202,7 +202,7 @@ Event System
Event Types Event Types
~~~~~~~~~~~ ~~~~~~~~~~~
The simulation generates 8 types of events defined in ``EventType`` enum: The simulation generates 9 types of events defined in ``EventType`` enum:
.. code-block:: python .. code-block:: python
@@ -215,6 +215,7 @@ The simulation generates 8 types of events defined in ``EventType`` enum:
IDLE = "idle" IDLE = "idle"
PASSENGER_BOARD = "passenger_board" PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight" PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move"
Event Generation Event Generation
~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~
@@ -260,6 +261,19 @@ Events are generated during tick processing:
for elevator in self.elevators: for elevator in self.elevators:
# ... movement logic ... # ... movement logic ...
# Elevator moves
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
}
)
# Passing a floor # Passing a floor
if old_floor != new_floor and new_floor != target_floor: if old_floor != new_floor and new_floor != target_floor:
self._emit_event( self._emit_event(
@@ -277,7 +291,7 @@ Events are generated during tick processing:
EventType.ELEVATOR_APPROACHING, EventType.ELEVATOR_APPROACHING,
{ {
"elevator": elevator.id, "elevator": elevator.id,
"floor": elevator.target_floor, "floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value "direction": elevator.target_floor_direction.value
} }
) )
@@ -363,6 +377,14 @@ The ``ElevatorController`` base class automatically routes events to handler met
elevator = self.elevators[event.data["elevator"]] elevator = self.elevators[event.data["elevator"]]
self.on_elevator_idle(elevator) self.on_elevator_idle(elevator)
elif event.type == EventType.ELEVATOR_MOVE:
elevator = self.elevators[event.data["elevator"]]
from_position = event.data["from_position"]
to_position = event.data["to_position"]
direction = event.data["direction"]
status = event.data["status"]
self.on_elevator_move(elevator, from_position, to_position, direction, status)
# ... other event types ... # ... other event types ...
Control Flow: Bus Example Control Flow: Bus Example
@@ -463,27 +485,6 @@ Here's what happens in a typical tick:
Key Timing Concepts Key Timing Concepts
------------------- -------------------
Command vs. Execution
~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
# Tick 42: Controller sends command
elevator.go_to_floor(5, immediate=False)
# ← Command queued in elevator.next_target_floor
# Tick 43: Server processes
# ← _update_elevator_status() assigns target
# ← Elevator starts moving
# Tick 44-46: Elevator in motion
# ← Events: PASSING_FLOOR
# Tick 47: Elevator arrives
# ← Event: STOPPED_AT_FLOOR
There's a **one-tick delay** between command and execution (unless ``immediate=True``).
Immediate vs. Queued Immediate vs. Queued
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
@@ -508,69 +509,39 @@ Metrics are calculated from passenger data:
.. code-block:: python .. code-block:: python
def _calculate_metrics(self) -> MetricsResponse: def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics""" """Calculate performance metrics"""
completed = [p for p in self.state.passengers.values() completed = [p for p in self.state.passengers.values()
if p.status == PassengerStatus.COMPLETED] if p.status == PassengerStatus.COMPLETED]
wait_times = [float(p.wait_time) for p in completed] floor_wait_times = [float(p.floor_wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed] arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
return MetricsResponse( def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
done=len(completed), """计算排除掉最长的指定百分比后的平均值"""
total=len(self.state.passengers), if not data:
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0, return 0.0
p95_wait=percentile(wait_times, 95), sorted_data = sorted(data)
avg_system=sum(system_times) / len(system_times) if system_times else 0, keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
p95_system=percentile(system_times, 95), if keep_count == 0:
return 0.0
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=len(self.state.passengers),
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
) )
Key metrics: Key metrics:
- **Wait time**: ``pickup_tick - arrive_tick`` (how long passenger waited) - **Floor wait time**: ``pickup_tick - arrive_tick`` (在楼层等待的时间,从到达到上电梯)
- **System time**: ``dropoff_tick - arrive_tick`` (total time in system) - **Arrival wait time**: ``dropoff_tick - arrive_tick`` (总等待时间,从到达到下电梯)
- **P95**: 95th percentile (worst-case for most passengers) - **P95 metrics**: 排除掉最长的5%时间后计算剩余95%的平均值
Best Practices
--------------
1. **React to Events**: Don't poll state - implement event handlers
2. **Use Queued Commands**: Default ``immediate=False`` is safer
3. **Track Passengers**: Monitor ``on_passenger_call`` to know demand
4. **Optimize for Wait Time**: Reduce time between arrival and pickup
5. **Consider Load**: Check ``elevator.is_full`` before dispatching
6. **Handle Idle**: Always give idle elevators something to do (even if it's "go to floor 0")
Example: Efficient Dispatch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
"""Dispatch nearest suitable elevator"""
best_elevator = None
best_cost = float('inf')
for elevator in self.elevators:
# Skip if full
if elevator.is_full:
continue
# Calculate cost (distance + current load)
distance = abs(elevator.current_floor - floor.floor)
load_penalty = elevator.load_factor * 10
cost = distance + load_penalty
# Check if going in right direction
if elevator.target_floor_direction.value == direction:
cost *= 0.5 # Prefer elevators already going that way
if cost < best_cost:
best_cost = cost
best_elevator = elevator
if best_elevator:
best_elevator.go_to_floor(floor.floor)
Summary Summary
------- -------

View File

@@ -42,22 +42,6 @@ Basic Installation
pip install elevator-py pip install elevator-py
With Development Dependencies
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
pip install elevator-py[dev]
From Source
~~~~~~~~~~~
.. code-block:: bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
Quick Start Quick Start
----------- -----------

View File

@@ -257,10 +257,10 @@ Tracks simulation performance:
class PerformanceMetrics(SerializableModel): class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0 completed_passengers: int = 0
total_passengers: int = 0 total_passengers: int = 0
average_wait_time: float = 0.0 average_floor_wait_time: float = 0.0
p95_wait_time: float = 0.0 # 95th percentile p95_floor_wait_time: float = 0.0 # 95th percentile
average_system_time: float = 0.0 average_arrival_wait_time: float = 0.0
p95_system_time: float = 0.0 # 95th percentile p95_arrival_wait_time: float = 0.0 # 95th percentile
Properties: Properties:

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing. realistic elevator dispatch algorithm development and testing.
""" """
__version__ = "0.0.4" __version__ = "0.0.9"
__author__ = "ZGCA Team" __author__ = "ZGCA Team"

View File

@@ -6,7 +6,7 @@ Unified API Client for Elevator Saga
import json import json
import urllib.error import urllib.error
import urllib.request import urllib.request
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional
from elevator_saga.core.models import ( from elevator_saga.core.models import (
ElevatorState, ElevatorState,
@@ -63,16 +63,8 @@ class ElevatorAPIClient:
# 使用服务端返回的metrics数据 # 使用服务端返回的metrics数据
metrics_data = response_data.get("metrics", {}) metrics_data = response_data.get("metrics", {})
if metrics_data: if metrics_data:
# 转换为PerformanceMetrics格式 # 直接从字典创建PerformanceMetrics对象
metrics = PerformanceMetrics( metrics = PerformanceMetrics.from_dict(metrics_data)
completed_passengers=metrics_data.get("done", 0),
total_passengers=metrics_data.get("total", 0),
average_wait_time=metrics_data.get("avg_wait", 0),
p95_wait_time=metrics_data.get("p95_wait", 0),
average_system_time=metrics_data.get("avg_system", 0),
p95_system_time=metrics_data.get("p95_system", 0),
# total_energy_consumption=metrics_data.get("energy_total", 0),
)
else: else:
metrics = PerformanceMetrics() metrics = PerformanceMetrics()
@@ -131,7 +123,7 @@ class ElevatorAPIClient:
else: else:
raise RuntimeError(f"Step failed: {response_data.get('error')}") raise RuntimeError(f"Step failed: {response_data.get('error')}")
def send_elevator_command(self, command: Union[GoToFloorCommand]) -> bool: def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令""" """发送电梯命令"""
endpoint = self._get_elevator_endpoint(command) endpoint = self._get_elevator_endpoint(command)
debug_log( debug_log(
@@ -156,7 +148,7 @@ class ElevatorAPIClient:
debug_log(f"Go to floor failed: {e}") debug_log(f"Go to floor failed: {e}")
return False return False
def _get_elevator_endpoint(self, command: Union[GoToFloorCommand]) -> str: def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
"""获取电梯命令端点""" """获取电梯命令端点"""
base = f"/api/elevators/{command.elevator_id}" base = f"/api/elevators/{command.elevator_id}"

View File

@@ -7,7 +7,7 @@ import os
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pprint import pprint from pprint import pprint
from typing import Any, Dict, List, Optional from typing import Any, Dict, List
from elevator_saga.client.api_client import ElevatorAPIClient from elevator_saga.client.api_client import ElevatorAPIClient
from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger
@@ -171,6 +171,22 @@ class ElevatorController(ABC):
""" """
pass pass
# @abstractmethod 为了兼容性暂不强制要求elevator_move必须实现
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调 - 可选实现
Args:
elevator: 电梯代理对象
from_position: 起始位置(浮点数表示楼层)
to_position: 目标位置(浮点数表示楼层)
direction: 移动方向
status: 电梯运行状态
"""
pass
def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None: def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None:
"""内部初始化方法""" """内部初始化方法"""
self.elevators = elevators self.elevators = elevators
@@ -218,7 +234,7 @@ class ElevatorController(ABC):
# 获取初始状态并初始化默认从0开始 # 获取初始状态并初始化默认从0开始
try: try:
state = self.api_client.get_state() state = self.api_client.get_state()
except ConnectionResetError as ex: except ConnectionResetError as _: # noqa: F841
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}") print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
os._exit(1) os._exit(1)
if state.tick > 0: if state.tick > 0:
@@ -352,9 +368,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None: if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client) elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用 self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction)
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str)
elif event.type == EventType.ELEVATOR_APPROACHING: elif event.type == EventType.ELEVATOR_APPROACHING:
elevator_id = event.data.get("elevator") elevator_id = event.data.get("elevator")
@@ -363,9 +377,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None: if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client) elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用 self.on_elevator_approaching(elevator_proxy, floor_proxy, direction)
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str)
elif event.type == EventType.PASSENGER_BOARD: elif event.type == EventType.PASSENGER_BOARD:
elevator_id = event.data.get("elevator") elevator_id = event.data.get("elevator")
@@ -385,6 +397,22 @@ class ElevatorController(ABC):
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy) self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy)
elif event.type == EventType.ELEVATOR_MOVE:
elevator_id = event.data.get("elevator")
from_position = event.data.get("from_position")
to_position = event.data.get("to_position")
direction = event.data.get("direction")
status = event.data.get("status")
if (
elevator_id is not None
and from_position is not None
and to_position is not None
and direction is not None
and status is not None
):
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
self.on_elevator_move(elevator_proxy, from_position, to_position, direction, status)
def _reset_and_reinit(self) -> None: def _reset_and_reinit(self) -> None:
"""重置并重新初始化""" """重置并重新初始化"""
try: try:

View File

@@ -41,6 +41,7 @@ class ElevatorBusExampleController(ElevatorController):
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None: def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
self.all_passengers.append(passenger) self.all_passengers.append(passenger)
print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})")
pass pass
def on_elevator_idle(self, elevator: ProxyElevator) -> None: def on_elevator_idle(self, elevator: ProxyElevator) -> None:
@@ -60,10 +61,10 @@ class ElevatorBusExampleController(ElevatorController):
elevator.go_to_floor(elevator.current_floor - 1) elevator.go_to_floor(elevator.current_floor - 1)
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None: def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
pass print(f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}")
def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None: def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None:
pass print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}")
def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass pass
@@ -71,6 +72,11 @@ class ElevatorBusExampleController(ElevatorController):
def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass pass
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusExampleController() algorithm = ElevatorBusExampleController()

View File

@@ -139,6 +139,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True) elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}") print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True) algorithm = ElevatorBusController(debug=True)

View File

@@ -8,7 +8,7 @@ import uuid
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
# 类型变量 # 类型变量
T = TypeVar("T", bound="SerializableModel") T = TypeVar("T", bound="SerializableModel")
@@ -55,6 +55,7 @@ class EventType(Enum):
IDLE = "idle" IDLE = "idle"
PASSENGER_BOARD = "passenger_board" PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight" PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move" # 电梯移动事件
class SerializableModel: class SerializableModel:
@@ -113,7 +114,7 @@ class Position(SerializableModel):
@property @property
def current_floor_float(self) -> float: def current_floor_float(self) -> float:
return self.current_floor + self.floor_up_position / 10 return round(self.current_floor + self.floor_up_position / 10, 1)
def floor_up_position_add(self, num: int) -> int: def floor_up_position_add(self, num: int) -> int:
self.floor_up_position += num self.floor_up_position += num
@@ -161,12 +162,13 @@ class PassengerInfo(SerializableModel):
arrive_tick: int arrive_tick: int
pickup_tick: int = 0 pickup_tick: int = 0
dropoff_tick: int = 0 dropoff_tick: int = 0
arrived: bool = False
elevator_id: Optional[int] = None elevator_id: Optional[int] = None
@property @property
def status(self) -> PassengerStatus: def status(self) -> PassengerStatus:
"""乘客状态""" """乘客状态"""
if self.dropoff_tick > 0: if self.arrived:
return PassengerStatus.COMPLETED return PassengerStatus.COMPLETED
elif self.pickup_tick > 0: elif self.pickup_tick > 0:
return PassengerStatus.IN_ELEVATOR return PassengerStatus.IN_ELEVATOR
@@ -174,13 +176,13 @@ class PassengerInfo(SerializableModel):
return PassengerStatus.WAITING return PassengerStatus.WAITING
@property @property
def wait_time(self) -> int: def floor_wait_time(self) -> int:
"""等待时间""" """在楼层等待时间(从到达到上电梯)"""
return self.pickup_tick - self.arrive_tick return self.pickup_tick - self.arrive_tick
@property @property
def system_time(self) -> int: def arrival_wait_time(self) -> int:
"""系统时间(总时间""" """总等待时间(从到达到下电梯"""
return self.dropoff_tick - self.arrive_tick return self.dropoff_tick - self.arrive_tick
@property @property
@@ -209,6 +211,7 @@ class ElevatorState(SerializableModel):
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators) indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
passenger_destinations: Dict[int, int] = field(default_factory=dict) # 乘客ID -> 目的地楼层映射 passenger_destinations: Dict[int, int] = field(default_factory=dict) # 乘客ID -> 目的地楼层映射
energy_consumed: float = 0.0 energy_consumed: float = 0.0
energy_rate: float = 1.0 # 能耗率每tick消耗的能量单位
last_update_tick: int = 0 last_update_tick: int = 0
@property @property
@@ -331,11 +334,11 @@ class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0 completed_passengers: int = 0
total_passengers: int = 0 total_passengers: int = 0
average_wait_time: float = 0.0 average_floor_wait_time: float = 0.0
p95_wait_time: float = 0.0 p95_floor_wait_time: float = 0.0
average_system_time: float = 0.0 average_arrival_wait_time: float = 0.0
p95_system_time: float = 0.0 p95_arrival_wait_time: float = 0.0
# total_energy_consumption: float = 0.0 total_energy_consumption: float = 0.0
@property @property
def completion_rate(self) -> float: def completion_rate(self) -> float:
@@ -344,13 +347,6 @@ class PerformanceMetrics(SerializableModel):
return 0.0 return 0.0
return self.completed_passengers / self.total_passengers return self.completed_passengers / self.total_passengers
# @property
# def energy_per_passenger(self) -> float:
# """每位乘客能耗"""
# if self.completed_passengers == 0:
# return 0.0
# return self.total_energy_consumption / self.completed_passengers
@dataclass @dataclass
class SimulationState(SerializableModel): class SimulationState(SerializableModel):

View File

@@ -138,6 +138,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True) elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}") print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True) algorithm = ElevatorBusController(debug=True)

View File

@@ -10,7 +10,7 @@ import threading
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, cast from typing import Any, Dict, List, cast
from flask import Flask, Response, request from flask import Flask, Response, request
@@ -22,6 +22,7 @@ from elevator_saga.core.models import (
FloorState, FloorState,
PassengerInfo, PassengerInfo,
PassengerStatus, PassengerStatus,
PerformanceMetrics,
SerializableModel, SerializableModel,
SimulationEvent, SimulationEvent,
SimulationState, SimulationState,
@@ -89,19 +90,6 @@ def json_response(data: Any, status: int = 200) -> Response | tuple[Response, in
return response, status return response, status
@dataclass
class MetricsResponse(SerializableModel):
"""性能指标响应"""
done: int
total: int
avg_wait: float
p95_wait: float
avg_system: float
p95_system: float
energy_total: float
@dataclass @dataclass
class PassengerSummary(SerializableModel): class PassengerSummary(SerializableModel):
"""乘客摘要""" """乘客摘要"""
@@ -120,7 +108,7 @@ class SimulationStateResponse(SerializableModel):
elevators: List[ElevatorState] elevators: List[ElevatorState]
floors: List[FloorState] floors: List[FloorState]
passengers: Dict[int, PassengerInfo] passengers: Dict[int, PassengerInfo]
metrics: MetricsResponse metrics: PerformanceMetrics
class ElevatorSimulation: class ElevatorSimulation:
@@ -192,6 +180,14 @@ class ElevatorSimulation:
building_config["elevators"], building_config["floors"], building_config["elevator_capacity"] building_config["elevators"], building_config["floors"], building_config["elevator_capacity"]
) )
self.reset() self.reset()
# 设置电梯能耗率
elevator_energy_rates = building_config.get("elevator_energy_rates", [1.0] * building_config["elevators"])
for i, elevator in enumerate(self.state.elevators):
if i < len(elevator_energy_rates):
elevator.energy_rate = elevator_energy_rates[i]
server_debug_log(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}")
self.max_duration_ticks = building_config["duration"] self.max_duration_ticks = building_config["duration"]
traffic_data: list[Dict[str, Any]] = file_data["traffic"] traffic_data: list[Dict[str, Any]] = file_data["traffic"]
traffic_data.sort(key=lambda t: cast(int, t["tick"])) traffic_data.sort(key=lambda t: cast(int, t["tick"]))
@@ -286,14 +282,13 @@ class ElevatorSimulation:
# 2. Move elevators # 2. Move elevators
self._move_elevators() self._move_elevators()
# 3. Process elevator stops and passenger boarding/alighting # 3. Process elevator stops and passenger alighting
self._process_elevator_stops() self._process_elevator_stops()
# Return events generated this tick # Return events generated this tick
return self.state.events[events_start:] return self.state.events[events_start:]
def _process_passenger_in(self) -> None: def _process_passenger_in(self, elevator: ElevatorState) -> None:
for elevator in self.elevators:
current_floor = elevator.current_floor current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。 # 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor] floor = self.floors[current_floor]
@@ -323,7 +318,6 @@ class ElevatorSimulation:
def _update_elevator_status(self) -> None: def _update_elevator_status(self) -> None:
"""更新电梯运行状态""" """更新电梯运行状态"""
for elevator in self.elevators: for elevator in self.elevators:
current_floor = elevator.position.current_floor
target_floor = elevator.target_floor target_floor = elevator.target_floor
old_status = elevator.run_status.value old_status = elevator.run_status.value
# 没有移动方向,说明电梯已经到达目标楼层 # 没有移动方向,说明电梯已经到达目标楼层
@@ -331,7 +325,7 @@ class ElevatorSimulation:
if elevator.next_target_floor is not None: if elevator.next_target_floor is not None:
self._set_elevator_target_floor(elevator, elevator.next_target_floor) self._set_elevator_target_floor(elevator, elevator.next_target_floor)
self._process_passenger_in() self._process_passenger_in(elevator)
elevator.next_target_floor = None elevator.next_target_floor = None
else: else:
continue continue
@@ -391,14 +385,32 @@ class ElevatorSimulation:
# 根据状态和方向调整移动距离 # 根据状态和方向调整移动距离
elevator.last_tick_direction = elevator.target_floor_direction elevator.last_tick_direction = elevator.target_floor_direction
old_position = elevator.position.current_floor_float
if elevator.target_floor_direction == Direction.UP: if elevator.target_floor_direction == Direction.UP:
new_floor = elevator.position.floor_up_position_add(movement_speed) new_floor = elevator.position.floor_up_position_add(movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
elif elevator.target_floor_direction == Direction.DOWN: elif elevator.target_floor_direction == Direction.DOWN:
new_floor = elevator.position.floor_up_position_add(-movement_speed) new_floor = elevator.position.floor_up_position_add(-movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
else: else:
# 之前的状态已经是到站了,清空上一次到站的方向 # 之前的状态已经是到站了,清空上一次到站的方向
pass pass
# 发送电梯移动事件
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
},
)
# 移动后检测是否即将到站,从匀速状态切换到减速 # 移动后检测是否即将到站,从匀速状态切换到减速
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速 # 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
@@ -411,7 +423,7 @@ class ElevatorSimulation:
EventType.ELEVATOR_APPROACHING, EventType.ELEVATOR_APPROACHING,
{ {
"elevator": elevator.id, "elevator": elevator.id,
"floor": elevator.target_floor, "floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value, "direction": elevator.target_floor_direction.value,
}, },
) )
@@ -435,7 +447,6 @@ class ElevatorSimulation:
self._emit_event( self._emit_event(
EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"} EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"}
) )
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
def _process_elevator_stops(self) -> None: def _process_elevator_stops(self) -> None:
""" """
@@ -457,6 +468,7 @@ class ElevatorSimulation:
passenger = self.passengers[passenger_id] passenger = self.passengers[passenger_id]
if passenger.destination == current_floor: if passenger.destination == current_floor:
passenger.dropoff_tick = self.tick passenger.dropoff_tick = self.tick
passenger.arrived = True
passengers_to_remove.append(passenger_id) passengers_to_remove.append(passenger_id)
# Remove passengers who alighted # Remove passengers who alighted
@@ -478,7 +490,6 @@ class ElevatorSimulation:
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1 [SERVER-DEBUG] 电梯 E0 被设定为前往 F1
说明电梯处于stop状态这个tick直接采用下一个目的地运行了 说明电梯处于stop状态这个tick直接采用下一个目的地运行了
""" """
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}") server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
new_target_floor_should_accel = self._should_start_deceleration(elevator) new_target_floor_should_accel = self._should_start_deceleration(elevator)
@@ -547,41 +558,51 @@ class ElevatorSimulation:
metrics=metrics, metrics=metrics,
) )
def _calculate_metrics(self) -> MetricsResponse: def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics""" """Calculate performance metrics"""
# 直接从state中筛选已完成的乘客 # 直接从state中筛选已完成的乘客
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED] completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
total_passengers = len(self.state.passengers) total_passengers = len(self.state.passengers)
# 计算总能耗
total_energy = sum(elevator.energy_consumed for elevator in self.state.elevators)
if not completed: if not completed:
return MetricsResponse( return PerformanceMetrics(
done=0, completed_passengers=0,
total=total_passengers, total_passengers=total_passengers,
avg_wait=0, average_floor_wait_time=0,
p95_wait=0, p95_floor_wait_time=0,
avg_system=0, average_arrival_wait_time=0,
p95_system=0, p95_arrival_wait_time=0,
energy_total=sum(e.energy_consumed for e in self.elevators), total_energy_consumption=total_energy,
) )
wait_times = [float(p.wait_time) for p in completed] floor_wait_times = [float(p.floor_wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed] arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
def percentile(data: List[float], p: int) -> float: def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data: if not data:
return 0.0 return 0.0
sorted_data = sorted(data) sorted_data = sorted(data)
index = int(len(sorted_data) * p / 100) # 计算要保留的数据数量(排除掉最长的 exclude_percent
return sorted_data[min(index, len(sorted_data) - 1)] keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
# 只保留前 keep_count 个数据,排除最长的部分
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return MetricsResponse( return PerformanceMetrics(
done=len(completed), completed_passengers=len(completed),
total=total_passengers, total_passengers=total_passengers,
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0, average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_wait=percentile(wait_times, 95), p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
avg_system=sum(system_times) / len(system_times) if system_times else 0, average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_system=percentile(system_times, 95), p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
energy_total=sum(e.energy_consumed for e in self.elevators), total_energy_consumption=total_energy,
) )
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]: def get_events(self, since_tick: int = 0) -> List[SimulationEvent]:

View File

@@ -290,7 +290,7 @@ def generate_fire_evacuation_traffic(
for floor in range(1, floors): for floor in range(1, floors):
# 每层随机数量的人需要疏散 # 每层随机数量的人需要疏散
num_people = random.randint(people_per_floor[0], people_per_floor[1]) num_people = random.randint(people_per_floor[0], people_per_floor[1])
for i in range(num_people): for _ in range(num_people):
# 在10个tick内陆续到达模拟疏散的紧急性 # 在10个tick内陆续到达模拟疏散的紧急性
arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1)) arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1))
if arrival_tick < duration: if arrival_tick < duration:
@@ -791,10 +791,12 @@ def generate_traffic_file(scenario: str, output_file: str, scale: Optional[str]
traffic_data = generator_func(**generator_params) traffic_data = generator_func(**generator_params)
# 准备building配置 # 准备building配置
num_elevators = params["elevators"]
building_config = { building_config = {
"floors": params["floors"], "floors": params["floors"],
"elevators": params["elevators"], "elevators": num_elevators,
"elevator_capacity": params["elevator_capacity"], "elevator_capacity": params["elevator_capacity"],
"elevator_energy_rates": [1.0] * num_elevators, # 每台电梯的能耗率默认为1.0
"scenario": scenario, "scenario": scenario,
"scale": scale, "scale": scale,
"description": f"{config['description']} ({scale}规模)", "description": f"{config['description']} ({scale}规模)",
@@ -835,7 +837,7 @@ def generate_scaled_traffic_files(
if custom_building: if custom_building:
floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0]) floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0])
elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0]) elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0])
elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0]) _elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
# 重新确定规模 # 重新确定规模
detected_scale = determine_building_scale(floors, elevators) detected_scale = determine_building_scale(floors, elevators)