update example

This commit is contained in:
Xuwznln
2025-09-28 15:12:41 +08:00
parent 297001221d
commit 16d2fec0ac
3 changed files with 35 additions and 108 deletions

View File

@@ -221,25 +221,6 @@ class ElevatorAPIClient:
debug_log(f"Get traffic info failed: {e}")
return None
def force_complete_remaining_passengers(self) -> Optional[int]:
"""强制完成所有未完成的乘客,返回完成的乘客数量"""
try:
response_data = self._send_post_request("/api/force_complete", {})
if response_data.get("success"):
completed_count = response_data.get("completed_count", 0)
debug_log(f"Force completed {completed_count} passengers")
# 强制完成后清空缓存
self._cached_state = None
self._cached_tick = -1
self._tick_processed = False
return completed_count
else:
debug_log(f"Force complete failed: {response_data.get('error')}")
return None
except Exception as e:
debug_log(f"Force complete failed: {e}")
return None
def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""发送POST请求"""
url = f"{self.base_url}{endpoint}"

View File

@@ -27,29 +27,23 @@ class ElevatorBusController(ElevatorController):
print("🚌 公交车式电梯算法初始化")
print(f" 管理 {len(elevators)} 部电梯")
print(f" 服务 {len(floors)} 层楼")
# 获取最大楼层数
self.max_floor = len(floors) - 1
# 初始化每个电梯的方向 - 开始都向上
for elevator in elevators:
self.elevator_directions[elevator.id] = "up"
# 简单的初始分布 - 均匀分散到不同楼层
for i, elevator in enumerate(elevators):
# 计算目标楼层 - 均匀分布在不同楼层
target_floor = (i * (len(floors) - 1)) // len(elevators)
# 立刻移动到目标位置并开始循环
elevator.go_to_floor(target_floor, immediate=True)
print(f" 🚌 电梯{elevator.id} -> {target_floor}楼 (开始公交循环)")
def on_event_execute_start(
self, tick: int, events: List[SimulationEvent], elevators: List[ProxyElevator], floors: List[ProxyFloor]
) -> None:
"""事件执行前的回调"""
print(f"Tick {tick}: 即将处理 {len(events)} 个事件", end="")
print(f"Tick {tick}: 即将处理 {len(events)} 个事件 {[e.type for e in events]}", end="")
for i in elevators:
print(f"电梯{i.id}[{i.target_floor_direction.value}] 位置{i.current_floor_float}/{i.target_floor}, ", end="")
print()
@@ -71,19 +65,13 @@ class ElevatorBusController(ElevatorController):
def on_elevator_idle(self, elevator: ProxyElevator) -> None:
"""
电梯空闲时的回调
让空闲的电梯继续执行公交车循环路线
让空闲的电梯继续执行公交车循环路线,每次移动一层楼
"""
print(f"⏸️ 电梯 {elevator.id} 空闲,继续公交循环")
def on_elevator_stopped(self, elevator: ProxyElevator, floor: ProxyFloor) -> None:
"""
电梯停靠时的回调
公交车模式下,在每一层都停下,然后继续下一站
"""
print(f"🛑 电梯 {elevator.id} 停靠在 {floor.floor}")
print(f"🛑 电梯 {elevator.id} 空闲 {elevator.current_floor} = {elevator.target_floor}")
# 设置指示器让乘客知道电梯的行进方向
current_direction = self.elevator_directions.get(elevator.id, "up")
if self.elevator_directions[elevator.id] == "down" and elevator.current_floor != 0:
elevator.go_to_floor(elevator.current_floor - 1, immediate=True)
current_direction = self.elevator_directions[elevator.id]
if current_direction == "up":
elevator.set_up_indicator(True)
elevator.set_down_indicator(False)
@@ -91,6 +79,27 @@ class ElevatorBusController(ElevatorController):
elevator.set_up_indicator(False)
elevator.set_down_indicator(True)
def on_elevator_stopped(self, elevator: ProxyElevator, floor: ProxyFloor) -> None:
"""
电梯停靠时的回调
公交车模式下,在每一层都停下,然后继续下一站
需要注意的是stopped会比idle先触发
"""
print(f"🛑 电梯 {elevator.id} 停靠在 {floor.floor}")
if self.elevator_directions[elevator.id] == "up" and elevator.current_floor == self.max_floor:
elevator.go_to_floor(elevator.current_floor - 1, immediate=True)
self.elevator_directions[elevator.id] = "down"
elif self.elevator_directions[elevator.id] == "down" and elevator.current_floor == 0:
elevator.go_to_floor(elevator.current_floor + 1, immediate=True)
self.elevator_directions[elevator.id] = "up"
elif self.elevator_directions[elevator.id] == "up":
elevator.go_to_floor(elevator.current_floor + 1, immediate=True)
# 这里故意少写下降的情况用于了解stopped会先于idle触发
# elif self.elevator_directions[elevator.id] == "down":
# elevator.go_to_floor(elevator.current_floor - 1, immediate=True)
# self.elevator_directions[elevator.id] = "down"
def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None:
"""
乘客上车时的回调

View File

@@ -127,7 +127,6 @@ class ElevatorSimulation:
traffic_queue: List[TrafficEntry]
next_passenger_id: int
max_duration_ticks: int
_force_completed: bool
def __init__(self, traffic_dir: str, _init_only: bool = False):
if _init_only:
@@ -137,7 +136,6 @@ class ElevatorSimulation:
self.current_traffic_index = 0
self.traffic_files: List[Path] = []
self.state: SimulationState = create_empty_simulation_state(2, 1, 1)
self._force_completed = False
self._load_traffic_files()
@property
@@ -195,7 +193,6 @@ class ElevatorSimulation:
)
self.reset()
self.max_duration_ticks = building_config["duration"]
self._force_completed = False # 重置强制完成标志
traffic_data: list[Dict[str, Any]] = file_data["traffic"]
traffic_data.sort(key=lambda t: cast(int, t["tick"]))
for entry in traffic_data:
@@ -291,19 +288,14 @@ class ElevatorSimulation:
new_events: List[SimulationEvent] = []
for i in range(num_ticks):
self.state.tick += 1
server_debug_log(f"Processing tick {self.tick} (step {i+1}/{num_ticks})")
# server_debug_log(f"Processing tick {self.tick} (step {i+1}/{num_ticks})") # currently one tick per step
tick_events = self._process_tick()
new_events.extend(tick_events)
server_debug_log(f"Tick {self.tick} completed - Generated {len(tick_events)} events")
# server_debug_log(f"Tick {self.tick} completed - Generated {len(tick_events)} events") # currently one tick per step
# 如果到达最大时长且尚未强制完成,强制完成剩余乘客
if (
hasattr(self, "max_duration_ticks")
and self.tick >= self.max_duration_ticks
and not self._force_completed
):
# 如果到达最大时长,强制完成剩余乘客
if self.tick >= self.max_duration_ticks:
completed_count = self.force_complete_remaining_passengers()
self._force_completed = True
if completed_count > 0:
server_debug_log(f"模拟结束,强制完成了 {completed_count} 个乘客")
@@ -601,55 +593,11 @@ class ElevatorSimulation:
with self.lock:
completed_count = 0
current_tick = self.tick
server_debug_log(f"强制完成未完成乘客当前tick: {current_tick}")
# 收集需要强制完成的乘客ID使用set提高查找效率
passengers_to_complete = set()
for passenger_id, passenger in self.state.passengers.items():
for passenger in self.state.passengers.values():
if passenger.dropoff_tick == 0:
passengers_to_complete.add(passenger_id)
server_debug_log(f"找到 {len(passengers_to_complete)} 个需要强制完成的乘客")
# 批量处理:先从电梯中移除所有需要完成的乘客
for elevator in self.elevators:
# 使用列表推导式创建新的乘客列表避免多次remove操作
original_count = len(elevator.passengers)
elevator.passengers = [pid for pid in elevator.passengers if pid not in passengers_to_complete]
removed_count = original_count - len(elevator.passengers)
# 清理乘客目的地映射
for passenger_id in passengers_to_complete:
elevator.passenger_destinations.pop(passenger_id, None)
if removed_count > 0:
server_debug_log(f"从电梯 {elevator.id} 移除了 {removed_count} 个强制完成的乘客")
# 批量处理:从楼层等待队列中移除乘客
for floor in self.floors:
# 优化队列清理
original_up = len(floor.up_queue)
original_down = len(floor.down_queue)
floor.up_queue = [pid for pid in floor.up_queue if pid not in passengers_to_complete]
floor.down_queue = [pid for pid in floor.down_queue if pid not in passengers_to_complete]
removed_up = original_up - len(floor.up_queue)
removed_down = original_down - len(floor.down_queue)
if removed_up > 0 or removed_down > 0:
server_debug_log(
f"从楼层 {floor.floor} 移除了 {removed_up}(上行) + {removed_down}(下行) 个等待乘客"
)
# 最后设置乘客完成状态
for passenger_id in passengers_to_complete:
passenger = self.state.passengers[passenger_id]
passenger.dropoff_tick = current_tick
completed_count += 1
server_debug_log(f"强制完成了 {completed_count} 个乘客")
passenger.dropoff_tick = current_tick
if passenger.pickup_tick == 0:
passenger.pickup_tick = current_tick
return completed_count
def reset(self) -> None:
@@ -661,7 +609,6 @@ class ElevatorSimulation:
self.traffic_queue: List[TrafficEntry] = []
self.max_duration_ticks = 0
self.next_passenger_id = 1
self._force_completed = False
# Global simulation instance for Flask routes
@@ -763,16 +710,6 @@ def get_traffic_info() -> Response | tuple[Response, int]:
return json_response({"error": str(e)}, 500)
@app.route("/api/force_complete", methods=["POST"])
def force_complete_passengers() -> Response | tuple[Response, int]:
"""强制完成所有未完成的乘客"""
try:
completed_count = simulation.force_complete_remaining_passengers()
return json_response({"success": True, "completed_count": completed_count})
except Exception as e:
return json_response({"error": str(e)}, 500)
def main() -> None:
global simulation