fix downstairs

This commit is contained in:
Xuwznln
2025-09-28 20:23:11 +08:00
parent 16d2fec0ac
commit 6a3b312e41
6 changed files with 203 additions and 173 deletions

View File

@@ -124,7 +124,7 @@ class SimulationStateResponse(SerializableModel):
class ElevatorSimulation:
traffic_queue: List[TrafficEntry]
traffic_queue: List[TrafficEntry] # type: ignore
next_passenger_id: int
max_duration_ticks: int
@@ -184,7 +184,7 @@ class ElevatorSimulation:
traffic_file = self.traffic_files[self.current_traffic_index]
server_debug_log(f"Loading traffic from {traffic_file.name}")
try:
with open(traffic_file, "r", encoding="utf-8")as f:
with open(traffic_file, "r", encoding="utf-8") as f:
file_data = json.load(f)
building_config = file_data["building"]
server_debug_log(f"Building config: {building_config}")
@@ -208,7 +208,7 @@ class ElevatorSimulation:
except Exception as e:
server_debug_log(f"Error loading traffic file {traffic_file}: {e}")
def next_traffic_round(self) -> bool:
def next_traffic_round(self, full_reset: bool = False) -> bool:
"""切换到下一个流量文件,返回是否成功切换"""
if not self.traffic_files:
return False
@@ -216,7 +216,10 @@ class ElevatorSimulation:
# 检查是否还有下一个文件
next_index = self.current_traffic_index + 1
if next_index >= len(self.traffic_files):
return False # 没有更多流量文件,停止模拟
if full_reset:
self.current_traffic_index = -1
return self.next_traffic_round()
return False # 没有更多测试案例,停止模拟
self.current_traffic_index = next_index
self.load_current_traffic() # 加载新的流量文件
@@ -229,7 +232,7 @@ class ElevatorSimulation:
server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries")
self.traffic_queue = []
self.traffic_queue: List[TrafficEntry] = [] # type: ignore[reportRedeclaration]
for entry in traffic_data:
# Create TrafficEntry from JSON data
traffic_entry = TrafficEntry(
@@ -245,39 +248,6 @@ class ElevatorSimulation:
self.traffic_queue.sort(key=lambda p: p.tick)
server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}")
def add_passenger(
self,
origin: int,
destination: int,
tick: Optional[int] = None,
) -> int:
"""Add a passenger to the simulation using unified data models"""
if tick is None:
tick = self.tick
passenger = PassengerInfo(
id=self.next_passenger_id,
origin=origin,
destination=destination,
arrive_tick=tick,
)
self.passengers[passenger.id] = passenger
# Add to floor queue
floor_state = self.state.get_floor_by_number(origin)
if floor_state:
if destination > origin:
floor_state.add_waiting_passenger(passenger.id, Direction.UP)
self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": origin})
else:
floor_state.add_waiting_passenger(passenger.id, Direction.DOWN)
self._emit_event(EventType.DOWN_BUTTON_PRESSED, {"floor": origin})
self.next_passenger_id += 1
server_debug_log(f"Added passenger {passenger.id}: {origin}{destination} at tick {tick}")
return passenger.id
def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None:
"""Emit an event to be sent to clients using unified data models"""
self.state.add_event(event_type, data)
@@ -286,9 +256,9 @@ class ElevatorSimulation:
def step(self, num_ticks: int = 1) -> List[SimulationEvent]:
with self.lock:
new_events: List[SimulationEvent] = []
for i in range(num_ticks):
for _ in range(num_ticks):
self.state.tick += 1
# server_debug_log(f"Processing tick {self.tick} (step {i+1}/{num_ticks})") # currently one tick per step
# server_debug_log(f"Processing tick {self.tick}") # currently one tick per step
tick_events = self._process_tick()
new_events.extend(tick_events)
# server_debug_log(f"Tick {self.tick} completed - Generated {len(tick_events)} events") # currently one tick per step
@@ -345,13 +315,26 @@ class ElevatorSimulation:
target_pos = elevator.target_floor * 10
return abs(target_pos - current_pos)
def _calculate_distance_to_near_stop(self, elevator: ElevatorState) -> float:
"""计算到最近楼层的距离以floor_up_position为单位"""
if elevator.position.floor_up_position < 0:
return 10 + elevator.position.floor_up_position
elif elevator.position.floor_up_position > 0:
return 10 - elevator.position.floor_up_position
else:
return 0
def _should_start_deceleration(self, elevator: ElevatorState) -> bool:
"""判断是否应该开始减速
减速需要1个tick移动1个位置单位所以当距离目标<=3时开始减速
这样可以保证有一个完整的减速周期
"""
distance = self._calculate_distance_to_target(elevator)
return distance <= 3
return distance == 1
def _near_next_stop(self, elevator: ElevatorState) -> bool:
distance = self._calculate_distance_to_near_stop(elevator)
return distance == 1
def _get_movement_speed(self, elevator: ElevatorState) -> int:
"""根据电梯状态获取移动速度"""
@@ -375,15 +358,27 @@ class ElevatorSimulation:
return
if elevator.run_status == ElevatorStatus.STOPPED:
# 从停止状态启动
# 从停止状态启动 - 注意START_UP表示启动加速状态不表示方向
# 实际移动方向由target_floor_direction决定
elevator.run_status = ElevatorStatus.START_UP
elif elevator.run_status == ElevatorStatus.START_UP:
# 从启动状态切换到匀速
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED:
# 检查是否需要开始减速
# 检查是否需要开始减速这里加速减速设置路程为1匀速路程为2这样能够保证不会匀速恰好到达必须加减速
# 如果速度超出,则预期的逻辑是,恰好到达/超出0等会强制触发start_down多走一次才能stop目前没有实现这部分逻辑
if self._should_start_deceleration(elevator):
elevator.run_status = ElevatorStatus.START_DOWN
# 发送电梯即将经过某层楼事件
if self._near_next_stop(elevator):
self._emit_event(
EventType.ELEVATOR_APPROACHING,
{
"elevator": elevator.id,
"floor": elevator.target_floor,
"direction": elevator.target_floor_direction.value,
},
)
# START_DOWN状态会在到达目标时自动切换为STOPPED
def _move_elevators(self) -> None:
@@ -403,9 +398,6 @@ class ElevatorSimulation:
elevator.run_status = ElevatorStatus.STOPPED
continue
# 更新电梯状态
self._update_elevator_status(elevator)
# 获取移动速度
movement_speed = self._get_movement_speed(elevator)
@@ -423,8 +415,11 @@ class ElevatorSimulation:
# 向下移动
new_floor = elevator.position.floor_up_position_add(-movement_speed)
# 更新电梯状态
old_status = elevator.run_status.value
self._update_elevator_status(elevator)
server_debug_log(
f"电梯{elevator.id} 状态:{elevator.run_status.value} 速度:{movement_speed} "
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} 速度:{movement_speed} "
f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}"
)
@@ -444,7 +439,7 @@ class ElevatorSimulation:
if target_floor == new_floor and elevator.position.floor_up_position == 0:
elevator.run_status = ElevatorStatus.STOPPED
self._emit_event(EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor})
elevator.indicators.set_direction(elevator.target_floor_direction) # 抵达目标楼层说明该取消floor了
# elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5
def _process_elevator_stops(self) -> None:
@@ -474,7 +469,7 @@ class ElevatorSimulation:
passengers_to_board: List[int] = []
if not elevator.indicators.up and not elevator.indicators.down:
if elevator.next_target_floor is not None:
elevator.position.target_floor = elevator.next_target_floor
self._set_elevator_target_floor(elevator, elevator.next_target_floor)
elevator.next_target_floor = None
elevator.indicators.set_direction(elevator.target_floor_direction)
@@ -508,14 +503,37 @@ class ElevatorSimulation:
{"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id},
)
def _set_elevator_target_floor(self, elevator: ElevatorState, floor: int):
original_target_floor = elevator.target_floor
elevator.position.target_floor = floor
server_debug_log(f"电梯 E{elevator.id} 被设定为立刻前往 F{floor}")
new_target_floor_should_accel = self._should_start_deceleration(elevator)
if not new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了
elevator.run_status = ElevatorStatus.CONSTANT_SPEED
server_debug_log(f"电梯 E{elevator.id} 被设定为匀速")
elif new_target_floor_should_accel:
if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速
elevator.run_status = ElevatorStatus.START_DOWN
server_debug_log(f"电梯 E{elevator.id} 被设定为减速")
if elevator.current_floor != floor or elevator.position.floor_up_position != 0:
old_status = elevator.run_status.value
self._update_elevator_status(elevator)
server_debug_log(
f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} "
f"位置:{elevator.position.current_floor_float:.1f} 目标:{floor}"
)
def elevator_go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> None:
"""Command elevator to go to specified floor"""
if 0 <= elevator_id < len(self.elevators) and 0 <= floor < len(self.floors):
elevator = self.elevators[elevator_id]
if immediate:
elevator.position.target_floor = floor
self._set_elevator_target_floor(elevator, floor)
elevator.indicators.set_direction(elevator.target_floor_direction)
else:
elevator.next_target_floor = floor
server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}")
def elevator_set_indicators(self, elevator_id: int, up: Optional[bool] = None, down: Optional[bool] = None) -> None:
"""Set elevator direction indicators"""
@@ -590,15 +608,14 @@ class ElevatorSimulation:
def force_complete_remaining_passengers(self) -> int:
"""强制完成所有未完成的乘客,返回完成的乘客数量"""
with self.lock:
completed_count = 0
current_tick = self.tick
for passenger in self.state.passengers.values():
if passenger.dropoff_tick == 0:
passenger.dropoff_tick = current_tick
if passenger.pickup_tick == 0:
passenger.pickup_tick = current_tick
return completed_count
completed_count = 0
current_tick = self.tick
for passenger in self.state.passengers.values():
if passenger.dropoff_tick == 0:
passenger.dropoff_tick = current_tick
if passenger.pickup_tick == 0:
passenger.pickup_tick = current_tick
return completed_count
def reset(self) -> None:
"""Reset simulation to initial state"""
@@ -641,13 +658,14 @@ def step_simulation() -> Response | tuple[Response, int]:
try:
data: Dict[str, Any] = request.get_json() or {}
ticks = data.get("ticks", 1)
server_debug_log(f"HTTP /api/step request - ticks: {ticks}")
server_debug_log("")
server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}")
events = simulation.step(ticks)
server_debug_log(f"HTTP /api/step response - tick: {simulation.tick}, events: {len(events)}")
server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n")
return json_response(
{
"tick": simulation.tick,
"events": [{"tick": e.tick, "type": e.type.value, "data": e.data} for e in events],
"events": events,
}
)
except Exception as e:
@@ -691,7 +709,8 @@ def elevator_set_indicators(elevator_id: int) -> Response | tuple[Response, int]
def next_traffic_round() -> Response | tuple[Response, int]:
"""切换到下一个流量文件"""
try:
success = simulation.next_traffic_round()
full_reset = request.get_json()["full_reset"]
success = simulation.next_traffic_round(full_reset)
if success:
return json_response({"success": True})
else: