15 Commits

Author SHA1 Message Date
Xuwznln
83459923e8 Update docs (Energy) 2025-10-16 01:26:28 +08:00
Xuwznln
b4b99daead Bump version: 0.0.8 → 0.0.9 2025-10-16 01:20:39 +08:00
Xuwznln
d44ba8b6cd Update gitignore 2025-10-16 01:20:11 +08:00
Xuwznln
71e8f2a451 Feat: add energy rate for elevators 2025-10-15 20:46:47 +08:00
Xuwznln
4b60359894 Bump version: 0.0.7 → 0.0.8 2025-10-12 02:14:27 +08:00
Xuwznln
0157496e6f Fix: client completed_passengers calculation error 2025-10-12 02:14:18 +08:00
Xuwznln
1031e677e1 Bump version: 0.0.6 → 0.0.7 2025-10-09 16:49:15 +08:00
Xuwznln
889d554f19 Fix: remove abstractmethod decroation for on_elevator_move 2025-10-09 16:49:07 +08:00
Xuwznln
ee3c4bab7e Bump version: 0.0.5 → 0.0.6 2025-10-09 16:41:04 +08:00
Xuwznln
99524eee3d Add: elevator move event 2025-10-09 16:40:51 +08:00
Xuwznln
b2d03b2510 Bump version: 0.0.4 → 0.0.5 2025-10-06 15:16:47 +08:00
Xuwznln
1a8063e4fd fix performance calculation. fix floor error in approaching event. fix passenger board wrongly. 2025-10-06 15:16:35 +08:00
Xuwznln
692b853101 Update bug report template 2025-10-06 14:36:53 +08:00
Xuwznln
4c86816920 Update docs 2025-10-01 18:32:03 +08:00
Xuwznln
349511e00f Bump version: 0.0.3 → 0.0.4 2025-10-01 18:07:21 +08:00
18 changed files with 395 additions and 303 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.3
current_version = 0.0.9
commit = True
tag = True
tag_name = v{new_version}

View File

@@ -0,0 +1,30 @@
---
name: Bug Report
about: Report a bug in the project
title: "[BUG] "
labels: bug
assignees: ""
---
**Version Information**
- elevator-py version: [e.g. v0.0.5]
- Python version: [e.g. 3.11]
**Bug Description**
A clear and concise description of the bug
**Steps to Reproduce**
1.
2.
3.
**Expected Behavior**
What you expected to happen
**Actual Behavior**
What actually happened
**Additional Context (Optional)**
Any other information that might help, such as error logs, screenshots, etc.

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# ================================
# Python-related files
# ================================
elevator_saga/traffic/test_cases.py
# Compiled Python files
__pycache__/

View File

@@ -30,20 +30,6 @@ Elevator Saga is a Python implementation of an elevator [simulation game](https:
pip install elevator-py
```
### With Development Dependencies
```bash
pip install elevator-py[dev]
```
### From Source
```bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
```
## Quick Start
### Running the Game

View File

@@ -176,7 +176,7 @@ Dynamic proxy for ``PassengerInfo`` that provides access to passenger informatio
if passenger.status == PassengerStatus.IN_ELEVATOR:
print(f"In elevator {passenger.elevator_id}")
print(f"Waited {passenger.wait_time} ticks")
print(f"Waited {passenger.floor_wait_time} ticks")
Read-Only Protection
~~~~~~~~~~~~~~~~~~~~
@@ -284,6 +284,7 @@ The controller provides these event handlers:
- ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights
- ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor
- ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive
- ``on_elevator_move(elevator, from_position, to_position, direction, status)``: Elevator moves
Complete Example
----------------
@@ -343,25 +344,6 @@ Benefits of Proxy Architecture
5. **Separation of Concerns**: State management handled by proxies, logic in controller
6. **Testability**: Can mock API client for unit tests
Performance Considerations
--------------------------
Each attribute access triggers an API call. For better performance:
.. code-block:: python
# ❌ Inefficient - multiple API calls
if elevator.current_floor < elevator.target_floor:
diff = elevator.target_floor - elevator.current_floor
# ✅ Better - store references
current = elevator.current_floor
target = elevator.target_floor
if current < target:
diff = target - current
However, the API client implements **caching within a single tick**, so multiple accesses during event processing are efficient.
Next Steps
----------

View File

@@ -17,9 +17,9 @@ Architecture Overview
│ api_client │ │ │
└─────────────────┘ └─────────────────┘
│ │
│ GET /api/state │
│ POST /api/step │
│ POST /api/elevators/:id/go_to_floor │
│ GET /api/state
│ POST /api/step
│ POST /api/elevators/:id/go_to_floor
│ │
└───────────────────────────────────────┘
@@ -65,6 +65,8 @@ Response format:
"passengers": [101, 102],
"max_capacity": 10,
"run_status": "constant_speed",
"energy_consumed": 38.5,
"energy_rate": 1.0,
"..."
}
],
@@ -81,7 +83,8 @@ Response format:
"avg_wait": 15.2,
"p95_wait": 30.0,
"avg_system": 25.5,
"p95_system": 45.0
"p95_system": 45.0,
"total_energy_consumption": 156.0
}
}
@@ -394,37 +397,37 @@ Typical communication sequence during one tick:
.. code-block:: text
Client Server
│ │
│ 1. GET /api/state │
├────────────────────────────────────►│
│ ◄── SimulationState (cached) │
│ │
│ 2. Analyze state, make decisions │
│ │
│ 3. POST /api/elevators/0/go_to_floor
├────────────────────────────────────►│
│ ◄── {"success": true} │
│ │
│ 4. GET /api/state (from cache) │
│ No HTTP request! │
│ │
│ 5. POST /api/step │
├────────────────────────────────────►│
│ Server processes tick │
│ - Moves elevators │
│ - Boards/alights passengers │
│ - Generates events │
│ ◄── {tick: 43, events: [...]} │
│ │
│ 6. Process events │
│ Cache invalidated │
│ │
│ 7. GET /api/state (fetches fresh) │
├────────────────────────────────────►│
│ ◄── SimulationState │
│ │
└─────────────────────────────────────┘
Client Server
│ 1. GET /api/state
├────────────────────────────────────►│
│ ◄── SimulationState (cached)
│ 2. Analyze state, make decisions
│ 3. POST /api/elevators/0/go_to_floor│
├────────────────────────────────────►│
│ ◄── {"success": true}
│ 4. GET /api/state (from cache)
│ No HTTP request!
│ 5. POST /api/step
├────────────────────────────────────►│
│ Server processes tick
│ - Moves elevators
│ - Boards/alights passengers
│ - Generates events
│ ◄── {tick: 43, events: [...]}
│ 6. Process events
│ Cache invalidated
│ 7. GET /api/state (fetches fresh)
├────────────────────────────────────►│
│ ◄── SimulationState
└─────────────────────────────────────
Error Handling
--------------
@@ -476,23 +479,6 @@ The simulator uses a lock to ensure thread-safe access:
This allows Flask to handle concurrent requests safely.
Performance Considerations
--------------------------
**Minimize HTTP Calls**:
.. code-block:: python
# ❌ Bad - 3 HTTP calls
for i in range(3):
state = api_client.get_state()
print(state.tick)
# ✅ Good - 1 HTTP call (cached)
state = api_client.get_state()
for i in range(3):
print(state.tick)
**Batch Commands**:
.. code-block:: python

View File

@@ -8,21 +8,21 @@ Simulation Overview
.. code-block:: text
┌─────────────────────────────────────────────────────────┐
┌─────────────────────────────────────────────────────────
│ Simulation Loop │
│ │
│ Tick N │
│ 1. Update elevator status (START_UP → CONSTANT_SPEED)│
│ 1. Update elevator status (START_UP → CONSTANT_SPEED)
│ 2. Process arrivals (new passengers) │
│ 3. Move elevators (physics simulation) │
│ 4. Process stops (boarding/alighting) │
│ 5. Generate events │
│ │
│ Events sent to client → Client processes → Commands │
│ Events sent to client → Client processes → Commands
│ │
│ Tick N+1 │
│ (repeat...) │
└─────────────────────────────────────────────────────────┘
└─────────────────────────────────────────────────────────
Tick-Based Execution
--------------------
@@ -202,7 +202,7 @@ Event System
Event Types
~~~~~~~~~~~
The simulation generates 8 types of events defined in ``EventType`` enum:
The simulation generates 9 types of events defined in ``EventType`` enum:
.. code-block:: python
@@ -215,6 +215,7 @@ The simulation generates 8 types of events defined in ``EventType`` enum:
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move"
Event Generation
~~~~~~~~~~~~~~~~
@@ -260,6 +261,19 @@ Events are generated during tick processing:
for elevator in self.elevators:
# ... movement logic ...
# Elevator moves
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
}
)
# Passing a floor
if old_floor != new_floor and new_floor != target_floor:
self._emit_event(
@@ -277,7 +291,7 @@ Events are generated during tick processing:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value
}
)
@@ -363,6 +377,14 @@ The ``ElevatorController`` base class automatically routes events to handler met
elevator = self.elevators[event.data["elevator"]]
self.on_elevator_idle(elevator)
elif event.type == EventType.ELEVATOR_MOVE:
elevator = self.elevators[event.data["elevator"]]
from_position = event.data["from_position"]
to_position = event.data["to_position"]
direction = event.data["direction"]
status = event.data["status"]
self.on_elevator_move(elevator, from_position, to_position, direction, status)
# ... other event types ...
Control Flow: Bus Example
@@ -463,27 +485,6 @@ Here's what happens in a typical tick:
Key Timing Concepts
-------------------
Command vs. Execution
~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
# Tick 42: Controller sends command
elevator.go_to_floor(5, immediate=False)
# ← Command queued in elevator.next_target_floor
# Tick 43: Server processes
# ← _update_elevator_status() assigns target
# ← Elevator starts moving
# Tick 44-46: Elevator in motion
# ← Events: PASSING_FLOOR
# Tick 47: Elevator arrives
# ← Event: STOPPED_AT_FLOOR
There's a **one-tick delay** between command and execution (unless ``immediate=True``).
Immediate vs. Queued
~~~~~~~~~~~~~~~~~~~~
@@ -508,69 +509,39 @@ Metrics are calculated from passenger data:
.. code-block:: python
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
completed = [p for p in self.state.passengers.values()
if p.status == PassengerStatus.COMPLETED]
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in completed]
arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
return MetricsResponse(
done=len(completed),
total=len(self.state.passengers),
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=len(self.state.passengers),
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
)
Key metrics:
- **Wait time**: ``pickup_tick - arrive_tick`` (how long passenger waited)
- **System time**: ``dropoff_tick - arrive_tick`` (total time in system)
- **P95**: 95th percentile (worst-case for most passengers)
Best Practices
--------------
1. **React to Events**: Don't poll state - implement event handlers
2. **Use Queued Commands**: Default ``immediate=False`` is safer
3. **Track Passengers**: Monitor ``on_passenger_call`` to know demand
4. **Optimize for Wait Time**: Reduce time between arrival and pickup
5. **Consider Load**: Check ``elevator.is_full`` before dispatching
6. **Handle Idle**: Always give idle elevators something to do (even if it's "go to floor 0")
Example: Efficient Dispatch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
"""Dispatch nearest suitable elevator"""
best_elevator = None
best_cost = float('inf')
for elevator in self.elevators:
# Skip if full
if elevator.is_full:
continue
# Calculate cost (distance + current load)
distance = abs(elevator.current_floor - floor.floor)
load_penalty = elevator.load_factor * 10
cost = distance + load_penalty
# Check if going in right direction
if elevator.target_floor_direction.value == direction:
cost *= 0.5 # Prefer elevators already going that way
if cost < best_cost:
best_cost = cost
best_elevator = elevator
if best_elevator:
best_elevator.go_to_floor(floor.floor)
- **Floor wait time**: ``pickup_tick - arrive_tick`` (在楼层等待的时间,从到达到上电梯)
- **Arrival wait time**: ``dropoff_tick - arrive_tick`` (总等待时间,从到达到下电梯)
- **P95 metrics**: 排除掉最长的5%时间后计算剩余95%的平均值
Summary
-------

View File

@@ -28,7 +28,9 @@ Features
🔌 **Client-Server Model**: Separate simulation server from control logic for clean architecture
📊 **Performance Metrics**: Track wait times, system times, and completion rates
📊 **Performance Metrics**: Track wait times, system times, completion rates, and energy consumption
**Energy Tracking**: Monitor and optimize energy consumption with configurable per-elevator energy rates
🎯 **Flexible Control**: Implement your own algorithms using a simple controller interface
@@ -42,22 +44,6 @@ Basic Installation
pip install elevator-py
With Development Dependencies
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
pip install elevator-py[dev]
From Source
~~~~~~~~~~~
.. code-block:: bash
git clone https://github.com/ZGCA-Forge/Elevator.git
cd Elevator
pip install -e .[dev]
Quick Start
-----------

View File

@@ -128,6 +128,7 @@ Complete state information for an elevator:
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
passenger_destinations: Dict[int, int] = {} # passenger_id -> floor
energy_consumed: float = 0.0
energy_rate: float = 1.0 # Energy consumption rate per tick
last_update_tick: int = 0
Key Properties:
@@ -142,6 +143,11 @@ Key Properties:
- ``pressed_floors``: List of destination floors for current passengers
- ``load_factor``: Current load as fraction of capacity (0.0 to 1.0)
Energy Tracking:
- ``energy_consumed``: Total energy consumed by this elevator during the simulation
- ``energy_rate``: Energy consumption rate per tick when moving (default: 1.0). Can be customized in traffic configuration files to simulate different elevator types (e.g., older elevators with higher rates, newer energy-efficient elevators with lower rates)
FloorState
~~~~~~~~~~
@@ -257,15 +263,20 @@ Tracks simulation performance:
class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0 # 95th percentile
average_system_time: float = 0.0
p95_system_time: float = 0.0 # 95th percentile
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0 # 95th percentile
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0 # 95th percentile
total_energy_consumption: float = 0.0 # Total energy consumed by all elevators
Properties:
- ``completion_rate``: Fraction of passengers completed (0.0 to 1.0)
Energy Metrics:
- ``total_energy_consumption``: Sum of energy consumed by all elevators in the system. Each elevator consumes ``energy_rate`` units of energy per tick when moving.
API Models
----------
@@ -345,3 +356,75 @@ All models support JSON serialization:
restored = ElevatorState.from_dict(data)
This enables seamless transmission over HTTP between client and server.
Energy System
-------------
Overview
~~~~~~~~
The energy system tracks energy consumption of elevators to help optimize control algorithms for both passenger service and energy efficiency.
How Energy Works
~~~~~~~~~~~~~~~~
**Energy Consumption:**
- Each elevator has an ``energy_rate`` attribute (default: 1.0)
- When an elevator moves (any tick where it's not stopped), it consumes energy equal to its ``energy_rate``
- Energy consumption is independent of speed, direction, or load
- Total system energy is the sum of all individual elevator energy consumption
**Configuration:**
Energy rates are configured in traffic JSON files via the ``elevator_energy_rates`` field:
.. code-block:: json
{
"building": {
"floors": 10,
"elevators": 3,
"elevator_capacity": 10,
"elevator_energy_rates": [1.0, 1.0, 1.2],
"scenario": "custom_scenario",
"duration": 600
},
"traffic": []
}
In this example, elevators 0 and 1 have standard energy rates (1.0), while elevator 2 consumes 20% more energy (1.2), perhaps representing an older or less efficient unit.
**Use Cases:**
1. **Algorithm Optimization**: Balance passenger wait times against energy consumption
2. **Heterogeneous Fleets**: Model buildings with elevators of different ages/efficiencies
3. **Cost Analysis**: Evaluate the energy cost of different control strategies
4. **Green Building Simulation**: Optimize for minimal energy while maintaining service quality
Example Usage
~~~~~~~~~~~~~
.. code-block:: python
# Get current state
state = api_client.get_state()
# Check individual elevator energy
for elevator in state.elevators:
print(f"Elevator {elevator.id}: {elevator.energy_consumed} units consumed")
print(f" Energy rate: {elevator.energy_rate} units/tick")
# Check total system energy
metrics = state.metrics
print(f"Total system energy: {metrics.total_energy_consumption} units")
print(f"Completed passengers: {metrics.completed_passengers}")
# Calculate energy per passenger
if metrics.completed_passengers > 0:
energy_per_passenger = metrics.total_energy_consumption / metrics.completed_passengers
print(f"Energy per passenger: {energy_per_passenger:.2f} units")
**Default Behavior:**
If ``elevator_energy_rates`` is not specified in the traffic file, all elevators default to an energy rate of 1.0, ensuring backward compatibility with existing traffic files.

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing.
"""
__version__ = "0.0.3"
__version__ = "0.0.9"
__author__ = "ZGCA Team"

View File

@@ -6,7 +6,7 @@ Unified API Client for Elevator Saga
import json
import urllib.error
import urllib.request
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional
from elevator_saga.core.models import (
ElevatorState,
@@ -63,16 +63,8 @@ class ElevatorAPIClient:
# 使用服务端返回的metrics数据
metrics_data = response_data.get("metrics", {})
if metrics_data:
# 转换为PerformanceMetrics格式
metrics = PerformanceMetrics(
completed_passengers=metrics_data.get("done", 0),
total_passengers=metrics_data.get("total", 0),
average_wait_time=metrics_data.get("avg_wait", 0),
p95_wait_time=metrics_data.get("p95_wait", 0),
average_system_time=metrics_data.get("avg_system", 0),
p95_system_time=metrics_data.get("p95_system", 0),
# total_energy_consumption=metrics_data.get("energy_total", 0),
)
# 直接从字典创建PerformanceMetrics对象
metrics = PerformanceMetrics.from_dict(metrics_data)
else:
metrics = PerformanceMetrics()
@@ -131,7 +123,7 @@ class ElevatorAPIClient:
else:
raise RuntimeError(f"Step failed: {response_data.get('error')}")
def send_elevator_command(self, command: Union[GoToFloorCommand]) -> bool:
def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令"""
endpoint = self._get_elevator_endpoint(command)
debug_log(
@@ -156,7 +148,7 @@ class ElevatorAPIClient:
debug_log(f"Go to floor failed: {e}")
return False
def _get_elevator_endpoint(self, command: Union[GoToFloorCommand]) -> str:
def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
"""获取电梯命令端点"""
base = f"/api/elevators/{command.elevator_id}"

View File

@@ -7,7 +7,7 @@ import os
import time
from abc import ABC, abstractmethod
from pprint import pprint
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from elevator_saga.client.api_client import ElevatorAPIClient
from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger
@@ -171,6 +171,22 @@ class ElevatorController(ABC):
"""
pass
# @abstractmethod 为了兼容性暂不强制要求elevator_move必须实现
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调 - 可选实现
Args:
elevator: 电梯代理对象
from_position: 起始位置(浮点数表示楼层)
to_position: 目标位置(浮点数表示楼层)
direction: 移动方向
status: 电梯运行状态
"""
pass
def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None:
"""内部初始化方法"""
self.elevators = elevators
@@ -218,7 +234,7 @@ class ElevatorController(ABC):
# 获取初始状态并初始化默认从0开始
try:
state = self.api_client.get_state()
except ConnectionResetError as ex:
except ConnectionResetError as _: # noqa: F841
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
os._exit(1)
if state.tick > 0:
@@ -352,9 +368,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.ELEVATOR_APPROACHING:
elevator_id = event.data.get("elevator")
@@ -363,9 +377,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str)
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction)
elif event.type == EventType.PASSENGER_BOARD:
elevator_id = event.data.get("elevator")
@@ -385,6 +397,22 @@ class ElevatorController(ABC):
floor_proxy = ProxyFloor(floor_id, self.api_client)
self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy)
elif event.type == EventType.ELEVATOR_MOVE:
elevator_id = event.data.get("elevator")
from_position = event.data.get("from_position")
to_position = event.data.get("to_position")
direction = event.data.get("direction")
status = event.data.get("status")
if (
elevator_id is not None
and from_position is not None
and to_position is not None
and direction is not None
and status is not None
):
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
self.on_elevator_move(elevator_proxy, from_position, to_position, direction, status)
def _reset_and_reinit(self) -> None:
"""重置并重新初始化"""
try:

View File

@@ -41,6 +41,7 @@ class ElevatorBusExampleController(ElevatorController):
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
self.all_passengers.append(passenger)
print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})")
pass
def on_elevator_idle(self, elevator: ProxyElevator) -> None:
@@ -60,10 +61,10 @@ class ElevatorBusExampleController(ElevatorController):
elevator.go_to_floor(elevator.current_floor - 1)
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}")
def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None:
pass
print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}")
def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
@@ -71,6 +72,11 @@ class ElevatorBusExampleController(ElevatorController):
def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
pass
if __name__ == "__main__":
algorithm = ElevatorBusExampleController()

View File

@@ -139,6 +139,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -8,7 +8,7 @@ import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
# 类型变量
T = TypeVar("T", bound="SerializableModel")
@@ -55,6 +55,7 @@ class EventType(Enum):
IDLE = "idle"
PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move" # 电梯移动事件
class SerializableModel:
@@ -113,7 +114,7 @@ class Position(SerializableModel):
@property
def current_floor_float(self) -> float:
return self.current_floor + self.floor_up_position / 10
return round(self.current_floor + self.floor_up_position / 10, 1)
def floor_up_position_add(self, num: int) -> int:
self.floor_up_position += num
@@ -161,12 +162,13 @@ class PassengerInfo(SerializableModel):
arrive_tick: int
pickup_tick: int = 0
dropoff_tick: int = 0
arrived: bool = False
elevator_id: Optional[int] = None
@property
def status(self) -> PassengerStatus:
"""乘客状态"""
if self.dropoff_tick > 0:
if self.arrived:
return PassengerStatus.COMPLETED
elif self.pickup_tick > 0:
return PassengerStatus.IN_ELEVATOR
@@ -174,13 +176,13 @@ class PassengerInfo(SerializableModel):
return PassengerStatus.WAITING
@property
def wait_time(self) -> int:
"""等待时间"""
def floor_wait_time(self) -> int:
"""在楼层等待时间(从到达到上电梯)"""
return self.pickup_tick - self.arrive_tick
@property
def system_time(self) -> int:
"""系统时间(总时间"""
def arrival_wait_time(self) -> int:
"""总等待时间(从到达到下电梯"""
return self.dropoff_tick - self.arrive_tick
@property
@@ -209,6 +211,7 @@ class ElevatorState(SerializableModel):
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
passenger_destinations: Dict[int, int] = field(default_factory=dict) # 乘客ID -> 目的地楼层映射
energy_consumed: float = 0.0
energy_rate: float = 1.0 # 能耗率每tick消耗的能量单位
last_update_tick: int = 0
@property
@@ -331,11 +334,11 @@ class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0
total_passengers: int = 0
average_wait_time: float = 0.0
p95_wait_time: float = 0.0
average_system_time: float = 0.0
p95_system_time: float = 0.0
# total_energy_consumption: float = 0.0
average_floor_wait_time: float = 0.0
p95_floor_wait_time: float = 0.0
average_arrival_wait_time: float = 0.0
p95_arrival_wait_time: float = 0.0
total_energy_consumption: float = 0.0
@property
def completion_rate(self) -> float:
@@ -344,13 +347,6 @@ class PerformanceMetrics(SerializableModel):
return 0.0
return self.completed_passengers / self.total_passengers
# @property
# def energy_per_passenger(self) -> float:
# """每位乘客能耗"""
# if self.completed_passengers == 0:
# return 0.0
# return self.total_energy_consumption / self.completed_passengers
@dataclass
class SimulationState(SerializableModel):

View File

@@ -138,6 +138,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True)

View File

@@ -10,7 +10,7 @@ import threading
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, cast
from flask import Flask, Response, request
@@ -22,6 +22,7 @@ from elevator_saga.core.models import (
FloorState,
PassengerInfo,
PassengerStatus,
PerformanceMetrics,
SerializableModel,
SimulationEvent,
SimulationState,
@@ -89,19 +90,6 @@ def json_response(data: Any, status: int = 200) -> Response | tuple[Response, in
return response, status
@dataclass
class MetricsResponse(SerializableModel):
"""性能指标响应"""
done: int
total: int
avg_wait: float
p95_wait: float
avg_system: float
p95_system: float
energy_total: float
@dataclass
class PassengerSummary(SerializableModel):
"""乘客摘要"""
@@ -120,7 +108,7 @@ class SimulationStateResponse(SerializableModel):
elevators: List[ElevatorState]
floors: List[FloorState]
passengers: Dict[int, PassengerInfo]
metrics: MetricsResponse
metrics: PerformanceMetrics
class ElevatorSimulation:
@@ -192,6 +180,14 @@ class ElevatorSimulation:
building_config["elevators"], building_config["floors"], building_config["elevator_capacity"]
)
self.reset()
# 设置电梯能耗率
elevator_energy_rates = building_config.get("elevator_energy_rates", [1.0] * building_config["elevators"])
for i, elevator in enumerate(self.state.elevators):
if i < len(elevator_energy_rates):
elevator.energy_rate = elevator_energy_rates[i]
server_debug_log(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}")
self.max_duration_ticks = building_config["duration"]
traffic_data: list[Dict[str, Any]] = file_data["traffic"]
traffic_data.sort(key=lambda t: cast(int, t["tick"]))
@@ -286,44 +282,42 @@ class ElevatorSimulation:
# 2. Move elevators
self._move_elevators()
# 3. Process elevator stops and passenger boarding/alighting
# 3. Process elevator stops and passenger alighting
self._process_elevator_stops()
# Return events generated this tick
return self.state.events[events_start:]
def _process_passenger_in(self) -> None:
for elevator in self.elevators:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
def _process_passenger_in(self, elevator: ElevatorState) -> None:
current_floor = elevator.current_floor
# 处于Stopped状态方向也已经清空说明没有调度。
floor = self.floors[current_floor]
passengers_to_board: List[int] = []
available_capacity = elevator.max_capacity - len(elevator.passengers)
# Board passengers going up (if up indicator is on or no direction set)
if elevator.target_floor_direction == Direction.UP:
passengers_to_board.extend(floor.up_queue[:available_capacity])
floor.up_queue = floor.up_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:]
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
# Process boarding
for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id)
self._emit_event(
EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
def _update_elevator_status(self) -> None:
"""更新电梯运行状态"""
for elevator in self.elevators:
current_floor = elevator.position.current_floor
target_floor = elevator.target_floor
old_status = elevator.run_status.value
# 没有移动方向,说明电梯已经到达目标楼层
@@ -331,7 +325,7 @@ class ElevatorSimulation:
if elevator.next_target_floor is not None:
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
self._process_passenger_in()
self._process_passenger_in(elevator)
elevator.next_target_floor = None
else:
continue
@@ -391,14 +385,32 @@ class ElevatorSimulation:
# 根据状态和方向调整移动距离
elevator.last_tick_direction = elevator.target_floor_direction
old_position = elevator.position.current_floor_float
if elevator.target_floor_direction == Direction.UP:
new_floor = elevator.position.floor_up_position_add(movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
elif elevator.target_floor_direction == Direction.DOWN:
new_floor = elevator.position.floor_up_position_add(-movement_speed)
# 电梯移动时增加能耗每tick增加电梯的能耗率
elevator.energy_consumed += elevator.energy_rate
else:
# 之前的状态已经是到站了,清空上一次到站的方向
pass
# 发送电梯移动事件
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
},
)
# 移动后检测是否即将到站,从匀速状态切换到减速
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
@@ -411,7 +423,7 @@ class ElevatorSimulation:
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value,
},
)
@@ -435,7 +447,6 @@ class ElevatorSimulation:
self._emit_event(
EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"}
)
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
def _process_elevator_stops(self) -> None:
"""
@@ -457,6 +468,7 @@ class ElevatorSimulation:
passenger = self.passengers[passenger_id]
if passenger.destination == current_floor:
passenger.dropoff_tick = self.tick
passenger.arrived = True
passengers_to_remove.append(passenger_id)
# Remove passengers who alighted
@@ -478,7 +490,6 @@ class ElevatorSimulation:
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1
说明电梯处于stop状态这个tick直接采用下一个目的地运行了
"""
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
new_target_floor_should_accel = self._should_start_deceleration(elevator)
@@ -547,41 +558,51 @@ class ElevatorSimulation:
metrics=metrics,
)
def _calculate_metrics(self) -> MetricsResponse:
def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics"""
# 直接从state中筛选已完成的乘客
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
total_passengers = len(self.state.passengers)
# 计算总能耗
total_energy = sum(elevator.energy_consumed for elevator in self.state.elevators)
if not completed:
return MetricsResponse(
done=0,
total=total_passengers,
avg_wait=0,
p95_wait=0,
avg_system=0,
p95_system=0,
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=0,
total_passengers=total_passengers,
average_floor_wait_time=0,
p95_floor_wait_time=0,
average_arrival_wait_time=0,
p95_arrival_wait_time=0,
total_energy_consumption=total_energy,
)
wait_times = [float(p.wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed]
floor_wait_times = [float(p.floor_wait_time) for p in completed]
arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
def percentile(data: List[float], p: int) -> float:
def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data:
return 0.0
sorted_data = sorted(data)
index = int(len(sorted_data) * p / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
# 计算要保留的数据数量(排除掉最长的 exclude_percent
keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
# 只保留前 keep_count 个数据,排除最长的部分
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return MetricsResponse(
done=len(completed),
total=total_passengers,
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0,
p95_wait=percentile(wait_times, 95),
avg_system=sum(system_times) / len(system_times) if system_times else 0,
p95_system=percentile(system_times, 95),
energy_total=sum(e.energy_consumed for e in self.elevators),
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=total_passengers,
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
total_energy_consumption=total_energy,
)
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]:

View File

@@ -290,7 +290,7 @@ def generate_fire_evacuation_traffic(
for floor in range(1, floors):
# 每层随机数量的人需要疏散
num_people = random.randint(people_per_floor[0], people_per_floor[1])
for i in range(num_people):
for _ in range(num_people):
# 在10个tick内陆续到达模拟疏散的紧急性
arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1))
if arrival_tick < duration:
@@ -791,10 +791,12 @@ def generate_traffic_file(scenario: str, output_file: str, scale: Optional[str]
traffic_data = generator_func(**generator_params)
# 准备building配置
num_elevators = params["elevators"]
building_config = {
"floors": params["floors"],
"elevators": params["elevators"],
"elevators": num_elevators,
"elevator_capacity": params["elevator_capacity"],
"elevator_energy_rates": [1.0] * num_elevators, # 每台电梯的能耗率默认为1.0
"scenario": scenario,
"scale": scale,
"description": f"{config['description']} ({scale}规模)",
@@ -835,7 +837,7 @@ def generate_scaled_traffic_files(
if custom_building:
floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0])
elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0])
elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
_elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
# 重新确定规模
detected_scale = determine_building_scale(floors, elevators)