7 Commits

Author SHA1 Message Date
Xuwznln
1031e677e1 Bump version: 0.0.6 → 0.0.7 2025-10-09 16:49:15 +08:00
Xuwznln
889d554f19 Fix: remove abstractmethod decroation for on_elevator_move 2025-10-09 16:49:07 +08:00
Xuwznln
ee3c4bab7e Bump version: 0.0.5 → 0.0.6 2025-10-09 16:41:04 +08:00
Xuwznln
99524eee3d Add: elevator move event 2025-10-09 16:40:51 +08:00
Xuwznln
b2d03b2510 Bump version: 0.0.4 → 0.0.5 2025-10-06 15:16:47 +08:00
Xuwznln
1a8063e4fd fix performance calculation. fix floor error in approaching event. fix passenger board wrongly. 2025-10-06 15:16:35 +08:00
Xuwznln
692b853101 Update bug report template 2025-10-06 14:36:53 +08:00
13 changed files with 238 additions and 122 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 0.0.4 current_version = 0.0.7
commit = True commit = True
tag = True tag = True
tag_name = v{new_version} tag_name = v{new_version}

View File

@@ -0,0 +1,30 @@
---
name: Bug Report
about: Report a bug in the project
title: "[BUG] "
labels: bug
assignees: ""
---
**Version Information**
- elevator-py version: [e.g. v0.0.5]
- Python version: [e.g. 3.11]
**Bug Description**
A clear and concise description of the bug
**Steps to Reproduce**
1.
2.
3.
**Expected Behavior**
What you expected to happen
**Actual Behavior**
What actually happened
**Additional Context (Optional)**
Any other information that might help, such as error logs, screenshots, etc.

View File

@@ -176,7 +176,7 @@ Dynamic proxy for ``PassengerInfo`` that provides access to passenger informatio
if passenger.status == PassengerStatus.IN_ELEVATOR: if passenger.status == PassengerStatus.IN_ELEVATOR:
print(f"In elevator {passenger.elevator_id}") print(f"In elevator {passenger.elevator_id}")
print(f"Waited {passenger.wait_time} ticks") print(f"Waited {passenger.floor_wait_time} ticks")
Read-Only Protection Read-Only Protection
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
@@ -284,6 +284,7 @@ The controller provides these event handlers:
- ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights - ``on_passenger_alight(elevator, passenger, floor)``: Passenger alights
- ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor - ``on_elevator_passing_floor(elevator, floor, direction)``: Elevator passes floor
- ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive - ``on_elevator_approaching(elevator, floor, direction)``: Elevator about to arrive
- ``on_elevator_move(elevator, from_position, to_position, direction, status)``: Elevator moves
Complete Example Complete Example
---------------- ----------------

View File

@@ -202,7 +202,7 @@ Event System
Event Types Event Types
~~~~~~~~~~~ ~~~~~~~~~~~
The simulation generates 8 types of events defined in ``EventType`` enum: The simulation generates 9 types of events defined in ``EventType`` enum:
.. code-block:: python .. code-block:: python
@@ -215,6 +215,7 @@ The simulation generates 8 types of events defined in ``EventType`` enum:
IDLE = "idle" IDLE = "idle"
PASSENGER_BOARD = "passenger_board" PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight" PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move"
Event Generation Event Generation
~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~
@@ -260,6 +261,19 @@ Events are generated during tick processing:
for elevator in self.elevators: for elevator in self.elevators:
# ... movement logic ... # ... movement logic ...
# Elevator moves
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
}
)
# Passing a floor # Passing a floor
if old_floor != new_floor and new_floor != target_floor: if old_floor != new_floor and new_floor != target_floor:
self._emit_event( self._emit_event(
@@ -277,7 +291,7 @@ Events are generated during tick processing:
EventType.ELEVATOR_APPROACHING, EventType.ELEVATOR_APPROACHING,
{ {
"elevator": elevator.id, "elevator": elevator.id,
"floor": elevator.target_floor, "floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value "direction": elevator.target_floor_direction.value
} }
) )
@@ -363,6 +377,14 @@ The ``ElevatorController`` base class automatically routes events to handler met
elevator = self.elevators[event.data["elevator"]] elevator = self.elevators[event.data["elevator"]]
self.on_elevator_idle(elevator) self.on_elevator_idle(elevator)
elif event.type == EventType.ELEVATOR_MOVE:
elevator = self.elevators[event.data["elevator"]]
from_position = event.data["from_position"]
to_position = event.data["to_position"]
direction = event.data["direction"]
status = event.data["status"]
self.on_elevator_move(elevator, from_position, to_position, direction, status)
# ... other event types ... # ... other event types ...
Control Flow: Bus Example Control Flow: Bus Example
@@ -487,28 +509,39 @@ Metrics are calculated from passenger data:
.. code-block:: python .. code-block:: python
def _calculate_metrics(self) -> MetricsResponse: def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics""" """Calculate performance metrics"""
completed = [p for p in self.state.passengers.values() completed = [p for p in self.state.passengers.values()
if p.status == PassengerStatus.COMPLETED] if p.status == PassengerStatus.COMPLETED]
wait_times = [float(p.wait_time) for p in completed] floor_wait_times = [float(p.floor_wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed] arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
return MetricsResponse( def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
done=len(completed), """计算排除掉最长的指定百分比后的平均值"""
total=len(self.state.passengers), if not data:
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0, return 0.0
p95_wait=percentile(wait_times, 95), sorted_data = sorted(data)
avg_system=sum(system_times) / len(system_times) if system_times else 0, keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
p95_system=percentile(system_times, 95), if keep_count == 0:
return 0.0
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return PerformanceMetrics(
completed_passengers=len(completed),
total_passengers=len(self.state.passengers),
average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
) )
Key metrics: Key metrics:
- **Wait time**: ``pickup_tick - arrive_tick`` (how long passenger waited) - **Floor wait time**: ``pickup_tick - arrive_tick`` (在楼层等待的时间,从到达到上电梯)
- **System time**: ``dropoff_tick - arrive_tick`` (total time in system) - **Arrival wait time**: ``dropoff_tick - arrive_tick`` (总等待时间,从到达到下电梯)
- **P95**: 95th percentile (worst-case for most passengers) - **P95 metrics**: 排除掉最长的5%时间后计算剩余95%的平均值
Summary Summary
------- -------

View File

@@ -257,10 +257,10 @@ Tracks simulation performance:
class PerformanceMetrics(SerializableModel): class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0 completed_passengers: int = 0
total_passengers: int = 0 total_passengers: int = 0
average_wait_time: float = 0.0 average_floor_wait_time: float = 0.0
p95_wait_time: float = 0.0 # 95th percentile p95_floor_wait_time: float = 0.0 # 95th percentile
average_system_time: float = 0.0 average_arrival_wait_time: float = 0.0
p95_system_time: float = 0.0 # 95th percentile p95_arrival_wait_time: float = 0.0 # 95th percentile
Properties: Properties:

View File

@@ -6,5 +6,5 @@ A Python implementation of the Elevator Saga game with event-driven architecture
realistic elevator dispatch algorithm development and testing. realistic elevator dispatch algorithm development and testing.
""" """
__version__ = "0.0.4" __version__ = "0.0.7"
__author__ = "ZGCA Team" __author__ = "ZGCA Team"

View File

@@ -6,7 +6,7 @@ Unified API Client for Elevator Saga
import json import json
import urllib.error import urllib.error
import urllib.request import urllib.request
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional
from elevator_saga.core.models import ( from elevator_saga.core.models import (
ElevatorState, ElevatorState,
@@ -63,16 +63,8 @@ class ElevatorAPIClient:
# 使用服务端返回的metrics数据 # 使用服务端返回的metrics数据
metrics_data = response_data.get("metrics", {}) metrics_data = response_data.get("metrics", {})
if metrics_data: if metrics_data:
# 转换为PerformanceMetrics格式 # 直接从字典创建PerformanceMetrics对象
metrics = PerformanceMetrics( metrics = PerformanceMetrics.from_dict(metrics_data)
completed_passengers=metrics_data.get("done", 0),
total_passengers=metrics_data.get("total", 0),
average_wait_time=metrics_data.get("avg_wait", 0),
p95_wait_time=metrics_data.get("p95_wait", 0),
average_system_time=metrics_data.get("avg_system", 0),
p95_system_time=metrics_data.get("p95_system", 0),
# total_energy_consumption=metrics_data.get("energy_total", 0),
)
else: else:
metrics = PerformanceMetrics() metrics = PerformanceMetrics()
@@ -131,7 +123,7 @@ class ElevatorAPIClient:
else: else:
raise RuntimeError(f"Step failed: {response_data.get('error')}") raise RuntimeError(f"Step failed: {response_data.get('error')}")
def send_elevator_command(self, command: Union[GoToFloorCommand]) -> bool: def send_elevator_command(self, command: GoToFloorCommand) -> bool:
"""发送电梯命令""" """发送电梯命令"""
endpoint = self._get_elevator_endpoint(command) endpoint = self._get_elevator_endpoint(command)
debug_log( debug_log(
@@ -156,7 +148,7 @@ class ElevatorAPIClient:
debug_log(f"Go to floor failed: {e}") debug_log(f"Go to floor failed: {e}")
return False return False
def _get_elevator_endpoint(self, command: Union[GoToFloorCommand]) -> str: def _get_elevator_endpoint(self, command: GoToFloorCommand) -> str:
"""获取电梯命令端点""" """获取电梯命令端点"""
base = f"/api/elevators/{command.elevator_id}" base = f"/api/elevators/{command.elevator_id}"

View File

@@ -7,7 +7,7 @@ import os
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pprint import pprint from pprint import pprint
from typing import Any, Dict, List, Optional from typing import Any, Dict, List
from elevator_saga.client.api_client import ElevatorAPIClient from elevator_saga.client.api_client import ElevatorAPIClient
from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger
@@ -171,6 +171,22 @@ class ElevatorController(ABC):
""" """
pass pass
# @abstractmethod 为了兼容性暂不强制要求elevator_move必须实现
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调 - 可选实现
Args:
elevator: 电梯代理对象
from_position: 起始位置(浮点数表示楼层)
to_position: 目标位置(浮点数表示楼层)
direction: 移动方向
status: 电梯运行状态
"""
pass
def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None: def _internal_init(self, elevators: List[Any], floors: List[Any]) -> None:
"""内部初始化方法""" """内部初始化方法"""
self.elevators = elevators self.elevators = elevators
@@ -218,7 +234,7 @@ class ElevatorController(ABC):
# 获取初始状态并初始化默认从0开始 # 获取初始状态并初始化默认从0开始
try: try:
state = self.api_client.get_state() state = self.api_client.get_state()
except ConnectionResetError as ex: except ConnectionResetError as _: # noqa: F841
print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}") print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}")
os._exit(1) os._exit(1)
if state.tick > 0: if state.tick > 0:
@@ -352,9 +368,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None: if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client) elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用 self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction)
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str)
elif event.type == EventType.ELEVATOR_APPROACHING: elif event.type == EventType.ELEVATOR_APPROACHING:
elevator_id = event.data.get("elevator") elevator_id = event.data.get("elevator")
@@ -363,9 +377,7 @@ class ElevatorController(ABC):
if elevator_id is not None and floor_id is not None and direction is not None: if elevator_id is not None and floor_id is not None and direction is not None:
elevator_proxy = ProxyElevator(elevator_id, self.api_client) elevator_proxy = ProxyElevator(elevator_id, self.api_client)
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
# 服务端发送的direction是字符串直接使用 self.on_elevator_approaching(elevator_proxy, floor_proxy, direction)
direction_str = direction if isinstance(direction, str) else direction.value
self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str)
elif event.type == EventType.PASSENGER_BOARD: elif event.type == EventType.PASSENGER_BOARD:
elevator_id = event.data.get("elevator") elevator_id = event.data.get("elevator")
@@ -385,6 +397,22 @@ class ElevatorController(ABC):
floor_proxy = ProxyFloor(floor_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client)
self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy) self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy)
elif event.type == EventType.ELEVATOR_MOVE:
elevator_id = event.data.get("elevator")
from_position = event.data.get("from_position")
to_position = event.data.get("to_position")
direction = event.data.get("direction")
status = event.data.get("status")
if (
elevator_id is not None
and from_position is not None
and to_position is not None
and direction is not None
and status is not None
):
elevator_proxy = ProxyElevator(elevator_id, self.api_client)
self.on_elevator_move(elevator_proxy, from_position, to_position, direction, status)
def _reset_and_reinit(self) -> None: def _reset_and_reinit(self) -> None:
"""重置并重新初始化""" """重置并重新初始化"""
try: try:

View File

@@ -41,6 +41,7 @@ class ElevatorBusExampleController(ElevatorController):
def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None: def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str) -> None:
self.all_passengers.append(passenger) self.all_passengers.append(passenger)
print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})")
pass pass
def on_elevator_idle(self, elevator: ProxyElevator) -> None: def on_elevator_idle(self, elevator: ProxyElevator) -> None:
@@ -60,10 +61,10 @@ class ElevatorBusExampleController(ElevatorController):
elevator.go_to_floor(elevator.current_floor - 1) elevator.go_to_floor(elevator.current_floor - 1)
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None: def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
pass print(f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}")
def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None: def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None:
pass print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}")
def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass pass
@@ -71,6 +72,11 @@ class ElevatorBusExampleController(ElevatorController):
def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None:
pass pass
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusExampleController() algorithm = ElevatorBusExampleController()

View File

@@ -139,6 +139,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True) elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}") print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True) algorithm = ElevatorBusController(debug=True)

View File

@@ -8,7 +8,7 @@ import uuid
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
# 类型变量 # 类型变量
T = TypeVar("T", bound="SerializableModel") T = TypeVar("T", bound="SerializableModel")
@@ -55,6 +55,7 @@ class EventType(Enum):
IDLE = "idle" IDLE = "idle"
PASSENGER_BOARD = "passenger_board" PASSENGER_BOARD = "passenger_board"
PASSENGER_ALIGHT = "passenger_alight" PASSENGER_ALIGHT = "passenger_alight"
ELEVATOR_MOVE = "elevator_move" # 电梯移动事件
class SerializableModel: class SerializableModel:
@@ -113,7 +114,7 @@ class Position(SerializableModel):
@property @property
def current_floor_float(self) -> float: def current_floor_float(self) -> float:
return self.current_floor + self.floor_up_position / 10 return round(self.current_floor + self.floor_up_position / 10, 1)
def floor_up_position_add(self, num: int) -> int: def floor_up_position_add(self, num: int) -> int:
self.floor_up_position += num self.floor_up_position += num
@@ -174,13 +175,13 @@ class PassengerInfo(SerializableModel):
return PassengerStatus.WAITING return PassengerStatus.WAITING
@property @property
def wait_time(self) -> int: def floor_wait_time(self) -> int:
"""等待时间""" """在楼层等待时间(从到达到上电梯)"""
return self.pickup_tick - self.arrive_tick return self.pickup_tick - self.arrive_tick
@property @property
def system_time(self) -> int: def arrival_wait_time(self) -> int:
"""系统时间(总时间""" """总等待时间(从到达到下电梯"""
return self.dropoff_tick - self.arrive_tick return self.dropoff_tick - self.arrive_tick
@property @property
@@ -331,10 +332,10 @@ class PerformanceMetrics(SerializableModel):
completed_passengers: int = 0 completed_passengers: int = 0
total_passengers: int = 0 total_passengers: int = 0
average_wait_time: float = 0.0 average_floor_wait_time: float = 0.0
p95_wait_time: float = 0.0 p95_floor_wait_time: float = 0.0
average_system_time: float = 0.0 average_arrival_wait_time: float = 0.0
p95_system_time: float = 0.0 p95_arrival_wait_time: float = 0.0
# total_energy_consumption: float = 0.0 # total_energy_consumption: float = 0.0
@property @property

View File

@@ -138,6 +138,17 @@ class ElevatorBusController(ElevatorController):
elevator.go_to_floor(elevator.target_floor + 1, immediate=True) elevator.go_to_floor(elevator.target_floor + 1, immediate=True)
print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}") print(f" 不让0号电梯上行停站设定新目标楼层 {elevator.target_floor + 1}")
def on_elevator_move(
self, elevator: ProxyElevator, from_position: float, to_position: float, direction: str, status: str
) -> None:
"""
电梯移动时的回调
可以在这里记录电梯移动信息,用于调试或性能分析
"""
# 取消注释以显示电梯移动信息
# print(f"🚀 电梯 E{elevator.id} 移动: {from_position:.1f} -> {to_position:.1f} ({direction}, {status})")
pass
if __name__ == "__main__": if __name__ == "__main__":
algorithm = ElevatorBusController(debug=True) algorithm = ElevatorBusController(debug=True)

View File

@@ -10,7 +10,7 @@ import threading
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, cast from typing import Any, Dict, List, cast
from flask import Flask, Response, request from flask import Flask, Response, request
@@ -22,6 +22,7 @@ from elevator_saga.core.models import (
FloorState, FloorState,
PassengerInfo, PassengerInfo,
PassengerStatus, PassengerStatus,
PerformanceMetrics,
SerializableModel, SerializableModel,
SimulationEvent, SimulationEvent,
SimulationState, SimulationState,
@@ -89,19 +90,6 @@ def json_response(data: Any, status: int = 200) -> Response | tuple[Response, in
return response, status return response, status
@dataclass
class MetricsResponse(SerializableModel):
"""性能指标响应"""
done: int
total: int
avg_wait: float
p95_wait: float
avg_system: float
p95_system: float
energy_total: float
@dataclass @dataclass
class PassengerSummary(SerializableModel): class PassengerSummary(SerializableModel):
"""乘客摘要""" """乘客摘要"""
@@ -120,7 +108,7 @@ class SimulationStateResponse(SerializableModel):
elevators: List[ElevatorState] elevators: List[ElevatorState]
floors: List[FloorState] floors: List[FloorState]
passengers: Dict[int, PassengerInfo] passengers: Dict[int, PassengerInfo]
metrics: MetricsResponse metrics: PerformanceMetrics
class ElevatorSimulation: class ElevatorSimulation:
@@ -286,44 +274,42 @@ class ElevatorSimulation:
# 2. Move elevators # 2. Move elevators
self._move_elevators() self._move_elevators()
# 3. Process elevator stops and passenger boarding/alighting # 3. Process elevator stops and passenger alighting
self._process_elevator_stops() self._process_elevator_stops()
# Return events generated this tick # Return events generated this tick
return self.state.events[events_start:] return self.state.events[events_start:]
def _process_passenger_in(self) -> None: def _process_passenger_in(self, elevator: ElevatorState) -> None:
for elevator in self.elevators: current_floor = elevator.current_floor
current_floor = elevator.current_floor # 处于Stopped状态方向也已经清空说明没有调度。
# 处于Stopped状态方向也已经清空说明没有调度。 floor = self.floors[current_floor]
floor = self.floors[current_floor] passengers_to_board: List[int] = []
passengers_to_board: List[int] = [] available_capacity = elevator.max_capacity - len(elevator.passengers)
available_capacity = elevator.max_capacity - len(elevator.passengers) # Board passengers going up (if up indicator is on or no direction set)
# Board passengers going up (if up indicator is on or no direction set) if elevator.target_floor_direction == Direction.UP:
if elevator.target_floor_direction == Direction.UP: passengers_to_board.extend(floor.up_queue[:available_capacity])
passengers_to_board.extend(floor.up_queue[:available_capacity]) floor.up_queue = floor.up_queue[available_capacity:]
floor.up_queue = floor.up_queue[available_capacity:]
# Board passengers going down (if down indicator is on or no direction set) # Board passengers going down (if down indicator is on or no direction set)
if elevator.target_floor_direction == Direction.DOWN: if elevator.target_floor_direction == Direction.DOWN:
passengers_to_board.extend(floor.down_queue[:available_capacity]) passengers_to_board.extend(floor.down_queue[:available_capacity])
floor.down_queue = floor.down_queue[available_capacity:] floor.down_queue = floor.down_queue[available_capacity:]
# Process boarding # Process boarding
for passenger_id in passengers_to_board: for passenger_id in passengers_to_board:
passenger = self.passengers[passenger_id] passenger = self.passengers[passenger_id]
passenger.pickup_tick = self.tick passenger.pickup_tick = self.tick
passenger.elevator_id = elevator.id passenger.elevator_id = elevator.id
elevator.passengers.append(passenger_id) elevator.passengers.append(passenger_id)
self._emit_event( self._emit_event(
EventType.PASSENGER_BOARD, EventType.PASSENGER_BOARD,
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id}, {"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
) )
def _update_elevator_status(self) -> None: def _update_elevator_status(self) -> None:
"""更新电梯运行状态""" """更新电梯运行状态"""
for elevator in self.elevators: for elevator in self.elevators:
current_floor = elevator.position.current_floor
target_floor = elevator.target_floor target_floor = elevator.target_floor
old_status = elevator.run_status.value old_status = elevator.run_status.value
# 没有移动方向,说明电梯已经到达目标楼层 # 没有移动方向,说明电梯已经到达目标楼层
@@ -331,7 +317,7 @@ class ElevatorSimulation:
if elevator.next_target_floor is not None: if elevator.next_target_floor is not None:
self._set_elevator_target_floor(elevator, elevator.next_target_floor) self._set_elevator_target_floor(elevator, elevator.next_target_floor)
self._process_passenger_in() self._process_passenger_in(elevator)
elevator.next_target_floor = None elevator.next_target_floor = None
else: else:
continue continue
@@ -391,6 +377,7 @@ class ElevatorSimulation:
# 根据状态和方向调整移动距离 # 根据状态和方向调整移动距离
elevator.last_tick_direction = elevator.target_floor_direction elevator.last_tick_direction = elevator.target_floor_direction
old_position = elevator.position.current_floor_float
if elevator.target_floor_direction == Direction.UP: if elevator.target_floor_direction == Direction.UP:
new_floor = elevator.position.floor_up_position_add(movement_speed) new_floor = elevator.position.floor_up_position_add(movement_speed)
elif elevator.target_floor_direction == Direction.DOWN: elif elevator.target_floor_direction == Direction.DOWN:
@@ -399,6 +386,19 @@ class ElevatorSimulation:
# 之前的状态已经是到站了,清空上一次到站的方向 # 之前的状态已经是到站了,清空上一次到站的方向
pass pass
# 发送电梯移动事件
if elevator.target_floor_direction != Direction.STOPPED:
self._emit_event(
EventType.ELEVATOR_MOVE,
{
"elevator": elevator.id,
"from_position": old_position,
"to_position": elevator.position.current_floor_float,
"direction": elevator.target_floor_direction.value,
"status": elevator.run_status.value,
},
)
# 移动后检测是否即将到站,从匀速状态切换到减速 # 移动后检测是否即将到站,从匀速状态切换到减速
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速 # 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
@@ -411,7 +411,7 @@ class ElevatorSimulation:
EventType.ELEVATOR_APPROACHING, EventType.ELEVATOR_APPROACHING,
{ {
"elevator": elevator.id, "elevator": elevator.id,
"floor": elevator.target_floor, "floor": int(round(elevator.position.current_floor_float)),
"direction": elevator.target_floor_direction.value, "direction": elevator.target_floor_direction.value,
}, },
) )
@@ -478,7 +478,6 @@ class ElevatorSimulation:
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1 [SERVER-DEBUG] 电梯 E0 被设定为前往 F1
说明电梯处于stop状态这个tick直接采用下一个目的地运行了 说明电梯处于stop状态这个tick直接采用下一个目的地运行了
""" """
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}") server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
new_target_floor_should_accel = self._should_start_deceleration(elevator) new_target_floor_should_accel = self._should_start_deceleration(elevator)
@@ -547,41 +546,45 @@ class ElevatorSimulation:
metrics=metrics, metrics=metrics,
) )
def _calculate_metrics(self) -> MetricsResponse: def _calculate_metrics(self) -> PerformanceMetrics:
"""Calculate performance metrics""" """Calculate performance metrics"""
# 直接从state中筛选已完成的乘客 # 直接从state中筛选已完成的乘客
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED] completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
total_passengers = len(self.state.passengers) total_passengers = len(self.state.passengers)
if not completed: if not completed:
return MetricsResponse( return PerformanceMetrics(
done=0, completed_passengers=0,
total=total_passengers, total_passengers=total_passengers,
avg_wait=0, average_floor_wait_time=0,
p95_wait=0, p95_floor_wait_time=0,
avg_system=0, average_arrival_wait_time=0,
p95_system=0, p95_arrival_wait_time=0,
energy_total=sum(e.energy_consumed for e in self.elevators),
) )
wait_times = [float(p.wait_time) for p in completed] floor_wait_times = [float(p.floor_wait_time) for p in completed]
system_times = [float(p.system_time) for p in completed] arrival_wait_times = [float(p.arrival_wait_time) for p in completed]
def percentile(data: List[float], p: int) -> float: def average_excluding_top_percent(data: List[float], exclude_percent: int) -> float:
"""计算排除掉最长的指定百分比后的平均值"""
if not data: if not data:
return 0.0 return 0.0
sorted_data = sorted(data) sorted_data = sorted(data)
index = int(len(sorted_data) * p / 100) # 计算要保留的数据数量(排除掉最长的 exclude_percent
return sorted_data[min(index, len(sorted_data) - 1)] keep_count = int(len(sorted_data) * (100 - exclude_percent) / 100)
if keep_count == 0:
return 0.0
# 只保留前 keep_count 个数据,排除最长的部分
kept_data = sorted_data[:keep_count]
return sum(kept_data) / len(kept_data)
return MetricsResponse( return PerformanceMetrics(
done=len(completed), completed_passengers=len(completed),
total=total_passengers, total_passengers=total_passengers,
avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0, average_floor_wait_time=sum(floor_wait_times) / len(floor_wait_times) if floor_wait_times else 0,
p95_wait=percentile(wait_times, 95), p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
avg_system=sum(system_times) / len(system_times) if system_times else 0, average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
p95_system=percentile(system_times, 95), p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
energy_total=sum(e.energy_consumed for e in self.elevators),
) )
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]: def get_events(self, since_tick: int = 0) -> List[SimulationEvent]: