From 6a3b312e4168cb19059830ed4f47b70d16165028 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sun, 28 Sep 2025 20:23:11 +0800 Subject: [PATCH] fix downstairs --- elevator_saga/client/api_client.py | 26 +++- elevator_saga/client/base_controller.py | 93 ++++++----- elevator_saga/core/models.py | 52 ++----- .../scripts/client_examples/simple_example.py | 52 ++++--- elevator_saga/server/simulator.py | 145 ++++++++++-------- elevator_saga/traffic/inter_floor.json | 8 +- 6 files changed, 203 insertions(+), 173 deletions(-) diff --git a/elevator_saga/client/api_client.py b/elevator_saga/client/api_client.py index 461be1b..ac53388 100644 --- a/elevator_saga/client/api_client.py +++ b/elevator_saga/client/api_client.py @@ -72,7 +72,7 @@ class ElevatorAPIClient: p95_wait_time=metrics_data.get("p95_wait", 0), average_system_time=metrics_data.get("avg_system", 0), p95_system_time=metrics_data.get("p95_system", 0), - total_energy_consumption=metrics_data.get("energy_total", 0), + # total_energy_consumption=metrics_data.get("energy_total", 0), ) else: metrics = PerformanceMetrics() @@ -106,7 +106,20 @@ class ElevatorAPIClient: if "error" not in response_data: # 使用服务端返回的真实数据 events_data = response_data.get("events", []) - events = [SimulationEvent.from_dict(event) for event in events_data] + events = [] + for event_data in events_data: + # 手动转换type字段从字符串到EventType枚举 + event_dict = event_data.copy() + if "type" in event_dict and isinstance(event_dict["type"], str): + # 尝试将字符串转换为EventType枚举 + try: + from elevator_saga.core.models import EventType + + event_dict["type"] = EventType(event_dict["type"]) + except ValueError: + debug_log(f"Unknown event type: {event_dict['type']}") + continue + events.append(SimulationEvent.from_dict(event_dict)) step_response = StepResponse( success=True, @@ -122,7 +135,7 @@ class ElevatorAPIClient: def send_elevator_command(self, command: Union[GoToFloorCommand, SetIndicatorsCommand]) -> bool: """发送电梯命令""" endpoint = self._get_elevator_endpoint(command) - debug_log(f"Sending elevator command: {command.command_type} to elevator {command.elevator_id}") + debug_log(f"Sending elevator command: {command.command_type} to elevator {command.elevator_id} To:F{command.floor}") response_data = self._send_post_request(endpoint, command.parameters) @@ -191,10 +204,10 @@ class ElevatorAPIClient: debug_log(f"Reset failed: {e}") return False - def next_traffic_round(self) -> bool: + def next_traffic_round(self, full_reset = False) -> bool: """切换到下一个流量文件""" try: - response_data = self._send_post_request("/api/traffic/next", {}) + response_data = self._send_post_request("/api/traffic/next", {"full_reset": full_reset}) success = response_data.get("success", False) if success: # 清空缓存,因为流量文件已切换,状态会改变 @@ -211,7 +224,6 @@ class ElevatorAPIClient: """获取当前流量文件信息""" try: response_data = self._send_get_request("/api/traffic/info") - debug_log(str()) if "error" not in response_data: return response_data else: @@ -231,7 +243,7 @@ class ElevatorAPIClient: req = urllib.request.Request(url, data=request_body, headers={"Content-Type": "application/json"}) try: - with urllib.request.urlopen(req, timeout=60) as response: + with urllib.request.urlopen(req, timeout=600) as response: response_data = json.loads(response.read().decode("utf-8")) # debug_log(f"POST {url} -> {response.status}") return response_data diff --git a/elevator_saga/client/base_controller.py b/elevator_saga/client/base_controller.py index 1ae4c22..1f5e7b7 100644 --- a/elevator_saga/client/base_controller.py +++ b/elevator_saga/client/base_controller.py @@ -3,6 +3,8 @@ Elevator Controller Base Class 电梯调度基础控制器类 - 提供面向对象的算法开发接口 """ +import os +import time from abc import ABC, abstractmethod from pprint import pprint from typing import Any, Dict, List, Optional @@ -34,9 +36,9 @@ class ElevatorController(ABC): self.debug = debug self.elevators: List[Any] = [] self.floors: List[Any] = [] - self.current_tick = -2 + self.current_tick = 0 self.is_running = False - self.current_traffic_max_tick: Optional[int] = None + self.current_traffic_max_tick: int = 0 # 初始化API客户端 self.api_client = ElevatorAPIClient(server_url) @@ -91,7 +93,7 @@ class ElevatorController(ABC): print(f"停止 {self.__class__.__name__} 算法") @abstractmethod - def on_passenger_call(self, floor: ProxyFloor, direction: str): + def on_passenger_call(self, passenger: ProxyPassenger, floor: ProxyFloor, direction: str): """ 乘客呼叫时的回调 - 可选实现 @@ -125,7 +127,7 @@ class ElevatorController(ABC): @abstractmethod def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger): """ - 乘客上车时的回调 - 可选实现 + 乘客上梯时的回调 - 可选实现 Args: elevator: 电梯代理对象 @@ -173,7 +175,7 @@ class ElevatorController(ABC): """内部初始化方法""" self.elevators = elevators self.floors = floors - self.current_tick = -2 + self.current_tick = 0 # 调用用户的初始化方法 self.on_init(elevators, floors) @@ -213,25 +215,37 @@ class ElevatorController(ABC): def _run_event_driven_simulation(self): """运行事件驱动的模拟""" try: - # 获取初始状态并初始化 - state = self.api_client.get_state() + # 获取初始状态并初始化,默认从0开始 + try: + state = self.api_client.get_state() + except ConnectionResetError as ex: + print(f"模拟器可能并没有开启,请检查模拟器是否启动 {self.api_client.base_url}") + os._exit(1) + if state.tick > 0: + print("模拟器可能已经开始了一次模拟,执行重置...") + self.api_client.reset() + time.sleep(0.3) + return self._run_event_driven_simulation() self._update_wrappers(state, init=True) # 获取当前流量文件的最大tick数 self._update_traffic_info() - if self.current_tick >= self.current_traffic_max_tick: - return + if self.current_traffic_max_tick == 0: + print("模拟器接收到的最大tick时间为0,可能所有的测试案例已用完,请求重置...") + self.api_client.next_traffic_round(full_reset=True) + time.sleep(0.3) + return self._run_event_driven_simulation() + # if self.current_tick >= self.current_traffic_max_tick: + # return self._internal_init(self.elevators, self.floors) - - tick_count = 0 - + self.api_client.mark_tick_processed() while self.is_running: # 检查是否达到最大tick数 - if tick_count >= self.current_traffic_max_tick: + if self.current_tick >= self.current_traffic_max_tick: break - # 执行一个tick的模拟 + # 执行一个tick的模拟,从1开始 step_response = self.api_client.step(1) # 更新当前状态 self.current_tick = step_response.tick @@ -256,28 +270,21 @@ class ElevatorController(ABC): # 事件执行后回调 self.on_event_execute_end(self.current_tick, events, self.elevators, self.floors) - # 标记tick处理完成,使API客户端缓存失效 self.api_client.mark_tick_processed() - - tick_count += 1 - # 检查是否需要切换流量文件 if self.current_tick >= self.current_traffic_max_tick: pprint(state.metrics.to_dict()) if not self.api_client.next_traffic_round(): - # 如果没有更多流量文件,退出 break - # 重置并重新初始化 self._reset_and_reinit() - tick_count = 0 except Exception as e: print(f"模拟运行错误: {e}") raise - def _update_wrappers(self, state: SimulationState, init=False) -> None: + def _update_wrappers(self, state: SimulationState, init: bool = False) -> None: """更新电梯和楼层代理对象""" self.current_tick = state.tick # 检查电梯数量是否发生变化,只有变化时才重新创建 @@ -301,42 +308,46 @@ class ElevatorController(ABC): debug_log(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}") else: debug_log("Failed to get traffic info") - self.current_traffic_max_tick = None + self.current_traffic_max_tick = 0 except Exception as e: debug_log(f"Error updating traffic info: {e}") - self.current_traffic_max_tick = None + self.current_traffic_max_tick = 0 def _handle_single_event(self, event: SimulationEvent): """处理单个事件""" - if event.type == EventType.UP_BUTTON_PRESSED.value: - floor_id = event.data.get("floor") + if event.type == EventType.UP_BUTTON_PRESSED: + floor_id = event.data["floor"] + passenger_id = event.data["passenger"] if floor_id is not None: floor_proxy = ProxyFloor(floor_id, self.api_client) - self.on_passenger_call(floor_proxy, "up") + passenger_proxy = ProxyPassenger(passenger_id, self.api_client) + self.on_passenger_call(passenger_proxy, floor_proxy, "up") - elif event.type == EventType.DOWN_BUTTON_PRESSED.value: - floor_id = event.data.get("floor") + elif event.type == EventType.DOWN_BUTTON_PRESSED: + floor_id = event.data["floor"] + passenger_id = event.data["passenger"] if floor_id is not None: floor_proxy = ProxyFloor(floor_id, self.api_client) - self.on_passenger_call(floor_proxy, "down") + passenger_proxy = ProxyPassenger(passenger_id, self.api_client) + self.on_passenger_call(passenger_proxy, floor_proxy, "down") - elif event.type == EventType.STOPPED_AT_FLOOR.value: + elif event.type == EventType.STOPPED_AT_FLOOR: elevator_id = event.data.get("elevator") - floor_id = event.data.get("floor") + floor_id = event.data["floor"] if elevator_id is not None and floor_id is not None: elevator_proxy = ProxyElevator(elevator_id, self.api_client) floor_proxy = ProxyFloor(floor_id, self.api_client) self.on_elevator_stopped(elevator_proxy, floor_proxy) - elif event.type == EventType.IDLE.value: + elif event.type == EventType.IDLE: elevator_id = event.data.get("elevator") if elevator_id is not None: elevator_proxy = ProxyElevator(elevator_id, self.api_client) self.on_elevator_idle(elevator_proxy) - elif event.type == EventType.PASSING_FLOOR.value: + elif event.type == EventType.PASSING_FLOOR: elevator_id = event.data.get("elevator") - floor_id = event.data.get("floor") + floor_id = event.data["floor"] direction = event.data.get("direction") if elevator_id is not None and floor_id is not None and direction is not None: elevator_proxy = ProxyElevator(elevator_id, self.api_client) @@ -345,9 +356,9 @@ class ElevatorController(ABC): direction_str = direction if isinstance(direction, str) else direction.value self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str) - elif event.type == EventType.ELEVATOR_APPROACHING.value: + elif event.type == EventType.ELEVATOR_APPROACHING: elevator_id = event.data.get("elevator") - floor_id = event.data.get("floor") + floor_id = event.data["floor"] direction = event.data.get("direction") if elevator_id is not None and floor_id is not None and direction is not None: elevator_proxy = ProxyElevator(elevator_id, self.api_client) @@ -356,7 +367,7 @@ class ElevatorController(ABC): direction_str = direction if isinstance(direction, str) else direction.value self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str) - elif event.type == EventType.PASSENGER_BOARD.value: + elif event.type == EventType.PASSENGER_BOARD: elevator_id = event.data.get("elevator") passenger_id = event.data.get("passenger") if elevator_id is not None and passenger_id is not None: @@ -364,10 +375,10 @@ class ElevatorController(ABC): passenger_proxy = ProxyPassenger(passenger_id, self.api_client) self.on_passenger_board(elevator_proxy, passenger_proxy) - elif event.type == EventType.PASSENGER_ALIGHT.value: + elif event.type == EventType.PASSENGER_ALIGHT: elevator_id = event.data.get("elevator") passenger_id = event.data.get("passenger") - floor_id = event.data.get("floor") + floor_id = event.data["floor"] if elevator_id is not None and passenger_id is not None and floor_id is not None: elevator_proxy = ProxyElevator(elevator_id, self.api_client) passenger_proxy = ProxyPassenger(passenger_id, self.api_client) @@ -379,7 +390,7 @@ class ElevatorController(ABC): try: # 重置服务器状态 self.api_client.reset() - + self.current_tick = 0 # 获取新的初始状态 state = self.api_client.get_state() self._update_wrappers(state) diff --git a/elevator_saga/core/models.py b/elevator_saga/core/models.py index 50ca7d5..f652adf 100644 --- a/elevator_saga/core/models.py +++ b/elevator_saga/core/models.py @@ -33,13 +33,15 @@ class PassengerStatus(Enum): class ElevatorStatus(Enum): # OK """ - 电梯stopped -1tick> (回调启动) start_up -1tick> constant_speed -?tick> start_down -1tick> 结束 + 电梯运行状态机:stopped -1tick> start_up -1tick> constant_speed -?tick> start_down -1tick> stopped + 注意:START_UP/START_DOWN表示加速/减速状态,不表示移动方向 + 实际移动方向由target_floor_direction属性决定 """ - START_UP = "start_up" # 即将启动,tick结束时会从stopped转换成start_up - START_DOWN = "start_down" # 即将到站,tick结束时会触发may passing floor - CONSTANT_SPEED = "constant_speed" - STOPPED = "stopped" + START_UP = "start_up" # 启动加速状态(不表示向上方向) + START_DOWN = "start_down" # 减速状态(不表示向下方向) + CONSTANT_SPEED = "constant_speed" # 匀速状态 + STOPPED = "stopped" # 停止状态 class EventType(Enum): @@ -49,7 +51,7 @@ class EventType(Enum): DOWN_BUTTON_PRESSED = "down_button_pressed" PASSING_FLOOR = "passing_floor" STOPPED_AT_FLOOR = "stopped_at_floor" - ELEVATOR_APPROACHING = "elevator_approaching" # 电梯即将到达(START_DOWN状态) + ELEVATOR_APPROACHING = "elevator_approaching" # 电梯即将经过某层楼(START_DOWN状态) IDLE = "idle" PASSENGER_BOARD = "passenger_board" PASSENGER_ALIGHT = "passenger_alight" @@ -117,7 +119,7 @@ class Position(SerializableModel): self.floor_up_position -= 10 # 处理向下楼层跨越 - while self.floor_up_position < 0: + while self.floor_up_position <= -10: self.current_floor -= 1 self.floor_up_position += 10 @@ -260,29 +262,10 @@ class ElevatorState(SerializableModel): """按下的楼层(基于当前乘客的目的地动态计算)""" return sorted(list(set(self.passenger_destinations.values()))) - def add_destination(self, floor: int, immediate: bool = False): - """添加目标楼层""" - self.next_target_floor = floor - def clear_destinations(self): """清空目标队列""" self.next_target_floor = None - def add_passenger(self, passenger_id: int, destination_floor: int): - """添加乘客""" - if not self.is_full: - self.passengers.append(passenger_id) - self.passenger_destinations[passenger_id] = destination_floor - return True - return False - - def remove_passenger(self, passenger_id: int, floor: int): - """移除乘客""" - if passenger_id in self.passengers: - self.passengers.remove(passenger_id) - # 从乘客目的地映射中移除 - self.passenger_destinations.pop(passenger_id, None) - @dataclass class FloorState(SerializableModel): @@ -346,10 +329,7 @@ class PerformanceMetrics(SerializableModel): p95_wait_time: float = 0.0 average_system_time: float = 0.0 p95_system_time: float = 0.0 - total_energy_consumption: float = 0.0 - total_movement_distance: float = 0.0 - total_stops: int = 0 - efficiency_score: float = 0.0 + # total_energy_consumption: float = 0.0 @property def completion_rate(self) -> float: @@ -358,12 +338,12 @@ class PerformanceMetrics(SerializableModel): return 0.0 return self.completed_passengers / self.total_passengers - @property - def energy_per_passenger(self) -> float: - """每位乘客能耗""" - if self.completed_passengers == 0: - return 0.0 - return self.total_energy_consumption / self.completed_passengers + # @property + # def energy_per_passenger(self) -> float: + # """每位乘客能耗""" + # if self.completed_passengers == 0: + # return 0.0 + # return self.total_energy_consumption / self.completed_passengers @dataclass diff --git a/elevator_saga/scripts/client_examples/simple_example.py b/elevator_saga/scripts/client_examples/simple_example.py index f36ea5e..89d8cc0 100644 --- a/elevator_saga/scripts/client_examples/simple_example.py +++ b/elevator_saga/scripts/client_examples/simple_example.py @@ -7,7 +7,7 @@ from typing import Dict, List from elevator_saga.client.base_controller import ElevatorController from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger -from elevator_saga.core.models import SimulationEvent +from elevator_saga.core.models import SimulationEvent, Direction class ElevatorBusController(ElevatorController): @@ -43,9 +43,9 @@ class ElevatorBusController(ElevatorController): self, tick: int, events: List[SimulationEvent], elevators: List[ProxyElevator], floors: List[ProxyFloor] ) -> None: """事件执行前的回调""" - print(f"Tick {tick}: 即将处理 {len(events)} 个事件 {[e.type for e in events]}", end="") + print(f"Tick {tick}: 即将处理 {len(events)} 个事件 {[e.type.value for e in events]}") for i in elevators: - print(f"电梯{i.id}[{i.target_floor_direction.value}] 位置{i.current_floor_float}/{i.target_floor}, ", end="") + print(f"\t{i.id}[{i.target_floor_direction.value},{i.current_floor_float}/{i.target_floor}]" + "👦" * len(i.passengers), end="") print() def on_event_execute_end( @@ -55,29 +55,31 @@ class ElevatorBusController(ElevatorController): # print(f"✅ Tick {tick}: 已处理 {len(events)} 个事件") pass - def on_passenger_call(self, floor: ProxyFloor, direction: str) -> None: + def on_passenger_call(self, passenger:ProxyPassenger, floor: ProxyFloor, direction: str) -> None: """ 乘客呼叫时的回调 公交车模式下,电梯已经在循环运行,无需特别响应呼叫 """ - print(f"📞 楼层 {floor.floor} 有乘客呼叫 ({direction}) - 公交车将按既定路线服务") + print(f"乘客 {passenger.id} F{floor.floor} 请求 {passenger.origin} -> {passenger.destination} ({direction})") def on_elevator_idle(self, elevator: ProxyElevator) -> None: """ 电梯空闲时的回调 让空闲的电梯继续执行公交车循环路线,每次移动一层楼 """ - print(f"🛑 电梯 {elevator.id} 空闲 {elevator.current_floor} = {elevator.target_floor}") + print(f"🛑 电梯 E{elevator.id} 在 F{elevator.current_floor} 层空闲") # 设置指示器让乘客知道电梯的行进方向 if self.elevator_directions[elevator.id] == "down" and elevator.current_floor != 0: elevator.go_to_floor(elevator.current_floor - 1, immediate=True) - current_direction = self.elevator_directions[elevator.id] - if current_direction == "up": - elevator.set_up_indicator(True) - elevator.set_down_indicator(False) - else: - elevator.set_up_indicator(False) - elevator.set_down_indicator(True) + # elevator.set_up_indicator(True) + elevator.go_to_floor(1) + # current_direction = self.elevator_directions[elevator.id] + # if current_direction == "up": + # elevator.set_up_indicator(True) + # elevator.set_down_indicator(False) + # else: + # elevator.set_up_indicator(False) + # elevator.set_down_indicator(True) def on_elevator_stopped(self, elevator: ProxyElevator, floor: ProxyFloor) -> None: """ @@ -85,7 +87,7 @@ class ElevatorBusController(ElevatorController): 公交车模式下,在每一层都停下,然后继续下一站 需要注意的是,stopped会比idle先触发 """ - print(f"🛑 电梯 {elevator.id} 停靠在 {floor.floor} 楼") + print(f"🛑 电梯 E{elevator.id} 停靠在 F{floor.floor}") if self.elevator_directions[elevator.id] == "up" and elevator.current_floor == self.max_floor: elevator.go_to_floor(elevator.current_floor - 1, immediate=True) self.elevator_directions[elevator.id] = "down" @@ -93,41 +95,47 @@ class ElevatorBusController(ElevatorController): elevator.go_to_floor(elevator.current_floor + 1, immediate=True) self.elevator_directions[elevator.id] = "up" elif self.elevator_directions[elevator.id] == "up": + if elevator.id == 0: + raise ValueError("这里故意要求0号电梯不可能触发非两端停止,通过on_elevator_approaching实现") elevator.go_to_floor(elevator.current_floor + 1, immediate=True) # 这里故意少写下降的情况,用于了解stopped会先于idle触发 # elif self.elevator_directions[elevator.id] == "down": # elevator.go_to_floor(elevator.current_floor - 1, immediate=True) # self.elevator_directions[elevator.id] = "down" - def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None: """ - 乘客上车时的回调 - 打印乘客上车信息 + 乘客上梯时的回调 + 打印乘客上梯信息 """ - print(f"⬆️ 乘客 {passenger.id} 上车 - 电梯 {elevator.id} - 楼层 {elevator.current_floor} - 目标楼层: {passenger.destination}") + print( + f" 乘客{passenger.id} E{elevator.id}⬆️ F{elevator.current_floor} -> F{passenger.destination}" + ) def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None: """ 乘客下车时的回调 打印乘客下车信息 """ - print(f"⬇️ 乘客 {passenger.id} 在 {floor.floor} 楼下车 - 电梯 {elevator.id}") + print(f" 乘客{passenger.id} E{elevator.id}⬇️ F{floor.floor}") def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: """ 电梯经过楼层时的回调 打印经过楼层的信息 """ - print(f"🔄 电梯 {elevator.id} 经过 {floor.floor} 楼 (方向: {direction})") + print(f"🔄 电梯 E{elevator.id} 经过 F{floor.floor} (方向: {direction})") def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: """ 电梯即将到达时的回调 (START_DOWN事件) 电梯开始减速,即将到达目标楼层 """ - print(f"🎯 电梯 {elevator.id} 即将到达 {floor.floor} 楼 (方向: {direction})") - + print(f"🎯 电梯 E{elevator.id} 即将到达 F{floor.floor} (方向: {direction})") + if elevator.target_floor == floor.floor and elevator.target_floor_direction == Direction.UP: # 电梯的目标楼层就是即将停靠的楼层 + if elevator.id == 0: # 这里为了测试,让0号电梯往上一层就新加一层,上行永远不会开门 + elevator.go_to_floor(elevator.target_floor + 1, immediate=True) + print(f" 不让0号电梯上行停站,设定新目标楼层 {elevator.target_floor + 1}") if __name__ == "__main__": algorithm = ElevatorBusController(debug=True) diff --git a/elevator_saga/server/simulator.py b/elevator_saga/server/simulator.py index 9ef6180..d3d8565 100644 --- a/elevator_saga/server/simulator.py +++ b/elevator_saga/server/simulator.py @@ -124,7 +124,7 @@ class SimulationStateResponse(SerializableModel): class ElevatorSimulation: - traffic_queue: List[TrafficEntry] + traffic_queue: List[TrafficEntry] # type: ignore next_passenger_id: int max_duration_ticks: int @@ -184,7 +184,7 @@ class ElevatorSimulation: traffic_file = self.traffic_files[self.current_traffic_index] server_debug_log(f"Loading traffic from {traffic_file.name}") try: - with open(traffic_file, "r", encoding="utf-8")as f: + with open(traffic_file, "r", encoding="utf-8") as f: file_data = json.load(f) building_config = file_data["building"] server_debug_log(f"Building config: {building_config}") @@ -208,7 +208,7 @@ class ElevatorSimulation: except Exception as e: server_debug_log(f"Error loading traffic file {traffic_file}: {e}") - def next_traffic_round(self) -> bool: + def next_traffic_round(self, full_reset: bool = False) -> bool: """切换到下一个流量文件,返回是否成功切换""" if not self.traffic_files: return False @@ -216,7 +216,10 @@ class ElevatorSimulation: # 检查是否还有下一个文件 next_index = self.current_traffic_index + 1 if next_index >= len(self.traffic_files): - return False # 没有更多流量文件,停止模拟 + if full_reset: + self.current_traffic_index = -1 + return self.next_traffic_round() + return False # 没有更多测试案例,停止模拟 self.current_traffic_index = next_index self.load_current_traffic() # 加载新的流量文件 @@ -229,7 +232,7 @@ class ElevatorSimulation: server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries") - self.traffic_queue = [] + self.traffic_queue: List[TrafficEntry] = [] # type: ignore[reportRedeclaration] for entry in traffic_data: # Create TrafficEntry from JSON data traffic_entry = TrafficEntry( @@ -245,39 +248,6 @@ class ElevatorSimulation: self.traffic_queue.sort(key=lambda p: p.tick) server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}") - def add_passenger( - self, - origin: int, - destination: int, - tick: Optional[int] = None, - ) -> int: - """Add a passenger to the simulation using unified data models""" - if tick is None: - tick = self.tick - - passenger = PassengerInfo( - id=self.next_passenger_id, - origin=origin, - destination=destination, - arrive_tick=tick, - ) - - self.passengers[passenger.id] = passenger - - # Add to floor queue - floor_state = self.state.get_floor_by_number(origin) - if floor_state: - if destination > origin: - floor_state.add_waiting_passenger(passenger.id, Direction.UP) - self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": origin}) - else: - floor_state.add_waiting_passenger(passenger.id, Direction.DOWN) - self._emit_event(EventType.DOWN_BUTTON_PRESSED, {"floor": origin}) - - self.next_passenger_id += 1 - server_debug_log(f"Added passenger {passenger.id}: {origin}→{destination} at tick {tick}") - return passenger.id - def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None: """Emit an event to be sent to clients using unified data models""" self.state.add_event(event_type, data) @@ -286,9 +256,9 @@ class ElevatorSimulation: def step(self, num_ticks: int = 1) -> List[SimulationEvent]: with self.lock: new_events: List[SimulationEvent] = [] - for i in range(num_ticks): + for _ in range(num_ticks): self.state.tick += 1 - # server_debug_log(f"Processing tick {self.tick} (step {i+1}/{num_ticks})") # currently one tick per step + # server_debug_log(f"Processing tick {self.tick}") # currently one tick per step tick_events = self._process_tick() new_events.extend(tick_events) # server_debug_log(f"Tick {self.tick} completed - Generated {len(tick_events)} events") # currently one tick per step @@ -345,13 +315,26 @@ class ElevatorSimulation: target_pos = elevator.target_floor * 10 return abs(target_pos - current_pos) + def _calculate_distance_to_near_stop(self, elevator: ElevatorState) -> float: + """计算到最近楼层的距离(以floor_up_position为单位)""" + if elevator.position.floor_up_position < 0: + return 10 + elevator.position.floor_up_position + elif elevator.position.floor_up_position > 0: + return 10 - elevator.position.floor_up_position + else: + return 0 + def _should_start_deceleration(self, elevator: ElevatorState) -> bool: """判断是否应该开始减速 减速需要1个tick(移动1个位置单位),所以当距离目标<=3时开始减速 这样可以保证有一个完整的减速周期 """ distance = self._calculate_distance_to_target(elevator) - return distance <= 3 + return distance == 1 + + def _near_next_stop(self, elevator: ElevatorState) -> bool: + distance = self._calculate_distance_to_near_stop(elevator) + return distance == 1 def _get_movement_speed(self, elevator: ElevatorState) -> int: """根据电梯状态获取移动速度""" @@ -375,15 +358,27 @@ class ElevatorSimulation: return if elevator.run_status == ElevatorStatus.STOPPED: - # 从停止状态启动 + # 从停止状态启动 - 注意:START_UP表示启动加速状态,不表示方向 + # 实际移动方向由target_floor_direction决定 elevator.run_status = ElevatorStatus.START_UP elif elevator.run_status == ElevatorStatus.START_UP: # 从启动状态切换到匀速 elevator.run_status = ElevatorStatus.CONSTANT_SPEED elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED: - # 检查是否需要开始减速 + # 检查是否需要开始减速,这里加速减速设置路程为1,匀速路程为2,这样能够保证不会匀速恰好到达,必须加减速 + # 如果速度超出,则预期的逻辑是,恰好到达/超出0等,会强制触发start_down,多走一次才能stop,目前没有实现这部分逻辑 if self._should_start_deceleration(elevator): elevator.run_status = ElevatorStatus.START_DOWN + # 发送电梯即将经过某层楼事件 + if self._near_next_stop(elevator): + self._emit_event( + EventType.ELEVATOR_APPROACHING, + { + "elevator": elevator.id, + "floor": elevator.target_floor, + "direction": elevator.target_floor_direction.value, + }, + ) # START_DOWN状态会在到达目标时自动切换为STOPPED def _move_elevators(self) -> None: @@ -403,9 +398,6 @@ class ElevatorSimulation: elevator.run_status = ElevatorStatus.STOPPED continue - # 更新电梯状态 - self._update_elevator_status(elevator) - # 获取移动速度 movement_speed = self._get_movement_speed(elevator) @@ -423,8 +415,11 @@ class ElevatorSimulation: # 向下移动 new_floor = elevator.position.floor_up_position_add(-movement_speed) + # 更新电梯状态 + old_status = elevator.run_status.value + self._update_elevator_status(elevator) server_debug_log( - f"电梯{elevator.id} 状态:{elevator.run_status.value} 速度:{movement_speed} " + f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} 速度:{movement_speed} " f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}" ) @@ -444,7 +439,7 @@ class ElevatorSimulation: if target_floor == new_floor and elevator.position.floor_up_position == 0: elevator.run_status = ElevatorStatus.STOPPED self._emit_event(EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor}) - + elevator.indicators.set_direction(elevator.target_floor_direction) # 抵达目标楼层,说明该取消floor了 # elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5 def _process_elevator_stops(self) -> None: @@ -474,7 +469,7 @@ class ElevatorSimulation: passengers_to_board: List[int] = [] if not elevator.indicators.up and not elevator.indicators.down: if elevator.next_target_floor is not None: - elevator.position.target_floor = elevator.next_target_floor + self._set_elevator_target_floor(elevator, elevator.next_target_floor) elevator.next_target_floor = None elevator.indicators.set_direction(elevator.target_floor_direction) @@ -508,14 +503,37 @@ class ElevatorSimulation: {"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id}, ) + def _set_elevator_target_floor(self, elevator: ElevatorState, floor: int): + original_target_floor = elevator.target_floor + elevator.position.target_floor = floor + server_debug_log(f"电梯 E{elevator.id} 被设定为立刻前往 F{floor}") + new_target_floor_should_accel = self._should_start_deceleration(elevator) + if not new_target_floor_should_accel: + if elevator.run_status == ElevatorStatus.START_DOWN: # 不应该加速但是加了 + elevator.run_status = ElevatorStatus.CONSTANT_SPEED + server_debug_log(f"电梯 E{elevator.id} 被设定为匀速") + elif new_target_floor_should_accel: + if elevator.run_status == ElevatorStatus.CONSTANT_SPEED: # 应该减速了,但是之前是匀速 + elevator.run_status = ElevatorStatus.START_DOWN + server_debug_log(f"电梯 E{elevator.id} 被设定为减速") + if elevator.current_floor != floor or elevator.position.floor_up_position != 0: + old_status = elevator.run_status.value + self._update_elevator_status(elevator) + server_debug_log( + f"电梯{elevator.id} 状态:{old_status}->{elevator.run_status.value} 方向:{elevator.target_floor_direction.value} " + f"位置:{elevator.position.current_floor_float:.1f} 目标:{floor}" + ) + def elevator_go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> None: """Command elevator to go to specified floor""" if 0 <= elevator_id < len(self.elevators) and 0 <= floor < len(self.floors): elevator = self.elevators[elevator_id] if immediate: - elevator.position.target_floor = floor + self._set_elevator_target_floor(elevator, floor) + elevator.indicators.set_direction(elevator.target_floor_direction) else: elevator.next_target_floor = floor + server_debug_log(f"电梯 E{elevator_id} 下一目的地设定为 F{floor}") def elevator_set_indicators(self, elevator_id: int, up: Optional[bool] = None, down: Optional[bool] = None) -> None: """Set elevator direction indicators""" @@ -590,15 +608,14 @@ class ElevatorSimulation: def force_complete_remaining_passengers(self) -> int: """强制完成所有未完成的乘客,返回完成的乘客数量""" - with self.lock: - completed_count = 0 - current_tick = self.tick - for passenger in self.state.passengers.values(): - if passenger.dropoff_tick == 0: - passenger.dropoff_tick = current_tick - if passenger.pickup_tick == 0: - passenger.pickup_tick = current_tick - return completed_count + completed_count = 0 + current_tick = self.tick + for passenger in self.state.passengers.values(): + if passenger.dropoff_tick == 0: + passenger.dropoff_tick = current_tick + if passenger.pickup_tick == 0: + passenger.pickup_tick = current_tick + return completed_count def reset(self) -> None: """Reset simulation to initial state""" @@ -641,13 +658,14 @@ def step_simulation() -> Response | tuple[Response, int]: try: data: Dict[str, Any] = request.get_json() or {} ticks = data.get("ticks", 1) - server_debug_log(f"HTTP /api/step request - ticks: {ticks}") + server_debug_log("") + server_debug_log(f"HTTP /api/step request ----- ticks: {ticks}") events = simulation.step(ticks) - server_debug_log(f"HTTP /api/step response - tick: {simulation.tick}, events: {len(events)}") + server_debug_log(f"HTTP /api/step response ----- tick: {simulation.tick}, events: {len(events)}\n") return json_response( { "tick": simulation.tick, - "events": [{"tick": e.tick, "type": e.type.value, "data": e.data} for e in events], + "events": events, } ) except Exception as e: @@ -691,7 +709,8 @@ def elevator_set_indicators(elevator_id: int) -> Response | tuple[Response, int] def next_traffic_round() -> Response | tuple[Response, int]: """切换到下一个流量文件""" try: - success = simulation.next_traffic_round() + full_reset = request.get_json()["full_reset"] + success = simulation.next_traffic_round(full_reset) if success: return json_response({"success": True}) else: diff --git a/elevator_saga/traffic/inter_floor.json b/elevator_saga/traffic/inter_floor.json index 614700b..9ea8fe9 100644 --- a/elevator_saga/traffic/inter_floor.json +++ b/elevator_saga/traffic/inter_floor.json @@ -7,14 +7,14 @@ "scale": "large", "description": "楼层间流量 - 适合小建筑 (large规模)", "expected_passengers": 76, - "duration": 300 + "duration": 100 }, "traffic": [ { "id": 1, - "origin": 3, - "destination": 8, - "tick": 7 + "origin": 0, + "destination": 1, + "tick": 1 }, { "id": 2,