mirror of
https://github.com/ZGCA-Forge/Elevator.git
synced 2025-12-14 13:04:41 +00:00
Feat: add energy rate for elevators
This commit is contained in:
@@ -211,6 +211,7 @@ class ElevatorState(SerializableModel):
|
||||
indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators)
|
||||
passenger_destinations: Dict[int, int] = field(default_factory=dict) # 乘客ID -> 目的地楼层映射
|
||||
energy_consumed: float = 0.0
|
||||
energy_rate: float = 1.0 # 能耗率:每tick消耗的能量单位
|
||||
last_update_tick: int = 0
|
||||
|
||||
@property
|
||||
@@ -337,7 +338,7 @@ class PerformanceMetrics(SerializableModel):
|
||||
p95_floor_wait_time: float = 0.0
|
||||
average_arrival_wait_time: float = 0.0
|
||||
p95_arrival_wait_time: float = 0.0
|
||||
# total_energy_consumption: float = 0.0
|
||||
total_energy_consumption: float = 0.0
|
||||
|
||||
@property
|
||||
def completion_rate(self) -> float:
|
||||
@@ -346,13 +347,6 @@ class PerformanceMetrics(SerializableModel):
|
||||
return 0.0
|
||||
return self.completed_passengers / self.total_passengers
|
||||
|
||||
# @property
|
||||
# def energy_per_passenger(self) -> float:
|
||||
# """每位乘客能耗"""
|
||||
# if self.completed_passengers == 0:
|
||||
# return 0.0
|
||||
# return self.total_energy_consumption / self.completed_passengers
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimulationState(SerializableModel):
|
||||
|
||||
@@ -180,6 +180,14 @@ class ElevatorSimulation:
|
||||
building_config["elevators"], building_config["floors"], building_config["elevator_capacity"]
|
||||
)
|
||||
self.reset()
|
||||
|
||||
# 设置电梯能耗率
|
||||
elevator_energy_rates = building_config.get("elevator_energy_rates", [1.0] * building_config["elevators"])
|
||||
for i, elevator in enumerate(self.state.elevators):
|
||||
if i < len(elevator_energy_rates):
|
||||
elevator.energy_rate = elevator_energy_rates[i]
|
||||
server_debug_log(f"电梯 E{elevator.id} 能耗率设置为: {elevator.energy_rate}")
|
||||
|
||||
self.max_duration_ticks = building_config["duration"]
|
||||
traffic_data: list[Dict[str, Any]] = file_data["traffic"]
|
||||
traffic_data.sort(key=lambda t: cast(int, t["tick"]))
|
||||
@@ -380,8 +388,12 @@ class ElevatorSimulation:
|
||||
old_position = elevator.position.current_floor_float
|
||||
if elevator.target_floor_direction == Direction.UP:
|
||||
new_floor = elevator.position.floor_up_position_add(movement_speed)
|
||||
# 电梯移动时增加能耗,每tick增加电梯的能耗率
|
||||
elevator.energy_consumed += elevator.energy_rate
|
||||
elif elevator.target_floor_direction == Direction.DOWN:
|
||||
new_floor = elevator.position.floor_up_position_add(-movement_speed)
|
||||
# 电梯移动时增加能耗,每tick增加电梯的能耗率
|
||||
elevator.energy_consumed += elevator.energy_rate
|
||||
else:
|
||||
# 之前的状态已经是到站了,清空上一次到站的方向
|
||||
pass
|
||||
@@ -435,7 +447,6 @@ class ElevatorSimulation:
|
||||
self._emit_event(
|
||||
EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"}
|
||||
)
|
||||
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
|
||||
|
||||
def _process_elevator_stops(self) -> None:
|
||||
"""
|
||||
@@ -553,6 +564,10 @@ class ElevatorSimulation:
|
||||
completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED]
|
||||
|
||||
total_passengers = len(self.state.passengers)
|
||||
|
||||
# 计算总能耗
|
||||
total_energy = sum(elevator.energy_consumed for elevator in self.state.elevators)
|
||||
|
||||
if not completed:
|
||||
return PerformanceMetrics(
|
||||
completed_passengers=0,
|
||||
@@ -561,6 +576,7 @@ class ElevatorSimulation:
|
||||
p95_floor_wait_time=0,
|
||||
average_arrival_wait_time=0,
|
||||
p95_arrival_wait_time=0,
|
||||
total_energy_consumption=total_energy,
|
||||
)
|
||||
|
||||
floor_wait_times = [float(p.floor_wait_time) for p in completed]
|
||||
@@ -586,6 +602,7 @@ class ElevatorSimulation:
|
||||
p95_floor_wait_time=average_excluding_top_percent(floor_wait_times, 5),
|
||||
average_arrival_wait_time=sum(arrival_wait_times) / len(arrival_wait_times) if arrival_wait_times else 0,
|
||||
p95_arrival_wait_time=average_excluding_top_percent(arrival_wait_times, 5),
|
||||
total_energy_consumption=total_energy,
|
||||
)
|
||||
|
||||
def get_events(self, since_tick: int = 0) -> List[SimulationEvent]:
|
||||
|
||||
@@ -290,7 +290,7 @@ def generate_fire_evacuation_traffic(
|
||||
for floor in range(1, floors):
|
||||
# 每层随机数量的人需要疏散
|
||||
num_people = random.randint(people_per_floor[0], people_per_floor[1])
|
||||
for i in range(num_people):
|
||||
for _ in range(num_people):
|
||||
# 在10个tick内陆续到达,模拟疏散的紧急性
|
||||
arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1))
|
||||
if arrival_tick < duration:
|
||||
@@ -791,10 +791,12 @@ def generate_traffic_file(scenario: str, output_file: str, scale: Optional[str]
|
||||
traffic_data = generator_func(**generator_params)
|
||||
|
||||
# 准备building配置
|
||||
num_elevators = params["elevators"]
|
||||
building_config = {
|
||||
"floors": params["floors"],
|
||||
"elevators": params["elevators"],
|
||||
"elevators": num_elevators,
|
||||
"elevator_capacity": params["elevator_capacity"],
|
||||
"elevator_energy_rates": [1.0] * num_elevators, # 每台电梯的能耗率,默认为1.0
|
||||
"scenario": scenario,
|
||||
"scale": scale,
|
||||
"description": f"{config['description']} ({scale}规模)",
|
||||
@@ -835,7 +837,7 @@ def generate_scaled_traffic_files(
|
||||
if custom_building:
|
||||
floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0])
|
||||
elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0])
|
||||
elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
|
||||
_elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0])
|
||||
|
||||
# 重新确定规模
|
||||
detected_scale = determine_building_scale(floors, elevators)
|
||||
|
||||
Reference in New Issue
Block a user