mirror of
https://github.com/ZGCA-Forge/Elevator.git
synced 2025-12-16 20:41:04 +00:00
added other examples
This commit is contained in:
@@ -273,8 +273,13 @@ class ElevatorSimulation:
|
||||
return new_events
|
||||
|
||||
def _process_tick(self) -> List[SimulationEvent]:
|
||||
"""Process one simulation tick"""
|
||||
"""
|
||||
Process one simulation tick
|
||||
每个tick先发生事件,再发生动作
|
||||
"""
|
||||
events_start = len(self.state.events)
|
||||
self._update_elevator_status()
|
||||
|
||||
# 1. Add new passengers from traffic queue
|
||||
self._process_arrivals()
|
||||
|
||||
@@ -287,7 +292,65 @@ class ElevatorSimulation:
|
||||
# Return events generated this tick
|
||||
return self.state.events[events_start:]
|
||||
|
||||
def _process_arrivals(self) -> None:
|
||||
def _process_passenger_in(self) -> None:
|
||||
for elevator in self.elevators:
|
||||
current_floor = elevator.current_floor
|
||||
# 处于Stopped状态,方向也已经清空,说明没有调度。
|
||||
floor = self.floors[current_floor]
|
||||
passengers_to_board: List[int] = []
|
||||
available_capacity = elevator.max_capacity - len(elevator.passengers)
|
||||
# Board passengers going up (if up indicator is on or no direction set)
|
||||
if elevator.target_floor_direction == Direction.UP:
|
||||
passengers_to_board.extend(floor.up_queue[:available_capacity])
|
||||
floor.up_queue = floor.up_queue[available_capacity:]
|
||||
|
||||
# Board passengers going down (if down indicator is on or no direction set)
|
||||
if elevator.target_floor_direction == Direction.DOWN:
|
||||
passengers_to_board.extend(floor.down_queue[:available_capacity])
|
||||
floor.down_queue = floor.down_queue[available_capacity:]
|
||||
|
||||
# Process boarding
|
||||
for passenger_id in passengers_to_board:
|
||||
passenger = self.passengers[passenger_id]
|
||||
passenger.pickup_tick = self.tick
|
||||
passenger.elevator_id = elevator.id
|
||||
elevator.passengers.append(passenger_id)
|
||||
self._emit_event(
|
||||
EventType.PASSENGER_BOARD,
|
||||
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
|
||||
)
|
||||
|
||||
def _update_elevator_status(self) -> None:
|
||||
"""更新电梯运行状态"""
|
||||
for elevator in self.elevators:
|
||||
current_floor = elevator.position.current_floor
|
||||
target_floor = elevator.target_floor
|
||||
old_status = elevator.run_status.value
|
||||
# 没有移动方向,说明电梯已经到达目标楼层
|
||||
if elevator.target_floor_direction == Direction.STOPPED:
|
||||
if elevator.next_target_floor is not None:
|
||||
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
|
||||
|
||||
self._process_passenger_in()
|
||||
elevator.next_target_floor = None
|
||||
else:
|
||||
continue
|
||||
# 有移动方向,但是需要启动了
|
||||
if elevator.run_status == ElevatorStatus.STOPPED:
|
||||
# 从停止状态启动 - 注意:START_UP表示启动加速状态,不表示方向
|
||||
# 实际移动方向由target_floor_direction决定
|
||||
elevator.run_status = ElevatorStatus.START_UP
|
||||
# 从启动状态切换到匀速
|
||||
elif elevator.run_status == ElevatorStatus.START_UP:
|
||||
# 从启动状态切换到匀速
|
||||
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
|
||||
server_debug_log(
|
||||
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} "
|
||||
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}"
|
||||
)
|
||||
# START_DOWN状态会在到达目标时在_move_elevators中切换为STOPPED
|
||||
|
||||
def _process_arrivals(self) -> None: # OK
|
||||
"""Process new passenger arrivals"""
|
||||
while self.traffic_queue and self.traffic_queue[0].tick <= self.tick:
|
||||
traffic_entry = self.traffic_queue.pop(0)
|
||||
@@ -309,6 +372,130 @@ class ElevatorSimulation:
|
||||
self.floors[passenger.origin].down_queue.append(passenger.id)
|
||||
self._emit_event(EventType.DOWN_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id})
|
||||
|
||||
def _move_elevators(self) -> None:
|
||||
"""
|
||||
Move all elevators towards their destinations with acceleration/deceleration
|
||||
上一步已经处理了当前电梯的状态,这里只做移动
|
||||
"""
|
||||
for elevator in self.elevators:
|
||||
target_floor = elevator.target_floor
|
||||
new_floor = old_floor = elevator.position.current_floor
|
||||
# 获取移动速度
|
||||
movement_speed = 0
|
||||
if elevator.run_status == ElevatorStatus.START_UP:
|
||||
movement_speed = 1
|
||||
elif elevator.run_status == ElevatorStatus.START_DOWN:
|
||||
movement_speed = 1
|
||||
elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
|
||||
movement_speed = 2
|
||||
if movement_speed == 0:
|
||||
continue
|
||||
|
||||
# 根据状态和方向调整移动距离
|
||||
elevator.last_tick_direction = elevator.target_floor_direction
|
||||
if elevator.target_floor_direction == Direction.UP:
|
||||
new_floor = elevator.position.floor_up_position_add(movement_speed)
|
||||
elif elevator.target_floor_direction == Direction.DOWN:
|
||||
new_floor = elevator.position.floor_up_position_add(-movement_speed)
|
||||
else:
|
||||
# 之前的状态已经是到站了,清空上一次到站的方向
|
||||
pass
|
||||
|
||||
# 移动后检测是否即将到站,从匀速状态切换到减速
|
||||
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
|
||||
# 检查是否需要开始减速,这里加速减速设置路程为1,匀速路程为2,这样能够保证不会匀速恰好到达,必须加减速
|
||||
# 如果速度超出,则预期的逻辑是,恰好到达/超出0等,会强制触发start_down,多走一次才能stop,目前没有实现这部分逻辑
|
||||
if self._should_start_deceleration(elevator):
|
||||
elevator.run_status = ElevatorStatus.START_DOWN
|
||||
# 发送电梯即将经过某层楼事件
|
||||
if self._near_next_stop(elevator):
|
||||
self._emit_event(
|
||||
EventType.ELEVATOR_APPROACHING,
|
||||
{
|
||||
"elevator": elevator.id,
|
||||
"floor": elevator.target_floor,
|
||||
"direction": elevator.target_floor_direction.value,
|
||||
},
|
||||
)
|
||||
|
||||
# 处理楼层变化事件
|
||||
if old_floor != new_floor:
|
||||
if new_floor != target_floor:
|
||||
self._emit_event(
|
||||
EventType.PASSING_FLOOR,
|
||||
{
|
||||
"elevator": elevator.id,
|
||||
"floor": new_floor,
|
||||
"direction": elevator.target_floor_direction.value,
|
||||
},
|
||||
)
|
||||
|
||||
# 检查是否到达目标楼层
|
||||
if target_floor == new_floor and elevator.position.floor_up_position == 0:
|
||||
elevator.run_status = ElevatorStatus.STOPPED
|
||||
# 刚进入Stopped状态,可以通过last_direction识别
|
||||
self._emit_event(EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor, "reason": "move_reached"})
|
||||
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
|
||||
|
||||
def _process_elevator_stops(self) -> None:
|
||||
"""
|
||||
处理Stopped电梯,上下客,新target处理等。
|
||||
"""
|
||||
for elevator in self.elevators:
|
||||
current_floor = elevator.current_floor
|
||||
# 处于Stopped状态,方向也已经清空,说明没有调度。
|
||||
if elevator.last_tick_direction == Direction.STOPPED:
|
||||
self._emit_event(EventType.IDLE, {"elevator": elevator.id, "floor": current_floor})
|
||||
continue
|
||||
# 其他处于STOPPED状态,刚进入stop,到站要进行上下客
|
||||
if not elevator.run_status == ElevatorStatus.STOPPED:
|
||||
continue
|
||||
|
||||
# Let passengers alight
|
||||
passengers_to_remove: List[int] = []
|
||||
for passenger_id in elevator.passengers:
|
||||
passenger = self.passengers[passenger_id]
|
||||
if passenger.destination == current_floor:
|
||||
passenger.dropoff_tick = self.tick
|
||||
passengers_to_remove.append(passenger_id)
|
||||
|
||||
# Remove passengers who alighted
|
||||
for passenger_id in passengers_to_remove:
|
||||
elevator.passengers.remove(passenger_id)
|
||||
self._emit_event(
|
||||
EventType.PASSENGER_ALIGHT,
|
||||
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
|
||||
)
|
||||
# Board waiting passengers (if indicators allow)
|
||||
if elevator.next_target_floor is not None:
|
||||
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
|
||||
elevator.next_target_floor = None
|
||||
|
||||
def _set_elevator_target_floor(self, elevator: ElevatorState, floor: int):
|
||||
"""
|
||||
同一个tick内提示
|
||||
[SERVER-DEBUG] 电梯 E0 下一目的地设定为 F1
|
||||
[SERVER-DEBUG] 电梯 E0 被设定为前往 F1
|
||||
说明电梯处于stop状态,这个tick直接采用下一个目的地运行了
|
||||
"""
|
||||
original_target_floor = elevator.target_floor
|
||||
elevator.position.target_floor = floor
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为前往 F{floor}")
|
||||
new_target_floor_should_accel = self._should_start_deceleration(elevator)
|
||||
if not new_target_floor_should_accel:
|
||||
if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了
|
||||
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为匀速")
|
||||
elif new_target_floor_should_accel:
|
||||
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速
|
||||
elevator.run_status = ElevatorStatus.START_DOWN
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为减速")
|
||||
if elevator.current_floor != floor or elevator.position.floor_up_position != 0:
|
||||
old_status = elevator.run_status.value
|
||||
server_debug_log(
|
||||
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value}"
|
||||
)
|
||||
|
||||
def _calculate_distance_to_target(self, elevator: ElevatorState) -> float:
|
||||
"""计算到目标楼层的距离(以floor_up_position为单位)"""
|
||||
current_pos = elevator.position.current_floor * 10 + elevator.position.floor_up_position
|
||||
@@ -336,214 +523,18 @@ class ElevatorSimulation:
|
||||
distance = self._calculate_distance_to_near_stop(elevator)
|
||||
return distance == 1
|
||||
|
||||
def _get_movement_speed(self, elevator: ElevatorState) -> int:
|
||||
"""根据电梯状态获取移动速度"""
|
||||
if elevator.run_status == ElevatorStatus.START_UP:
|
||||
return 1
|
||||
elif elevator.run_status == ElevatorStatus.START_DOWN:
|
||||
return 1
|
||||
elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
|
||||
return 2
|
||||
else: # STOPPED
|
||||
return 0
|
||||
|
||||
def _update_elevator_status(self, elevator: ElevatorState) -> None:
|
||||
"""更新电梯运行状态"""
|
||||
current_floor = elevator.position.current_floor
|
||||
target_floor = elevator.target_floor
|
||||
|
||||
if current_floor == target_floor and elevator.position.floor_up_position == 0:
|
||||
# 已到达目标楼层
|
||||
elevator.run_status = ElevatorStatus.STOPPED
|
||||
return
|
||||
|
||||
if elevator.run_status == ElevatorStatus.STOPPED:
|
||||
# 从停止状态启动 - 注意:START_UP表示启动加速状态,不表示方向
|
||||
# 实际移动方向由target_floor_direction决定
|
||||
elevator.run_status = ElevatorStatus.START_UP
|
||||
elif elevator.run_status == ElevatorStatus.START_UP:
|
||||
# 从启动状态切换到匀速
|
||||
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
|
||||
elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
|
||||
# 检查是否需要开始减速,这里加速减速设置路程为1,匀速路程为2,这样能够保证不会匀速恰好到达,必须加减速
|
||||
# 如果速度超出,则预期的逻辑是,恰好到达/超出0等,会强制触发start_down,多走一次才能stop,目前没有实现这部分逻辑
|
||||
if self._should_start_deceleration(elevator):
|
||||
elevator.run_status = ElevatorStatus.START_DOWN
|
||||
# 发送电梯即将经过某层楼事件
|
||||
if self._near_next_stop(elevator):
|
||||
self._emit_event(
|
||||
EventType.ELEVATOR_APPROACHING,
|
||||
{
|
||||
"elevator": elevator.id,
|
||||
"floor": elevator.target_floor,
|
||||
"direction": elevator.target_floor_direction.value,
|
||||
},
|
||||
)
|
||||
# START_DOWN状态会在到达目标时自动切换为STOPPED
|
||||
|
||||
def _move_elevators(self) -> None:
|
||||
"""Move all elevators towards their destinations with acceleration/deceleration"""
|
||||
for elevator in self.elevators:
|
||||
target_floor = elevator.target_floor
|
||||
current_floor = elevator.position.current_floor
|
||||
|
||||
# 如果已在恰好目标楼层,标记为STOPPED,之后交给_process_elevator_stops处理
|
||||
if target_floor == current_floor:
|
||||
if elevator.next_target_floor is None:
|
||||
continue
|
||||
if elevator.position.floor_up_position == 0:
|
||||
server_debug_log(
|
||||
f"电梯{elevator.id}已在目标楼层,当前{elevator.position.current_floor_float} / 目标{target_floor}"
|
||||
)
|
||||
elevator.run_status = ElevatorStatus.STOPPED
|
||||
continue
|
||||
|
||||
# 获取移动速度
|
||||
movement_speed = self._get_movement_speed(elevator)
|
||||
|
||||
if movement_speed == 0:
|
||||
continue
|
||||
|
||||
# Move towards target
|
||||
old_floor = current_floor
|
||||
|
||||
# 根据状态和方向调整移动速度
|
||||
if elevator.target_floor_direction == Direction.UP:
|
||||
# 向上移动
|
||||
new_floor = elevator.position.floor_up_position_add(movement_speed)
|
||||
else:
|
||||
# 向下移动
|
||||
new_floor = elevator.position.floor_up_position_add(-movement_speed)
|
||||
|
||||
# 更新电梯状态
|
||||
old_status = elevator.run_status.value
|
||||
self._update_elevator_status(elevator)
|
||||
server_debug_log(
|
||||
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} 速度:{movement_speed} "
|
||||
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}"
|
||||
)
|
||||
|
||||
# 处理楼层变化事件
|
||||
if old_floor != new_floor:
|
||||
if new_floor != target_floor:
|
||||
self._emit_event(
|
||||
EventType.PASSING_FLOOR,
|
||||
{
|
||||
"elevator": elevator.id,
|
||||
"floor": new_floor,
|
||||
"direction": elevator.target_floor_direction.value,
|
||||
},
|
||||
)
|
||||
|
||||
# 检查是否到达目标楼层
|
||||
if target_floor == new_floor and elevator.position.floor_up_position == 0:
|
||||
elevator.run_status = ElevatorStatus.STOPPED
|
||||
self._emit_event(EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor})
|
||||
elevator.indicators.set_direction(elevator.target_floor_direction) # 抵达目标楼层,说明该取消floor了
|
||||
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
|
||||
|
||||
def _process_elevator_stops(self) -> None:
|
||||
"""Handle passenger boarding and alighting at elevator stops"""
|
||||
for elevator in self.elevators:
|
||||
if not elevator.run_status == ElevatorStatus.STOPPED:
|
||||
continue
|
||||
current_floor = elevator.current_floor
|
||||
|
||||
# Let passengers alight
|
||||
passengers_to_remove: List[int] = []
|
||||
for passenger_id in elevator.passengers:
|
||||
passenger = self.passengers[passenger_id]
|
||||
if passenger.destination == current_floor:
|
||||
passenger.dropoff_tick = self.tick
|
||||
passengers_to_remove.append(passenger_id)
|
||||
|
||||
# Remove passengers who alighted
|
||||
for passenger_id in passengers_to_remove:
|
||||
elevator.passengers.remove(passenger_id)
|
||||
self._emit_event(
|
||||
EventType.PASSENGER_ALIGHT,
|
||||
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
|
||||
)
|
||||
# Board waiting passengers (if indicators allow)
|
||||
floor = self.floors[current_floor]
|
||||
passengers_to_board: List[int] = []
|
||||
if not elevator.indicators.up and not elevator.indicators.down:
|
||||
if elevator.next_target_floor is not None:
|
||||
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
|
||||
elevator.next_target_floor = None
|
||||
elevator.indicators.set_direction(elevator.target_floor_direction)
|
||||
|
||||
# Board passengers going up (if up indicator is on or no direction set)
|
||||
if elevator.indicators.up:
|
||||
available_capacity = elevator.max_capacity - len(elevator.passengers)
|
||||
passengers_to_board.extend(floor.up_queue[:available_capacity])
|
||||
floor.up_queue = floor.up_queue[available_capacity:]
|
||||
|
||||
# Board passengers going down (if down indicator is on or no direction set)
|
||||
if elevator.indicators.down:
|
||||
# 先临时计算长度
|
||||
remaining_capacity = elevator.max_capacity - len(elevator.passengers) - len(passengers_to_board)
|
||||
if remaining_capacity > 0:
|
||||
down_passengers = floor.down_queue[:remaining_capacity]
|
||||
passengers_to_board.extend(down_passengers)
|
||||
floor.down_queue = floor.down_queue[remaining_capacity:]
|
||||
|
||||
# 没有上下指示的时候,触发等待,会消耗一个tick
|
||||
if not elevator.indicators.up and not elevator.indicators.down:
|
||||
self._emit_event(EventType.IDLE, {"elevator": elevator.id, "floor": current_floor})
|
||||
continue
|
||||
# Process boarding
|
||||
for passenger_id in passengers_to_board:
|
||||
passenger = self.passengers[passenger_id]
|
||||
passenger.pickup_tick = self.tick
|
||||
passenger.elevator_id = elevator.id
|
||||
elevator.passengers.append(passenger_id)
|
||||
self._emit_event(
|
||||
EventType.PASSENGER_BOARD,
|
||||
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
|
||||
)
|
||||
|
||||
def _set_elevator_target_floor(self, elevator: ElevatorState, floor: int):
|
||||
original_target_floor = elevator.target_floor
|
||||
elevator.position.target_floor = floor
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为立刻前往 F{floor}")
|
||||
new_target_floor_should_accel = self._should_start_deceleration(elevator)
|
||||
if not new_target_floor_should_accel:
|
||||
if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了
|
||||
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为匀速")
|
||||
elif new_target_floor_should_accel:
|
||||
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速
|
||||
elevator.run_status = ElevatorStatus.START_DOWN
|
||||
server_debug_log(f"电梯 E{elevator.id} 被设定为减速")
|
||||
if elevator.current_floor != floor or elevator.position.floor_up_position != 0:
|
||||
old_status = elevator.run_status.value
|
||||
self._update_elevator_status(elevator)
|
||||
server_debug_log(
|
||||
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} "
|
||||
f"位置:{elevator.position.current_floor_float:.1f} 目标:{floor}"
|
||||
)
|
||||
|
||||
def elevator_go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> None:
|
||||
"""Command elevator to go to specified floor"""
|
||||
"""
|
||||
设置电梯去向,是生命周期开始,分配目的地
|
||||
"""
|
||||
if 0 <= elevator_id < len(self.elevators) and 0 <= floor < len(self.floors):
|
||||
elevator = self.elevators[elevator_id]
|
||||
if immediate:
|
||||
self._set_elevator_target_floor(elevator, floor)
|
||||
elevator.indicators.set_direction(elevator.target_floor_direction)
|
||||
else:
|
||||
elevator.next_target_floor = floor
|
||||
server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}")
|
||||
|
||||
def elevator_set_indicators(self, elevator_id: int, up: Optional[bool] = None, down: Optional[bool] = None) -> None:
|
||||
"""Set elevator direction indicators"""
|
||||
if 0 <= elevator_id < len(self.elevators):
|
||||
elevator = self.elevators[elevator_id]
|
||||
if up is not None:
|
||||
elevator.indicators.up = up
|
||||
if down is not None:
|
||||
elevator.indicators.down = down
|
||||
|
||||
def get_state(self) -> SimulationStateResponse:
|
||||
"""Get complete simulation state"""
|
||||
with self.lock:
|
||||
@@ -658,8 +649,8 @@ def step_simulation() -> Response | tuple[Response, int]:
|
||||
try:
|
||||
data: Dict[str, Any] = request.get_json() or {}
|
||||
ticks = data.get("ticks", 1)
|
||||
server_debug_log("")
|
||||
server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}")
|
||||
# server_debug_log("")
|
||||
# server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}")
|
||||
events = simulation.step(ticks)
|
||||
server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n")
|
||||
return json_response(
|
||||
@@ -693,18 +684,6 @@ def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]:
|
||||
return json_response({"error": str(e)}, 500)
|
||||
|
||||
|
||||
@app.route("/api/elevators/<int:elevator_id>/set_indicators", methods=["POST"])
|
||||
def elevator_set_indicators(elevator_id: int) -> Response | tuple[Response, int]:
|
||||
try:
|
||||
data: Dict[str, Any] = request.get_json() or {}
|
||||
up = data.get("up")
|
||||
down = data.get("down")
|
||||
simulation.elevator_set_indicators(elevator_id, up, down)
|
||||
return json_response({"success": True})
|
||||
except Exception as e:
|
||||
return json_response({"error": str(e)}, 500)
|
||||
|
||||
|
||||
@app.route("/api/traffic/next", methods=["POST"])
|
||||
def next_traffic_round() -> Response | tuple[Response, int]:
|
||||
"""切换到下一个流量文件"""
|
||||
|
||||
Reference in New Issue
Block a user