From 48eb91d7895265d6855ce2f010fb7bd48a1168ff Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sun, 28 Sep 2025 13:16:41 +0800 Subject: [PATCH] init version --- .gitignore | 3 + MANIFEST.in | 12 + elevator_saga/__init__.py | 10 + elevator_saga/client/__init__.py | 3 + elevator_saga/client/api_client.py | 258 +++++ elevator_saga/client/base_controller.py | 395 +++++++ elevator_saga/client/proxy_models.py | 173 +++ elevator_saga/core/__init__.py | 3 + elevator_saga/core/models.py | 571 ++++++++++ elevator_saga/scripts/__init__.py | 0 .../scripts/client_examples/__init__.py | 4 + .../scripts/client_examples/simple_example.py | 125 +++ elevator_saga/server/__init__.py | 3 + elevator_saga/server/simulator.py | 805 ++++++++++++++ elevator_saga/traffic/__init__.py | 3 + elevator_saga/traffic/generators.py | 984 ++++++++++++++++++ elevator_saga/traffic/inter_floor.json | 470 +++++++++ elevator_saga/utils/__init__.py | 0 elevator_saga/utils/debug.py | 25 + pyrightconfig.json | 46 + setup.py | 60 ++ 21 files changed, 3953 insertions(+) create mode 100644 .gitignore create mode 100644 MANIFEST.in create mode 100644 elevator_saga/__init__.py create mode 100644 elevator_saga/client/__init__.py create mode 100644 elevator_saga/client/api_client.py create mode 100644 elevator_saga/client/base_controller.py create mode 100644 elevator_saga/client/proxy_models.py create mode 100644 elevator_saga/core/__init__.py create mode 100644 elevator_saga/core/models.py create mode 100644 elevator_saga/scripts/__init__.py create mode 100644 elevator_saga/scripts/client_examples/__init__.py create mode 100644 elevator_saga/scripts/client_examples/simple_example.py create mode 100644 elevator_saga/server/__init__.py create mode 100644 elevator_saga/server/simulator.py create mode 100644 elevator_saga/traffic/__init__.py create mode 100644 elevator_saga/traffic/generators.py create mode 100644 elevator_saga/traffic/inter_floor.json create mode 100644 elevator_saga/utils/__init__.py create mode 100644 elevator_saga/utils/debug.py create mode 100644 pyrightconfig.json create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf6da6e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.vscode +.idea +elevator_saga.egg-info \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..3f44b2d --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,12 @@ +include README_CN.md +include setup.py +include MANIFEST.in +include elevator_saga/py.typed +recursive-include elevator_saga *.py +recursive-include elevator_saga *.json +recursive-include elevator_saga *.md +exclude test_package.py +exclude *.pyc +exclude elevator_saga/__pycache__/* +global-exclude __pycache__ +global-exclude *.py[co] diff --git a/elevator_saga/__init__.py b/elevator_saga/__init__.py new file mode 100644 index 0000000..7a454c9 --- /dev/null +++ b/elevator_saga/__init__.py @@ -0,0 +1,10 @@ +""" +Elevator Saga Python Package +基于事件驱动的电梯调度模拟系统 + +A Python implementation of the Elevator Saga game with event-driven architecture +realistic elevator dispatch algorithm development and testing. +""" + +__version__ = "1.0.0" +__author__ = "ZGCA Team" diff --git a/elevator_saga/client/__init__.py b/elevator_saga/client/__init__.py new file mode 100644 index 0000000..142d944 --- /dev/null +++ b/elevator_saga/client/__init__.py @@ -0,0 +1,3 @@ +""" +Elevator scheduling client and algorithms +""" diff --git a/elevator_saga/client/api_client.py b/elevator_saga/client/api_client.py new file mode 100644 index 0000000..a8d6652 --- /dev/null +++ b/elevator_saga/client/api_client.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +""" +Unified API Client for Elevator Saga +使用统一数据模型的客户端API封装 +""" +import json +import urllib.error +import urllib.request +from typing import Any, Dict, Optional, Union + +from elevator_saga.core.models import ( + ElevatorState, + FloorState, + GoToFloorCommand, + PassengerInfo, + PerformanceMetrics, + SetIndicatorsCommand, + SimulationEvent, + SimulationState, + StepResponse, +) +from elevator_saga.utils.debug import debug_log + + +class ElevatorAPIClient: + """统一的电梯API客户端""" + + def __init__(self, base_url: str): + self.base_url = base_url.rstrip("/") + # 缓存相关字段 + self._cached_state: Optional[SimulationState] = None + self._cached_tick: int = -1 + self._tick_processed: bool = False # 标记当前tick是否已处理完成 + debug_log(f"API Client initialized for {self.base_url}") + + def get_state(self, force_reload: bool = False) -> SimulationState: + """获取模拟状态 + + Args: + force_reload: 是否强制重新加载,忽略缓存 + """ + # 如果不强制重载且缓存有效(当前tick未处理完成),返回缓存 + if not force_reload and self._cached_state is not None and not self._tick_processed: + return self._cached_state + + # debug_log(f"Fetching new state (force_reload={force_reload}, tick_processed={self._tick_processed})") + response_data = self._send_get_request("/api/state") + if "error" not in response_data: + # 直接使用服务端返回的真实数据创建SimulationState + elevators = [ElevatorState.from_dict(e) for e in response_data.get("elevators", [])] + floors = [FloorState.from_dict(f) for f in response_data.get("floors", [])] + + # 使用服务端返回的passengers和metrics数据 + passengers_data = response_data.get("passengers", {}) + if isinstance(passengers_data, dict) and "completed" in passengers_data: + # 如果是PassengerSummary格式,则创建空的passengers字典 + passengers: Dict[int, PassengerInfo] = {} + else: + # 如果是真实的passengers数据,则转换 + passengers = { + int(k): PassengerInfo.from_dict(v) for k, v in passengers_data.items() if isinstance(v, dict) + } + + # 使用服务端返回的metrics数据 + metrics_data = response_data.get("metrics", {}) + if metrics_data: + # 转换为PerformanceMetrics格式 + metrics = PerformanceMetrics( + completed_passengers=metrics_data.get("done", 0), + total_passengers=metrics_data.get("total", 0), + average_wait_time=metrics_data.get("avg_wait", 0), + p95_wait_time=metrics_data.get("p95_wait", 0), + average_system_time=metrics_data.get("avg_system", 0), + p95_system_time=metrics_data.get("p95_system", 0), + total_energy_consumption=metrics_data.get("energy_total", 0), + ) + else: + metrics = PerformanceMetrics() + + simulation_state = SimulationState( + tick=response_data.get("tick", 0), + elevators=elevators, + floors=floors, + passengers=passengers, + metrics=metrics, + events=[], + ) + + # 更新缓存 + self._cached_state = simulation_state + self._cached_tick = simulation_state.tick + self._tick_processed = False # 重置处理标志,表示新tick开始 + + return simulation_state + else: + raise RuntimeError(f"Failed to get state: {response_data.get('error')}") + + def mark_tick_processed(self) -> None: + """标记当前tick处理完成,使缓存在下次get_state时失效""" + self._tick_processed = True + + def step(self, ticks: int = 1) -> StepResponse: + """执行步进""" + response_data = self._send_post_request("/api/step", {"ticks": ticks}) + + if "error" not in response_data: + # 使用服务端返回的真实数据 + events_data = response_data.get("events", []) + events = [SimulationEvent.from_dict(event) for event in events_data] + + step_response = StepResponse( + success=True, + tick=response_data.get("tick", 0), + events=events, + ) + + # debug_log(f"Step response: tick={step_response.tick}, events={len(events)}") + return step_response + else: + raise RuntimeError(f"Step failed: {response_data.get('error')}") + + def send_elevator_command(self, command: Union[GoToFloorCommand, SetIndicatorsCommand]) -> bool: + """发送电梯命令""" + endpoint = self._get_elevator_endpoint(command) + debug_log(f"Sending elevator command: {command.command_type} to elevator {command.elevator_id}") + + response_data = self._send_post_request(endpoint, command.parameters) + + if response_data.get("success"): + return response_data["success"] + else: + raise RuntimeError(f"Command failed: {response_data.get('error_message')}") + + def go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> bool: + """电梯前往指定楼层""" + command = GoToFloorCommand(elevator_id=elevator_id, floor=floor, immediate=immediate) + + try: + response = self.send_elevator_command(command) + return response + except Exception as e: + debug_log(f"Go to floor failed: {e}") + return False + + def set_indicators(self, elevator_id: int, up: Optional[bool] = None, down: Optional[bool] = None) -> bool: + """设置电梯指示灯""" + command = SetIndicatorsCommand(elevator_id=elevator_id, up=up, down=down) + + try: + response = self.send_elevator_command(command) + return response + except Exception as e: + debug_log(f"Set indicators failed: {e}") + return False + + def _get_elevator_endpoint(self, command: Union[GoToFloorCommand, SetIndicatorsCommand]) -> str: + """获取电梯命令端点""" + base = f"/api/elevators/{command.elevator_id}" + + if isinstance(command, GoToFloorCommand): + return f"{base}/go_to_floor" + else: # SetIndicatorsCommand + return f"{base}/set_indicators" + + def _send_get_request(self, endpoint: str) -> Dict[str, Any]: + """发送GET请求""" + url = f"{self.base_url}{endpoint}" + # debug_log(f"GET {url}") + + try: + with urllib.request.urlopen(url, timeout=60) as response: + data = json.loads(response.read().decode("utf-8")) + # debug_log(f"GET {url} -> {response.status}") + return data + except urllib.error.URLError as e: + raise RuntimeError(f"GET {url} failed: {e}") + + def reset(self) -> bool: + """重置模拟""" + try: + response_data = self._send_post_request("/api/reset", {}) + success = response_data.get("success", False) + if success: + # 清空缓存,因为状态已重置 + self._cached_state = None + self._cached_tick = -1 + self._tick_processed = False + debug_log("Cache cleared after reset") + return success + except Exception as e: + debug_log(f"Reset failed: {e}") + return False + + def next_traffic_round(self) -> bool: + """切换到下一个流量文件""" + try: + response_data = self._send_post_request("/api/traffic/next", {}) + success = response_data.get("success", False) + if success: + # 清空缓存,因为流量文件已切换,状态会改变 + self._cached_state = None + self._cached_tick = -1 + self._tick_processed = False + debug_log("Cache cleared after traffic round switch") + return success + except Exception as e: + debug_log(f"Next traffic round failed: {e}") + return False + + def get_traffic_info(self) -> Optional[Dict[str, Any]]: + """获取当前流量文件信息""" + try: + response_data = self._send_get_request("/api/traffic/info") + debug_log(str()) + if "error" not in response_data: + return response_data + else: + debug_log(f"Get traffic info failed: {response_data.get('error')}") + return None + except Exception as e: + debug_log(f"Get traffic info failed: {e}") + return None + + def force_complete_remaining_passengers(self) -> Optional[int]: + """强制完成所有未完成的乘客,返回完成的乘客数量""" + try: + response_data = self._send_post_request("/api/force_complete", {}) + if response_data.get("success"): + completed_count = response_data.get("completed_count", 0) + debug_log(f"Force completed {completed_count} passengers") + # 强制完成后清空缓存 + self._cached_state = None + self._cached_tick = -1 + self._tick_processed = False + return completed_count + else: + debug_log(f"Force complete failed: {response_data.get('error')}") + return None + except Exception as e: + debug_log(f"Force complete failed: {e}") + return None + + def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]: + """发送POST请求""" + url = f"{self.base_url}{endpoint}" + request_body = json.dumps(data).encode("utf-8") + + # debug_log(f"POST {url} with data: {data}") + + req = urllib.request.Request(url, data=request_body, headers={"Content-Type": "application/json"}) + + try: + with urllib.request.urlopen(req, timeout=60) as response: + response_data = json.loads(response.read().decode("utf-8")) + # debug_log(f"POST {url} -> {response.status}") + return response_data + except urllib.error.URLError as e: + raise RuntimeError(f"POST {url} failed: {e}") diff --git a/elevator_saga/client/base_controller.py b/elevator_saga/client/base_controller.py new file mode 100644 index 0000000..242509f --- /dev/null +++ b/elevator_saga/client/base_controller.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +""" +Elevator Controller Base Class +电梯调度基础控制器类 - 提供面向对象的算法开发接口 +""" +from abc import ABC, abstractmethod +from pprint import pprint +from typing import Any, Dict, List, Optional + +from elevator_saga.client.api_client import ElevatorAPIClient +from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger +from elevator_saga.core.models import EventType, SimulationEvent, SimulationState + +# 避免循环导入,使用运行时导入 +from elevator_saga.utils.debug import debug_log + + +class ElevatorController(ABC): + """ + 电梯调度控制器基类 + + 用户通过继承此类并实现 abstract 方法来创建自己的调度算法 + """ + + def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False): + """ + 初始化控制器 + + Args: + server_url: 服务器URL + debug: 是否启用debug模式 + """ + self.server_url = server_url + self.debug = debug + self.elevators: List[Any] = [] + self.floors: List[Any] = [] + self.current_tick = 0 + self.is_running = False + self.current_traffic_max_tick: Optional[int] = None + + # 初始化API客户端 + self.api_client = ElevatorAPIClient(server_url) + + @abstractmethod + def on_init(self, elevators: List[Any], floors: List[Any]): + """ + 算法初始化方法 - 必须由子类实现 + + Args: + elevators: 电梯列表 + floors: 楼层列表 + """ + pass + + @abstractmethod + def on_event_execute_start(self, tick: int, events: List[Any], elevators: List[Any], floors: List[Any]): + """ + 事件执行前的回调 - 必须由子类实现 + + Args: + tick: 当前时间tick + events: 即将执行的事件列表 + elevators: 电梯列表 + floors: 楼层列表 + """ + pass + + @abstractmethod + def on_event_execute_end(self, tick: int, events: List[Any], elevators: List[Any], floors: List[Any]): + """ + 事件执行后的回调 - 必须由子类实现 + + Args: + tick: 当前时间tick + events: 已执行的事件列表 + elevators: 电梯列表 + floors: 楼层列表 + """ + pass + + def on_start(self): + """ + 算法启动前的回调 - 可选实现 + """ + print(f"启动 {self.__class__.__name__} 算法") + + def on_stop(self): + """ + 算法停止后的回调 - 可选实现 + """ + print(f"停止 {self.__class__.__name__} 算法") + + @abstractmethod + def on_passenger_call(self, floor: ProxyFloor, direction: str): + """ + 乘客呼叫时的回调 - 可选实现 + + Args: + floor: 呼叫楼层代理对象 + direction: 方向 ("up" 或 "down") + """ + pass + + @abstractmethod + def on_elevator_idle(self, elevator: ProxyElevator): + """ + 电梯空闲时的回调 - 可选实现 + + Args: + elevator: 空闲的电梯代理对象 + """ + pass + + @abstractmethod + def on_elevator_stopped(self, elevator: ProxyElevator, floor: ProxyFloor): + """ + 电梯停靠时的回调 - 可选实现 + + Args: + elevator: 停靠的电梯代理对象 + floor: 停靠楼层代理对象 + """ + pass + + @abstractmethod + def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger): + """ + 乘客上车时的回调 - 可选实现 + + Args: + elevator: 电梯代理对象 + passenger: 乘客代理对象 + """ + pass + + @abstractmethod + def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor): + """ + 乘客下车时的回调 - 可选实现 + + Args: + elevator: 电梯代理对象 + passenger: 乘客代理对象 + floor: 下车楼层代理对象 + """ + pass + + @abstractmethod + def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str): + """ + 电梯经过楼层时的回调 - 可选实现 + + Args: + elevator: 电梯代理对象 + floor: 经过的楼层代理对象 + direction: 移动方向 + """ + pass + + @abstractmethod + def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str): + """ + 电梯即将到达时的回调 - 可选实现 + + Args: + elevator: 电梯代理对象 + floor: 即将到达的楼层代理对象 + direction: 移动方向 + """ + pass + + def _internal_init(self, elevators: List[Any], floors: List[Any]): + """内部初始化方法""" + self.elevators = elevators + self.floors = floors + self.current_tick = 0 + + # 调用用户的初始化方法 + self.on_init(elevators, floors) + + def start(self): + """ + 启动控制器 + """ + self.on_start() + self.is_running = True + + try: + self._run_event_driven_simulation() + except KeyboardInterrupt: + print("\n用户中断了算法运行") + except Exception as e: + print(f"算法运行出错: {e}") + raise + finally: + self.is_running = False + self.on_stop() + + def stop(self): + """停止控制器""" + self.is_running = False + print(f"停止 {self.__class__.__name__}") + + def on_simulation_complete(self, final_state: Dict[str, Any]): + """ + 模拟完成时的回调 - 可选实现 + + Args: + final_state: 最终状态数据 + """ + pass + + def _run_event_driven_simulation(self): + """运行事件驱动的模拟""" + try: + # 获取初始状态并初始化 + state = self.api_client.get_state() + self._update_wrappers(state, init=True) + + # 获取当前流量文件的最大tick数 + self._update_traffic_info() + if self.current_tick >= self.current_traffic_max_tick: + return + + self._internal_init(self.elevators, self.floors) + + tick_count = 0 + + while self.is_running: + # 检查是否达到最大tick数 + if tick_count >= self.current_traffic_max_tick: + break + + # 执行一个tick的模拟 + step_response = self.api_client.step(1) + # 更新当前状态 + self.current_tick = step_response.tick + # 获取事件列表 + events = step_response.events + + # 获取当前状态 + state = self.api_client.get_state() + self._update_wrappers(state) + + # 事件执行前回调 + self.on_event_execute_start(self.current_tick, events, self.elevators, self.floors) + + # 处理事件 + if events: + for event in events: + self._handle_single_event(event) + + # 获取更新后的状态 + state = self.api_client.get_state() + self._update_wrappers(state) + + # 事件执行后回调 + self.on_event_execute_end(self.current_tick, events, self.elevators, self.floors) + + # 标记tick处理完成,使API客户端缓存失效 + self.api_client.mark_tick_processed() + + tick_count += 1 + + # 检查是否需要切换流量文件 + if self.current_tick >= self.current_traffic_max_tick: + pprint(state.metrics.to_dict()) + if not self.api_client.next_traffic_round(): + # 如果没有更多流量文件,退出 + break + + # 重置并重新初始化 + self._reset_and_reinit() + tick_count = 0 + + except Exception as e: + print(f"模拟运行错误: {e}") + raise + + def _update_wrappers(self, state: SimulationState, init=False) -> None: + """更新电梯和楼层代理对象""" + self.current_tick = state.tick + # 检查电梯数量是否发生变化,只有变化时才重新创建 + if len(self.elevators) != len(state.elevators): + if not init: + raise ValueError(f"Elevator number mismatch: {len(self.elevators)} != {len(state.elevators)}") + self.elevators = [ProxyElevator(elevator_state.id, self.api_client) for elevator_state in state.elevators] + + # 检查楼层数量是否发生变化,只有变化时才重新创建 + if len(self.floors) != len(state.floors): + if not init: + raise ValueError(f"Floor number mismatch: {len(self.floors)} != {len(state.floors)}") + self.floors = [ProxyFloor(floor_state.floor, self.api_client) for floor_state in state.floors] + + def _update_traffic_info(self) -> None: + """更新当前流量文件信息""" + try: + traffic_info = self.api_client.get_traffic_info() + if traffic_info: + self.current_traffic_max_tick = traffic_info["max_tick"] + debug_log(f"Updated traffic info - max_tick: {self.current_traffic_max_tick}") + else: + debug_log("Failed to get traffic info") + self.current_traffic_max_tick = None + except Exception as e: + debug_log(f"Error updating traffic info: {e}") + self.current_traffic_max_tick = None + + def _handle_single_event(self, event: SimulationEvent): + """处理单个事件""" + if event.type == EventType.UP_BUTTON_PRESSED.value: + floor_id = event.data.get("floor") + if floor_id is not None: + floor_proxy = ProxyFloor(floor_id, self.api_client) + self.on_passenger_call(floor_proxy, "up") + + elif event.type == EventType.DOWN_BUTTON_PRESSED.value: + floor_id = event.data.get("floor") + if floor_id is not None: + floor_proxy = ProxyFloor(floor_id, self.api_client) + self.on_passenger_call(floor_proxy, "down") + + elif event.type == EventType.STOPPED_AT_FLOOR.value: + elevator_id = event.data.get("elevator") + floor_id = event.data.get("floor") + if elevator_id is not None and floor_id is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + floor_proxy = ProxyFloor(floor_id, self.api_client) + self.on_elevator_stopped(elevator_proxy, floor_proxy) + + elif event.type == EventType.IDLE.value: + elevator_id = event.data.get("elevator") + if elevator_id is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + self.on_elevator_idle(elevator_proxy) + + elif event.type == EventType.PASSING_FLOOR.value: + elevator_id = event.data.get("elevator") + floor_id = event.data.get("floor") + direction = event.data.get("direction") + if elevator_id is not None and floor_id is not None and direction is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + floor_proxy = ProxyFloor(floor_id, self.api_client) + # 服务端发送的direction是字符串,直接使用 + direction_str = direction if isinstance(direction, str) else direction.value + self.on_elevator_passing_floor(elevator_proxy, floor_proxy, direction_str) + + elif event.type == EventType.ELEVATOR_APPROACHING.value: + elevator_id = event.data.get("elevator") + floor_id = event.data.get("floor") + direction = event.data.get("direction") + if elevator_id is not None and floor_id is not None and direction is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + floor_proxy = ProxyFloor(floor_id, self.api_client) + # 服务端发送的direction是字符串,直接使用 + direction_str = direction if isinstance(direction, str) else direction.value + self.on_elevator_approaching(elevator_proxy, floor_proxy, direction_str) + + elif event.type == EventType.PASSENGER_BOARD.value: + elevator_id = event.data.get("elevator") + passenger_id = event.data.get("passenger") + if elevator_id is not None and passenger_id is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + passenger_proxy = ProxyPassenger(passenger_id, self.api_client) + self.on_passenger_board(elevator_proxy, passenger_proxy) + + elif event.type == EventType.PASSENGER_ALIGHT.value: + elevator_id = event.data.get("elevator") + passenger_id = event.data.get("passenger") + floor_id = event.data.get("floor") + if elevator_id is not None and passenger_id is not None and floor_id is not None: + elevator_proxy = ProxyElevator(elevator_id, self.api_client) + passenger_proxy = ProxyPassenger(passenger_id, self.api_client) + floor_proxy = ProxyFloor(floor_id, self.api_client) + self.on_passenger_alight(elevator_proxy, passenger_proxy, floor_proxy) + + def _reset_and_reinit(self): + """重置并重新初始化""" + try: + # 重置服务器状态 + self.api_client.reset() + + # 获取新的初始状态 + state = self.api_client.get_state() + self._update_wrappers(state) + + # 更新流量信息(切换到新流量文件后需要重新获取最大tick) + self._update_traffic_info() + + # 重新初始化用户算法 + self._internal_init(self.elevators, self.floors) + + except Exception as e: + debug_log(f"重置失败: {e}") + raise diff --git a/elevator_saga/client/proxy_models.py b/elevator_saga/client/proxy_models.py new file mode 100644 index 0000000..2d054c6 --- /dev/null +++ b/elevator_saga/client/proxy_models.py @@ -0,0 +1,173 @@ +from typing import Any + +from elevator_saga.client.api_client import ElevatorAPIClient +from elevator_saga.core.models import ElevatorState, FloorState, PassengerInfo + + +class ProxyFloor(FloorState): + """ + 楼层动态代理类 + 直接使用 FloorState 数据模型实例,提供完整的类型安全访问 + """ + + init_ok = False + + def __init__(self, floor_id: int, api_client: ElevatorAPIClient): + self._floor_id = floor_id + self._api_client = api_client + self._cached_instance = None + self.init_ok = True + + def _get_floor_state(self) -> FloorState: + """获取 FloorState 实例""" + # 获取当前状态 + state = self._api_client.get_state() + floor_data = next((f for f in state.floors if f.floor == self._floor_id), None) + + if floor_data is None: + raise AttributeError(f"Floor {self._floor_id} not found") + + # 如果是字典,转换为 FloorState 实例 + if isinstance(floor_data, dict): + return FloorState.from_dict(floor_data) + else: + # 如果已经是 FloorState 实例,直接返回 + return floor_data + + def __getattr__(self, name: str) -> Any: + """动态获取楼层属性""" + floor_state = self._get_floor_state() + try: + if hasattr(floor_state, name): + attr = getattr(floor_state, name) + # 如果是 property 或方法,调用并返回结果 + if callable(attr): + return attr() + else: + return attr + except AttributeError: + + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __setattr__(self, name: str, value: Any) -> None: + """禁止修改属性,保持只读特性""" + if not self.init_ok: + object.__setattr__(self, name, value) + else: + raise AttributeError(f"Cannot modify read-only attribute '{name}'") + + def __repr__(self) -> str: + return f"ProxyFloor(floor={self._floor_id})" + + +class ProxyElevator(ElevatorState): + """ + 电梯动态代理类 + 直接使用 ElevatorState 数据模型实例,提供完整的类型安全访问和操作方法 + """ + + init_ok = False + + def __init__(self, elevator_id: int, api_client: ElevatorAPIClient): + self._elevator_id = elevator_id + self._api_client = api_client + self.init_ok = True + + def _get_elevator_state(self) -> ElevatorState: + """获取 ElevatorState 实例""" + # 获取当前状态 + state = self._api_client.get_state() + elevator_data = next((e for e in state.elevators if e.id == self._elevator_id), None) + + if elevator_data is None: + raise AttributeError(f"Elevator {self._elevator_id} not found") + + # 如果是字典,转换为 ElevatorState 实例 + if isinstance(elevator_data, dict): + return ElevatorState.from_dict(elevator_data) + else: + # 如果已经是 ElevatorState 实例,直接返回 + return elevator_data + + def __getattr__(self, name: str) -> Any: + """动态获取电梯属性""" + try: + elevator_state = self._get_elevator_state() + # 直接从 ElevatorState 实例获取属性 + return getattr(elevator_state, name) + + except AttributeError: + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def go_to_floor(self, floor: int, immediate: bool = False) -> bool: + """前往指定楼层""" + return self._api_client.go_to_floor(self._elevator_id, floor, immediate) + + def set_up_indicator(self, value: bool) -> None: + """设置上行指示器""" + self._api_client.set_indicators(self._elevator_id, up=value) + + def set_down_indicator(self, value: bool) -> None: + """设置下行指示器""" + self._api_client.set_indicators(self._elevator_id, down=value) + + def __setattr__(self, name: str, value: Any) -> None: + """禁止修改属性,保持只读特性""" + if not self.init_ok: + object.__setattr__(self, name, value) + else: + raise AttributeError(f"Cannot modify read-only attribute '{name}'") + + def __repr__(self) -> str: + return f"ProxyElevator(id={self._elevator_id})" + + +class ProxyPassenger(PassengerInfo): + """ + 乘客动态代理类 + 直接使用 PassengerInfo 数据模型实例,提供完整的类型安全访问 + """ + + init_ok = False + + def __init__(self, passenger_id: int, api_client: ElevatorAPIClient): + self._passenger_id = passenger_id + self._api_client = api_client + self.init_ok = True + + def _get_passenger_info(self) -> PassengerInfo: + """获取 PassengerInfo 实例""" + # 获取当前状态 + state = self._api_client.get_state() + + # 在乘客字典中查找 + passenger_data = state.passengers.get(self._passenger_id) + if passenger_data is None: + raise AttributeError(f"Passenger {self._passenger_id} not found") + + # 如果是字典,转换为 PassengerInfo 实例 + if isinstance(passenger_data, dict): + return PassengerInfo.from_dict(passenger_data) + else: + # 如果已经是 PassengerInfo 实例,直接返回 + return passenger_data + + def __getattr__(self, name: str) -> Any: + """动态获取乘客属性""" + try: + passenger_info = self._get_passenger_info() + # 直接从 PassengerInfo 实例获取属性 + return getattr(passenger_info, name) + + except AttributeError: + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __setattr__(self, name: str, value: Any) -> None: + """禁止修改属性,保持只读特性""" + if not self.init_ok: + object.__setattr__(self, name, value) + else: + raise AttributeError(f"Cannot modify read-only attribute '{name}'") + + def __repr__(self) -> str: + return f"ProxyPassenger(id={self._passenger_id})" diff --git a/elevator_saga/core/__init__.py b/elevator_saga/core/__init__.py new file mode 100644 index 0000000..b823959 --- /dev/null +++ b/elevator_saga/core/__init__.py @@ -0,0 +1,3 @@ +""" +Core engine and event system for Elevator Saga +""" diff --git a/elevator_saga/core/models.py b/elevator_saga/core/models.py new file mode 100644 index 0000000..50ca7d5 --- /dev/null +++ b/elevator_saga/core/models.py @@ -0,0 +1,571 @@ +#!/usr/bin/env python3 +""" +Elevator Saga Data Models +统一的数据模型定义,用于客户端和服务器的类型一致性和序列化 +""" +import json +import uuid +from dataclasses import asdict, dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union + +# 类型变量 +T = TypeVar("T", bound="SerializableModel") + + +class Direction(Enum): + """电梯方向枚举""" + + UP = "up" + DOWN = "down" + STOPPED = "stopped" + + +class PassengerStatus(Enum): + """乘客状态枚举""" + + WAITING = "waiting" + IN_ELEVATOR = "in_elevator" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class ElevatorStatus(Enum): # OK + """ + 电梯stopped -1tick> (回调启动) start_up -1tick> constant_speed -?tick> start_down -1tick> 结束 + """ + + START_UP = "start_up" # 即将启动,tick结束时会从stopped转换成start_up + START_DOWN = "start_down" # 即将到站,tick结束时会触发may passing floor + CONSTANT_SPEED = "constant_speed" + STOPPED = "stopped" + + +class EventType(Enum): + """事件类型枚举""" + + UP_BUTTON_PRESSED = "up_button_pressed" + DOWN_BUTTON_PRESSED = "down_button_pressed" + PASSING_FLOOR = "passing_floor" + STOPPED_AT_FLOOR = "stopped_at_floor" + ELEVATOR_APPROACHING = "elevator_approaching" # 电梯即将到达(START_DOWN状态) + IDLE = "idle" + PASSENGER_BOARD = "passenger_board" + PASSENGER_ALIGHT = "passenger_alight" + + +class SerializableModel: + """可序列化模型基类""" + + def to_dict(self) -> Dict[str, Any]: + """转换为字典""" + return asdict(self) # type: ignore + + def to_json(self) -> str: + """转换为JSON字符串""" + return json.dumps(self.to_dict(), default=self._json_serializer) + + @classmethod + def from_dict(cls: Type[T], data: Dict[str, Any]) -> T: + """从字典创建实例""" + # 过滤掉init=False的字段 + import inspect + + sig = inspect.signature(cls.__init__) + valid_params = set(sig.parameters.keys()) - {"self"} + + filtered_data = {k: v for k, v in data.items() if k in valid_params} + return cls(**filtered_data) + + @classmethod + def from_json(cls: Type[T], json_str: str) -> T: + """从JSON字符串创建实例""" + data = json.loads(json_str) + return cls.from_dict(data) + + @staticmethod + def _json_serializer(obj: Any) -> Union[Any, str]: + """JSON序列化器,处理特殊类型""" + if isinstance(obj, Enum): + return obj.value + elif isinstance(obj, datetime): + return obj.isoformat() + elif hasattr(obj, "to_dict"): + return obj.to_dict() + return str(obj) + + +@dataclass +class Position(SerializableModel): + """位置信息""" + + current_floor: int = 0 + target_floor: int = 0 + floor_up_position: int = 0 + + @property + def current_floor_float(self): + return self.current_floor + self.floor_up_position / 10 + + def floor_up_position_add(self, num: int): + self.floor_up_position += num + + # 处理向上楼层跨越 + while self.floor_up_position >= 10: + self.current_floor += 1 + self.floor_up_position -= 10 + + # 处理向下楼层跨越 + while self.floor_up_position < 0: + self.current_floor -= 1 + self.floor_up_position += 10 + + return self.current_floor + + +@dataclass +class ElevatorIndicators(SerializableModel): + """电梯指示灯状态""" + + up: bool = False + down: bool = False + + def set_direction(self, direction: Direction): + """根据方向设置指示灯""" + if direction == Direction.UP: + self.up = True + self.down = False + elif direction == Direction.DOWN: + self.up = False + self.down = True + else: + self.up = False + self.down = False + + +@dataclass +class PassengerInfo(SerializableModel): + """乘客信息""" + + id: int + origin: int + destination: int + arrive_tick: int + pickup_tick: int = 0 + dropoff_tick: int = 0 + elevator_id: Optional[int] = None + + @property + def status(self) -> PassengerStatus: + """乘客状态""" + if self.dropoff_tick > 0: + return PassengerStatus.COMPLETED + elif self.pickup_tick > 0: + return PassengerStatus.IN_ELEVATOR + else: + return PassengerStatus.WAITING + + @property + def wait_time(self) -> int: + """等待时间""" + return self.pickup_tick - self.arrive_tick + + @property + def system_time(self) -> int: + """系统时间(总时间)""" + return self.dropoff_tick - self.arrive_tick + + @property + def travel_direction(self) -> Direction: + """移动方向""" + if self.destination > self.origin: + return Direction.UP + elif self.destination < self.origin: + return Direction.DOWN + else: + return Direction.STOPPED + + +@dataclass +class ElevatorState(SerializableModel): + """电梯状态""" + + id: int + position: Position + next_target_floor: Optional[int] = None + passengers: List[int] = field(default_factory=list) # type: ignore[reportUnknownVariableType] 乘客ID列表 + max_capacity: int = 10 + speed_pre_tick: float = 0.5 + run_status: ElevatorStatus = ElevatorStatus.STOPPED + indicators: ElevatorIndicators = field(default_factory=ElevatorIndicators) + passenger_destinations: Dict[int, int] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] 乘客ID -> 目的地楼层映射 + energy_consumed: float = 0.0 + last_update_tick: int = 0 + + @property + def current_floor(self) -> int: + """当前楼层""" + if isinstance(self.position, dict): + self.position = Position.from_dict(self.position) + return self.position.current_floor + + @property + def current_floor_float(self) -> float: + """当前楼层""" + if isinstance(self.position, dict): + self.position = Position.from_dict(self.position) + return self.position.current_floor_float + + @property + def target_floor(self) -> int: + """当前楼层""" + if isinstance(self.position, dict): + self.position = Position.from_dict(self.position) + return self.position.target_floor + + @property + def load_factor(self) -> float: + """载重系数""" + return len(self.passengers) / self.max_capacity + + @property + def target_floor_direction(self) -> Direction: + """目标方向""" + next_floor = self.target_floor + if next_floor > self.current_floor: + return Direction.UP + elif next_floor < self.current_floor: + return Direction.DOWN + else: + return Direction.STOPPED + + @property + def is_idle(self) -> bool: + """是否空闲""" + return self.run_status == ElevatorStatus.STOPPED + + @property + def is_full(self) -> bool: + """是否满载""" + return len(self.passengers) >= self.max_capacity + + @property + def is_running(self) -> bool: + """是否正在运行""" + return self.run_status in [ElevatorStatus.START_UP, ElevatorStatus.START_DOWN, ElevatorStatus.CONSTANT_SPEED] + + @property + def pressed_floors(self) -> List[int]: + """按下的楼层(基于当前乘客的目的地动态计算)""" + return sorted(list(set(self.passenger_destinations.values()))) + + def add_destination(self, floor: int, immediate: bool = False): + """添加目标楼层""" + self.next_target_floor = floor + + def clear_destinations(self): + """清空目标队列""" + self.next_target_floor = None + + def add_passenger(self, passenger_id: int, destination_floor: int): + """添加乘客""" + if not self.is_full: + self.passengers.append(passenger_id) + self.passenger_destinations[passenger_id] = destination_floor + return True + return False + + def remove_passenger(self, passenger_id: int, floor: int): + """移除乘客""" + if passenger_id in self.passengers: + self.passengers.remove(passenger_id) + # 从乘客目的地映射中移除 + self.passenger_destinations.pop(passenger_id, None) + + +@dataclass +class FloorState(SerializableModel): + """楼层状态""" + + floor: int + up_queue: List[int] = field(default_factory=list) # type: ignore[reportUnknownVariableType] 等待上行的乘客ID + down_queue: List[int] = field(default_factory=list) # type: ignore[reportUnknownVariableType] 等待下行的乘客ID + + @property + def has_waiting_passengers(self) -> bool: + """是否有等待的乘客""" + return len(self.up_queue) > 0 or len(self.down_queue) > 0 + + @property + def total_waiting(self) -> int: + """总等待人数""" + return len(self.up_queue) + len(self.down_queue) + + def add_waiting_passenger(self, passenger_id: int, direction: Direction): + """添加等待乘客""" + if direction == Direction.UP: + if passenger_id not in self.up_queue: + self.up_queue.append(passenger_id) + elif direction == Direction.DOWN: + if passenger_id not in self.down_queue: + self.down_queue.append(passenger_id) + + def remove_waiting_passenger(self, passenger_id: int) -> bool: + """移除等待乘客""" + if passenger_id in self.up_queue: + self.up_queue.remove(passenger_id) + return True + if passenger_id in self.down_queue: + self.down_queue.remove(passenger_id) + return True + return False + + +@dataclass +class SimulationEvent(SerializableModel): + """模拟事件""" + + tick: int + type: EventType + data: Dict[str, Any] + timestamp: Optional[str] = None + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = datetime.now().isoformat() + + +@dataclass +class PerformanceMetrics(SerializableModel): + """性能指标""" + + completed_passengers: int = 0 + total_passengers: int = 0 + average_wait_time: float = 0.0 + p95_wait_time: float = 0.0 + average_system_time: float = 0.0 + p95_system_time: float = 0.0 + total_energy_consumption: float = 0.0 + total_movement_distance: float = 0.0 + total_stops: int = 0 + efficiency_score: float = 0.0 + + @property + def completion_rate(self) -> float: + """完成率""" + if self.total_passengers == 0: + return 0.0 + return self.completed_passengers / self.total_passengers + + @property + def energy_per_passenger(self) -> float: + """每位乘客能耗""" + if self.completed_passengers == 0: + return 0.0 + return self.total_energy_consumption / self.completed_passengers + + +@dataclass +class SimulationState(SerializableModel): + """模拟状态""" + + tick: int + elevators: List[ElevatorState] + floors: List[FloorState] + passengers: Dict[int, PassengerInfo] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] + metrics: PerformanceMetrics = field(default_factory=PerformanceMetrics) + events: List[SimulationEvent] = field(default_factory=list) # type: ignore[reportUnknownVariableType] + + def get_elevator_by_id(self, elevator_id: int) -> Optional[ElevatorState]: + """根据ID获取电梯""" + for elevator in self.elevators: + if elevator.id == elevator_id: + return elevator + return None + + def get_floor_by_number(self, floor_number: int) -> Optional[FloorState]: + """根据楼层号获取楼层""" + for floor in self.floors: + if floor.floor == floor_number: + return floor + return None + + def get_passengers_by_status(self, status: PassengerStatus) -> List[PassengerInfo]: + """根据状态获取乘客""" + return [p for p in self.passengers.values() if p.status == status] + + def add_event(self, event_type: EventType, data: Dict[str, Any]): + """添加事件""" + event = SimulationEvent(tick=self.tick, type=event_type, data=data) + self.events.append(event) + + +# ==================== HTTP API 数据模型 ==================== + + +@dataclass +class APIRequest(SerializableModel): + """API请求基类""" + + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class APIResponse(SerializableModel): + """API响应基类""" + + success: bool + request_id: Optional[str] = None + error_message: Optional[str] = None + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class StepRequest(APIRequest): + """步进请求""" + + ticks: int = 1 + + +@dataclass +class StepResponse(SerializableModel): + """步进响应""" + + success: bool + tick: int + events: List[SimulationEvent] = field(default_factory=list) # type: ignore[reportUnknownVariableType] + request_id: Optional[str] = None + error_message: Optional[str] = None + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class StateRequest(APIRequest): + """状态请求""" + + include_passengers: bool = True + include_events: bool = False + since_tick: Optional[int] = None + + +@dataclass +class ElevatorCommand(SerializableModel): + """电梯命令""" + + elevator_id: int + command_type: str # "go_to_floor", "stop", "set_indicators" + parameters: Dict[str, Any] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class ElevatorCommandResponse(SerializableModel): + """电梯命令响应""" + + success: bool + elevator_id: int + + +@dataclass +class GoToFloorCommand(SerializableModel): + """前往楼层命令""" + + elevator_id: int + floor: int + immediate: bool = False + command_type: str = field(default="go_to_floor", init=False) + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + @property + def parameters(self) -> Dict[str, Any]: + return {"floor": self.floor, "immediate": self.immediate} + + +@dataclass +class SetIndicatorsCommand(SerializableModel): + """设置指示灯命令""" + + elevator_id: int + up: Optional[bool] = None + down: Optional[bool] = None + command_type: str = field(default="set_indicators", init=False) + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + @property + def parameters(self) -> Dict[str, Any]: + return {"up": self.up, "down": self.down} + + +# ==================== 流量和配置数据模型 ==================== + + +@dataclass +class TrafficEntry(SerializableModel): + """流量条目""" + + id: int + origin: int + destination: int + tick: int + + +@dataclass +class TrafficPattern(SerializableModel): + """流量模式""" + + name: str + description: str + entries: List[TrafficEntry] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + def add_entry(self, entry: TrafficEntry): + """添加流量条目""" + self.entries.append(entry) + + def get_entries_for_tick(self, tick: int) -> List[TrafficEntry]: + """获取指定tick的流量条目""" + return [entry for entry in self.entries if entry.tick == tick] + + @property + def total_passengers(self) -> int: + """总乘客数""" + return len(self.entries) + + @property + def duration(self) -> int: + """流量持续时间""" + if not self.entries: + return 0 + return max(entry.tick for entry in self.entries) + + +# ==================== 便捷构造函数 ==================== + + +def create_empty_simulation_state(elevators: int, floors: int, max_capacity: int) -> SimulationState: + """创建空的模拟状态""" + elevator_states = [ElevatorState(id=i, position=Position(), max_capacity=max_capacity) for i in range(elevators)] + floor_states = [FloorState(floor=i) for i in range(floors)] + return SimulationState(tick=0, elevators=elevator_states, floors=floor_states) + + +def create_simple_traffic_pattern(name: str, passengers: List[Tuple[int, int, int]]) -> TrafficPattern: + """创建简单流量模式 + + Args: + name: 模式名称 + passengers: [(origin, destination, tick), ...] + """ + entries = [] + for i, (origin, destination, tick) in enumerate(passengers, 1): + entry = TrafficEntry(id=i, origin=origin, destination=destination, tick=tick) + entries.append(entry) + + return TrafficPattern( + name=name, description=f"Simple traffic pattern with {len(passengers)} passengers", entries=entries + ) diff --git a/elevator_saga/scripts/__init__.py b/elevator_saga/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/elevator_saga/scripts/client_examples/__init__.py b/elevator_saga/scripts/client_examples/__init__.py new file mode 100644 index 0000000..166bd38 --- /dev/null +++ b/elevator_saga/scripts/client_examples/__init__.py @@ -0,0 +1,4 @@ +""" +Example elevator algorithms using the base controller class +示例电梯调度算法,展示如何使用基础控制器类 +""" diff --git a/elevator_saga/scripts/client_examples/simple_example.py b/elevator_saga/scripts/client_examples/simple_example.py new file mode 100644 index 0000000..0dc55a3 --- /dev/null +++ b/elevator_saga/scripts/client_examples/simple_example.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +公交车式电梯调度算法示例 +电梯像公交车一样运营,按固定路线循环停靠每一层 +""" +from typing import Dict, List + +from elevator_saga.client.base_controller import ElevatorController +from elevator_saga.client.proxy_models import ProxyElevator, ProxyFloor, ProxyPassenger +from elevator_saga.core.models import SimulationEvent + + +class ElevatorBusController(ElevatorController): + """ + 公交车式电梯调度算法 + 电梯像公交车一样按固定路线循环运行,在每层都停 + """ + + def __init__(self, server_url: str = "http://127.0.0.1:8000", debug: bool = False): + """初始化控制器""" + super().__init__(server_url, debug) + self.elevator_directions: Dict[int, str] = {} # 记录每个电梯的当前方向 + self.max_floor = 0 # 最大楼层数 + + def on_init(self, elevators: List[ProxyElevator], floors: List[ProxyFloor]) -> None: + """初始化公交车式电梯算法""" + print("🚌 公交车式电梯算法初始化") + print(f" 管理 {len(elevators)} 部电梯") + print(f" 服务 {len(floors)} 层楼") + + # 获取最大楼层数 + self.max_floor = len(floors) - 1 + + # 初始化每个电梯的方向 - 开始都向上 + for elevator in elevators: + self.elevator_directions[elevator.id] = "up" + + # 简单的初始分布 - 均匀分散到不同楼层 + for i, elevator in enumerate(elevators): + # 计算目标楼层 - 均匀分布在不同楼层 + target_floor = (i * (len(floors) - 1)) // len(elevators) + + # 立刻移动到目标位置并开始循环 + elevator.go_to_floor(target_floor, immediate=True) + + print(f" 🚌 电梯{elevator.id} -> {target_floor}楼 (开始公交循环)") + + def on_event_execute_start( + self, tick: int, events: List[SimulationEvent], elevators: List[ProxyElevator], floors: List[ProxyFloor] + ) -> None: + """事件执行前的回调""" + print(f"⏰ Tick {tick}: 即将处理 {len(events)} 个事件", end="") + for i in elevators: + print(f"电梯{i.id}[{i.target_floor_direction.value}] 位置{i.current_floor_float}/{i.target_floor}, ", end="") + print() + + def on_event_execute_end( + self, tick: int, events: List[SimulationEvent], elevators: List[ProxyElevator], floors: List[ProxyFloor] + ) -> None: + """事件执行后的回调""" + # print(f"✅ Tick {tick}: 已处理 {len(events)} 个事件") + pass + + def on_passenger_call(self, floor: ProxyFloor, direction: str) -> None: + """ + 乘客呼叫时的回调 + 公交车模式下,电梯已经在循环运行,无需特别响应呼叫 + """ + print(f"📞 楼层 {floor.floor} 有乘客呼叫 ({direction}) - 公交车将按既定路线服务") + + def on_elevator_idle(self, elevator: ProxyElevator) -> None: + """ + 电梯空闲时的回调 + 让空闲的电梯继续执行公交车循环路线 + """ + print(f"⏸️ 电梯 {elevator.id} 空闲,继续公交循环") + + def on_elevator_stopped(self, elevator: ProxyElevator, floor: ProxyFloor) -> None: + """ + 电梯停靠时的回调 + 公交车模式下,在每一层都停下,然后继续下一站 + """ + print(f"🛑 电梯 {elevator.id} 停靠在 {floor.floor} 楼") + + # 设置指示器让乘客知道电梯的行进方向 + current_direction = self.elevator_directions.get(elevator.id, "up") + if current_direction == "up": + elevator.set_up_indicator(True) + elevator.set_down_indicator(False) + else: + elevator.set_up_indicator(False) + elevator.set_down_indicator(True) + + def on_passenger_board(self, elevator: ProxyElevator, passenger: ProxyPassenger) -> None: + """ + 乘客上车时的回调 + 打印乘客上车信息 + """ + print(f"⬆️ 乘客 {passenger.id} 上车 - 电梯 {elevator.id} - 楼层 {elevator.current_floor} - 目标楼层: {passenger.destination}") + + def on_passenger_alight(self, elevator: ProxyElevator, passenger: ProxyPassenger, floor: ProxyFloor) -> None: + """ + 乘客下车时的回调 + 打印乘客下车信息 + """ + print(f"⬇️ 乘客 {passenger.id} 在 {floor.floor} 楼下车 - 电梯 {elevator.id}") + + def on_elevator_passing_floor(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: + """ + 电梯经过楼层时的回调 + 打印经过楼层的信息 + """ + print(f"🔄 电梯 {elevator.id} 经过 {floor.floor} 楼 (方向: {direction})") + + def on_elevator_approaching(self, elevator: ProxyElevator, floor: ProxyFloor, direction: str) -> None: + """ + 电梯即将到达时的回调 (START_DOWN事件) + 电梯开始减速,即将到达目标楼层 + """ + print(f"🎯 电梯 {elevator.id} 即将到达 {floor.floor} 楼 (方向: {direction})") + + +if __name__ == "__main__": + algorithm = ElevatorBusController(debug=True) + algorithm.start() diff --git a/elevator_saga/server/__init__.py b/elevator_saga/server/__init__.py new file mode 100644 index 0000000..5cb74e0 --- /dev/null +++ b/elevator_saga/server/__init__.py @@ -0,0 +1,3 @@ +""" +Elevator simulation server components +""" diff --git a/elevator_saga/server/simulator.py b/elevator_saga/server/simulator.py new file mode 100644 index 0000000..2ac514a --- /dev/null +++ b/elevator_saga/server/simulator.py @@ -0,0 +1,805 @@ +#!/usr/bin/env python3 +""" +Elevator simulation server - tick-based discrete event simulation +Provides HTTP API for controlling elevators and advancing simulation time +""" +import argparse +import json +import os.path +import threading +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional, cast + +from flask import Flask, Response, request + +from elevator_saga.core.models import ( + Direction, + ElevatorState, + ElevatorStatus, + EventType, + FloorState, + PassengerInfo, + PassengerStatus, + SerializableModel, + SimulationEvent, + SimulationState, + TrafficEntry, + create_empty_simulation_state, +) + +# Global debug flag for server +_SERVER_DEBUG_MODE = False + + +def set_server_debug_mode(enabled: bool): + """Enable or disable server debug logging""" + global _SERVER_DEBUG_MODE + globals()["_SERVER_DEBUG_MODE"] = enabled + + +def server_debug_log(message: str): + """Print server debug message if debug mode is enabled""" + if _SERVER_DEBUG_MODE: + print(f"[SERVER-DEBUG] {message}", flush=True) + + +class CustomJSONEncoder(json.JSONEncoder): + """ + 自定义JSON编码器,处理Enum和其他特殊类型的序列化 + """ + + def default(self, o: Any) -> Any: + """ + 重写默认序列化方法,处理特殊类型 + + Args: + o: 要序列化的对象 + + Returns: + 序列化后的值 + """ + if isinstance(o, Enum): + return o.value + elif hasattr(o, "to_dict"): + # 如果对象有to_dict方法,使用它 + return o.to_dict() + else: + # 调用父类的默认处理 + return super().default(o) + + +def json_response(data: Any, status: int = 200) -> Response | tuple[Response, int]: + """ + 创建JSON响应,使用自定义编码器处理Enum等特殊类型 + + Args: + data: 要序列化的数据 + status: HTTP状态码 + + Returns: + Flask Response对象,或者Response和状态码的元组(当状态码不是200时) + """ + json_str = json.dumps(data, cls=CustomJSONEncoder, ensure_ascii=False) + response = Response(json_str, status=status, mimetype="application/json") + if status == 200: + return response + else: + return response, status + + +@dataclass +class MetricsResponse(SerializableModel): + """性能指标响应""" + + done: int + total: int + avg_wait: float + p95_wait: float + avg_system: float + p95_system: float + energy_total: float + + +@dataclass +class PassengerSummary(SerializableModel): + """乘客摘要""" + + completed: int + waiting: int + in_transit: int + total: int + + +@dataclass +class SimulationStateResponse(SerializableModel): + """模拟状态响应""" + + tick: int + elevators: List[ElevatorState] + floors: List[FloorState] + passengers: Dict[int, PassengerInfo] + metrics: MetricsResponse + + +class ElevatorSimulation: + traffic_queue: List[TrafficEntry] + next_passenger_id: int + max_duration_ticks: int + _force_completed: bool + + def __init__(self, traffic_dir: str, _init_only: bool = False): + if _init_only: + return + self.lock = threading.Lock() + self.traffic_dir = Path(traffic_dir) + self.current_traffic_index = 0 + self.traffic_files: List[Path] = [] + self.state: SimulationState = create_empty_simulation_state(2, 1, 1) + self._force_completed = False + self._load_traffic_files() + + @property + def tick(self) -> int: + """当前tick""" + return self.state.tick + + @property + def elevators(self) -> List[ElevatorState]: + """电梯列表""" + return self.state.elevators + + @property + def floors(self) -> List[FloorState]: + """楼层列表""" + return self.state.floors + + @property + def passengers(self) -> Dict[int, PassengerInfo]: + """乘客字典""" + return self.state.passengers + + def _load_traffic_files(self) -> None: + """扫描traffic目录,加载所有json文件列表""" + # 查找所有json文件 + for file_path in self.traffic_dir.glob("*.json"): + if file_path.is_file(): + self.traffic_files.append(file_path) + # 按文件名排序 + self.traffic_files.sort() + server_debug_log(f"Found {len(self.traffic_files)} traffic files: {[f.name for f in self.traffic_files]}") + # 如果有文件,加载第一个 + if self.traffic_files: + self.load_current_traffic() + + def load_current_traffic(self) -> None: + """加载当前索引对应的流量文件""" + if not self.traffic_files: + server_debug_log("No traffic files available") + return + + if self.current_traffic_index >= len(self.traffic_files): + server_debug_log(f"Traffic index {self.current_traffic_index} out of range") + return + + traffic_file = self.traffic_files[self.current_traffic_index] + server_debug_log(f"Loading traffic from {traffic_file.name}") + try: + with open(traffic_file, "r") as f: + file_data = json.load(f) + building_config = file_data["building"] + server_debug_log(f"Building config: {building_config}") + self.state = create_empty_simulation_state( + building_config["elevators"], building_config["floors"], building_config["elevator_capacity"] + ) + self.reset() + self.max_duration_ticks = building_config["duration"] + self._force_completed = False # 重置强制完成标志 + traffic_data: list[Dict[str, Any]] = file_data["traffic"] + traffic_data.sort(key=lambda t: cast(int, t["tick"])) + for entry in traffic_data: + traffic_entry = TrafficEntry( + id=self.next_passenger_id, + origin=entry["origin"], + destination=entry["destination"], + tick=entry["tick"], + ) + self.traffic_queue.append(traffic_entry) + self.next_passenger_id += 1 + + except Exception as e: + server_debug_log(f"Error loading traffic file {traffic_file}: {e}") + + def next_traffic_round(self) -> bool: + """切换到下一个流量文件,返回是否成功切换""" + if not self.traffic_files: + return False + + # 检查是否还有下一个文件 + next_index = self.current_traffic_index + 1 + if next_index >= len(self.traffic_files): + return False # 没有更多流量文件,停止模拟 + + self.current_traffic_index = next_index + self.load_current_traffic() # 加载新的流量文件 + return True + + def load_traffic(self, traffic_file: str) -> None: + """Load passenger traffic from JSON file using unified data models""" + with open(traffic_file, "r") as f: + traffic_data = json.load(f) + + server_debug_log(f"Loading traffic from {traffic_file}, {len(traffic_data)} entries") + + self.traffic_queue = [] + for entry in traffic_data: + # Create TrafficEntry from JSON data + traffic_entry = TrafficEntry( + id=entry.get("id", self.next_passenger_id), + origin=entry["origin"], + destination=entry["destination"], + tick=entry["tick"], + ) + self.traffic_queue.append(traffic_entry) + self.next_passenger_id = max(self.next_passenger_id, traffic_entry.id + 1) + + # Sort by arrival time + self.traffic_queue.sort(key=lambda p: p.tick) + server_debug_log(f"Traffic loaded and sorted, next passenger ID: {self.next_passenger_id}") + + def add_passenger( + self, + origin: int, + destination: int, + tick: Optional[int] = None, + ) -> int: + """Add a passenger to the simulation using unified data models""" + if tick is None: + tick = self.tick + + passenger = PassengerInfo( + id=self.next_passenger_id, + origin=origin, + destination=destination, + arrive_tick=tick, + ) + + self.passengers[passenger.id] = passenger + + # Add to floor queue + floor_state = self.state.get_floor_by_number(origin) + if floor_state: + if destination > origin: + floor_state.add_waiting_passenger(passenger.id, Direction.UP) + self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": origin}) + else: + floor_state.add_waiting_passenger(passenger.id, Direction.DOWN) + self._emit_event(EventType.DOWN_BUTTON_PRESSED, {"floor": origin}) + + self.next_passenger_id += 1 + server_debug_log(f"Added passenger {passenger.id}: {origin}→{destination} at tick {tick}") + return passenger.id + + def _emit_event(self, event_type: EventType, data: Dict[str, Any]) -> None: + """Emit an event to be sent to clients using unified data models""" + self.state.add_event(event_type, data) + server_debug_log(f"Event emitted: {event_type.value} with data {data}") + + def step(self, num_ticks: int = 1) -> List[SimulationEvent]: + with self.lock: + new_events: List[SimulationEvent] = [] + for i in range(num_ticks): + self.state.tick += 1 + server_debug_log(f"Processing tick {self.tick} (step {i+1}/{num_ticks})") + tick_events = self._process_tick() + new_events.extend(tick_events) + server_debug_log(f"Tick {self.tick} completed - Generated {len(tick_events)} events") + + # 如果到达最大时长且尚未强制完成,强制完成剩余乘客 + if ( + hasattr(self, "max_duration_ticks") + and self.tick >= self.max_duration_ticks + and not self._force_completed + ): + completed_count = self.force_complete_remaining_passengers() + self._force_completed = True + if completed_count > 0: + server_debug_log(f"模拟结束,强制完成了 {completed_count} 个乘客") + + server_debug_log(f"Step completed - Final tick: {self.tick}, Total events: {len(new_events)}") + return new_events + + def _process_tick(self) -> List[SimulationEvent]: + """Process one simulation tick""" + events_start = len(self.state.events) + # 1. Add new passengers from traffic queue + self._process_arrivals() + + # 2. Move elevators + self._move_elevators() + + # 3. Process elevator stops and passenger boarding/alighting + self._process_elevator_stops() + + # Return events generated this tick + return self.state.events[events_start:] + + def _process_arrivals(self) -> None: + """Process new passenger arrivals""" + while self.traffic_queue and self.traffic_queue[0].tick <= self.tick: + traffic_entry = self.traffic_queue.pop(0) + passenger = PassengerInfo( + id=traffic_entry.id, + origin=traffic_entry.origin, + destination=traffic_entry.destination, + arrive_tick=self.tick, + ) + assert ( + traffic_entry.origin != traffic_entry.destination + ), f"乘客{passenger.id}目的地和起始地{traffic_entry.origin}重复" + self.passengers[passenger.id] = passenger + server_debug_log(f"乘客 {passenger.id:4}: 创建 | {passenger}") + if passenger.destination > passenger.origin: + self.floors[passenger.origin].up_queue.append(passenger.id) + self._emit_event(EventType.UP_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id}) + else: + self.floors[passenger.origin].down_queue.append(passenger.id) + self._emit_event(EventType.DOWN_BUTTON_PRESSED, {"floor": passenger.origin, "passenger": passenger.id}) + + def _calculate_distance_to_target(self, elevator: ElevatorState) -> float: + """计算到目标楼层的距离(以floor_up_position为单位)""" + current_pos = elevator.position.current_floor * 10 + elevator.position.floor_up_position + target_pos = elevator.target_floor * 10 + return abs(target_pos - current_pos) + + def _should_start_deceleration(self, elevator: ElevatorState) -> bool: + """判断是否应该开始减速 + 减速需要1个tick(移动1个位置单位),所以当距离目标<=3时开始减速 + 这样可以保证有一个完整的减速周期 + """ + distance = self._calculate_distance_to_target(elevator) + return distance <= 3 + + def _get_movement_speed(self, elevator: ElevatorState) -> int: + """根据电梯状态获取移动速度""" + if elevator.run_status == ElevatorStatus.START_UP: + return 1 + elif elevator.run_status == ElevatorStatus.START_DOWN: + return 1 + elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED: + return 2 + else: # STOPPED + return 0 + + def _update_elevator_status(self, elevator: ElevatorState) -> None: + """更新电梯运行状态""" + current_floor = elevator.position.current_floor + target_floor = elevator.target_floor + + if current_floor == target_floor and elevator.position.floor_up_position == 0: + # 已到达目标楼层 + elevator.run_status = ElevatorStatus.STOPPED + return + + if elevator.run_status == ElevatorStatus.STOPPED: + # 从停止状态启动 + elevator.run_status = ElevatorStatus.START_UP + elif elevator.run_status == ElevatorStatus.START_UP: + # 从启动状态切换到匀速 + elevator.run_status = ElevatorStatus.CONSTANT_SPEED + elif elevator.run_status == ElevatorStatus.CONSTANT_SPEED: + # 检查是否需要开始减速 + if self._should_start_deceleration(elevator): + elevator.run_status = ElevatorStatus.START_DOWN + # START_DOWN状态会在到达目标时自动切换为STOPPED + + def _move_elevators(self) -> None: + """Move all elevators towards their destinations with acceleration/deceleration""" + for elevator in self.elevators: + target_floor = elevator.target_floor + current_floor = elevator.position.current_floor + + # 如果已在恰好目标楼层,标记为STOPPED,之后交给_process_elevator_stops处理 + if target_floor == current_floor: + if elevator.next_target_floor is None: + continue + if elevator.position.floor_up_position == 0: + server_debug_log( + f"电梯{elevator.id}已在目标楼层,当前{elevator.position.current_floor_float} / 目标{target_floor}" + ) + elevator.run_status = ElevatorStatus.STOPPED + continue + + # 更新电梯状态 + self._update_elevator_status(elevator) + + # 获取移动速度 + movement_speed = self._get_movement_speed(elevator) + + if movement_speed == 0: + continue + + # Move towards target + old_floor = current_floor + + # 根据状态和方向调整移动速度 + if elevator.target_floor_direction == Direction.UP: + # 向上移动 + new_floor = elevator.position.floor_up_position_add(movement_speed) + else: + # 向下移动 + new_floor = elevator.position.floor_up_position_add(-movement_speed) + + server_debug_log( + f"电梯{elevator.id} 状态:{elevator.run_status.value} 速度:{movement_speed} " + f"位置:{elevator.position.current_floor_float:.1f} 目标:{target_floor}" + ) + + # 处理楼层变化事件 + if old_floor != new_floor: + if new_floor != target_floor: + self._emit_event( + EventType.PASSING_FLOOR, + { + "elevator": elevator.id, + "floor": new_floor, + "direction": elevator.target_floor_direction.value, + }, + ) + + # 检查是否到达目标楼层 + if target_floor == new_floor and elevator.position.floor_up_position == 0: + elevator.run_status = ElevatorStatus.STOPPED + self._emit_event(EventType.STOPPED_AT_FLOOR, {"elevator": elevator.id, "floor": new_floor}) + + # elevator.energy_consumed += abs(direction * elevator.speed_pre_tick) * 0.5 + + def _process_elevator_stops(self) -> None: + """Handle passenger boarding and alighting at elevator stops""" + for elevator in self.elevators: + if not elevator.run_status == ElevatorStatus.STOPPED: + continue + current_floor = elevator.current_floor + + # Let passengers alight + passengers_to_remove: List[int] = [] + for passenger_id in elevator.passengers: + passenger = self.passengers[passenger_id] + if passenger.destination == current_floor: + passenger.dropoff_tick = self.tick + passengers_to_remove.append(passenger_id) + + # Remove passengers who alighted + for passenger_id in passengers_to_remove: + elevator.passengers.remove(passenger_id) + self._emit_event( + EventType.PASSENGER_ALIGHT, + {"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id}, + ) + # Board waiting passengers (if indicators allow) + floor = self.floors[current_floor] + passengers_to_board: List[int] = [] + if not elevator.indicators.up and not elevator.indicators.down: + if elevator.next_target_floor is not None: + elevator.position.target_floor = elevator.next_target_floor + elevator.next_target_floor = None + elevator.indicators.set_direction(elevator.target_floor_direction) + + # Board passengers going up (if up indicator is on or no direction set) + if elevator.indicators.up: + available_capacity = elevator.max_capacity - len(elevator.passengers) + passengers_to_board.extend(floor.up_queue[:available_capacity]) + floor.up_queue = floor.up_queue[available_capacity:] + + # Board passengers going down (if down indicator is on or no direction set) + if elevator.indicators.down: + # 先临时计算长度 + remaining_capacity = elevator.max_capacity - len(elevator.passengers) - len(passengers_to_board) + if remaining_capacity > 0: + down_passengers = floor.down_queue[:remaining_capacity] + passengers_to_board.extend(down_passengers) + floor.down_queue = floor.down_queue[remaining_capacity:] + + # 没有上下指示的时候,触发等待,会消耗一个tick + if not elevator.indicators.up and not elevator.indicators.down: + self._emit_event(EventType.IDLE, {"elevator": elevator.id, "floor": current_floor}) + continue + # Process boarding + for passenger_id in passengers_to_board: + passenger = self.passengers[passenger_id] + passenger.pickup_tick = self.tick + passenger.elevator_id = elevator.id + elevator.passengers.append(passenger_id) + self._emit_event( + EventType.PASSENGER_BOARD, + {"elevator": elevator.id, "floor": current_floor, "passenger": passenger_id}, + ) + + def elevator_go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> None: + """Command elevator to go to specified floor""" + if 0 <= elevator_id < len(self.elevators) and 0 <= floor < len(self.floors): + elevator = self.elevators[elevator_id] + if immediate: + elevator.position.target_floor = floor + else: + elevator.next_target_floor = floor + + def elevator_set_indicators(self, elevator_id: int, up: Optional[bool] = None, down: Optional[bool] = None) -> None: + """Set elevator direction indicators""" + if 0 <= elevator_id < len(self.elevators): + elevator = self.elevators[elevator_id] + if up is not None: + elevator.indicators.up = up + if down is not None: + elevator.indicators.down = down + + def get_state(self) -> SimulationStateResponse: + """Get complete simulation state""" + with self.lock: + # Calculate metrics + metrics = self._calculate_metrics() + + return SimulationStateResponse( + tick=self.tick, + elevators=self.elevators, + floors=self.floors, + passengers=self.passengers, + metrics=metrics, + ) + + def _calculate_metrics(self) -> MetricsResponse: + """Calculate performance metrics""" + # 直接从state中筛选已完成的乘客 + completed = [p for p in self.state.passengers.values() if p.status == PassengerStatus.COMPLETED] + + total_passengers = len(self.state.passengers) + if not completed: + return MetricsResponse( + done=0, + total=total_passengers, + avg_wait=0, + p95_wait=0, + avg_system=0, + p95_system=0, + energy_total=sum(e.energy_consumed for e in self.elevators), + ) + + wait_times = [float(p.wait_time) for p in completed] + system_times = [float(p.system_time) for p in completed] + + def percentile(data: List[float], p: int) -> float: + if not data: + return 0.0 + sorted_data = sorted(data) + index = int(len(sorted_data) * p / 100) + return sorted_data[min(index, len(sorted_data) - 1)] + + return MetricsResponse( + done=len(completed), + total=total_passengers, + avg_wait=sum(wait_times) / len(wait_times) if wait_times else 0, + p95_wait=percentile(wait_times, 95), + avg_system=sum(system_times) / len(system_times) if system_times else 0, + p95_system=percentile(system_times, 95), + energy_total=sum(e.energy_consumed for e in self.elevators), + ) + + def get_events(self, since_tick: int = 0) -> List[SimulationEvent]: + """Get events since specified tick""" + return [e for e in self.state.events if e.tick > since_tick] + + def get_traffic_info(self) -> Dict[str, Any]: + return { + "current_index": self.current_traffic_index, + "total_files": len(self.traffic_files), + "max_tick": self.max_duration_ticks, + } + + def force_complete_remaining_passengers(self) -> int: + """强制完成所有未完成的乘客,返回完成的乘客数量""" + with self.lock: + completed_count = 0 + current_tick = self.tick + + server_debug_log(f"强制完成未完成乘客,当前tick: {current_tick}") + + # 收集需要强制完成的乘客ID(使用set提高查找效率) + passengers_to_complete = set() + for passenger_id, passenger in self.state.passengers.items(): + if passenger.dropoff_tick == 0: + passengers_to_complete.add(passenger_id) + + server_debug_log(f"找到 {len(passengers_to_complete)} 个需要强制完成的乘客") + + # 批量处理:先从电梯中移除所有需要完成的乘客 + for elevator in self.elevators: + # 使用列表推导式创建新的乘客列表,避免多次remove操作 + original_count = len(elevator.passengers) + elevator.passengers = [pid for pid in elevator.passengers if pid not in passengers_to_complete] + removed_count = original_count - len(elevator.passengers) + + # 清理乘客目的地映射 + for passenger_id in passengers_to_complete: + elevator.passenger_destinations.pop(passenger_id, None) + + if removed_count > 0: + server_debug_log(f"从电梯 {elevator.id} 移除了 {removed_count} 个强制完成的乘客") + + # 批量处理:从楼层等待队列中移除乘客 + for floor in self.floors: + # 优化队列清理 + original_up = len(floor.up_queue) + original_down = len(floor.down_queue) + + floor.up_queue = [pid for pid in floor.up_queue if pid not in passengers_to_complete] + floor.down_queue = [pid for pid in floor.down_queue if pid not in passengers_to_complete] + + removed_up = original_up - len(floor.up_queue) + removed_down = original_down - len(floor.down_queue) + + if removed_up > 0 or removed_down > 0: + server_debug_log( + f"从楼层 {floor.floor} 移除了 {removed_up}(上行) + {removed_down}(下行) 个等待乘客" + ) + + # 最后设置乘客完成状态 + for passenger_id in passengers_to_complete: + passenger = self.state.passengers[passenger_id] + passenger.dropoff_tick = current_tick + completed_count += 1 + + server_debug_log(f"强制完成了 {completed_count} 个乘客") + return completed_count + + def reset(self) -> None: + """Reset simulation to initial state""" + with self.lock: + self.state = create_empty_simulation_state( + len(self.elevators), len(self.floors), self.elevators[0].max_capacity + ) + self.traffic_queue: List[TrafficEntry] = [] + self.max_duration_ticks = 0 + self.next_passenger_id = 1 + self._force_completed = False + + +# Global simulation instance for Flask routes +simulation: ElevatorSimulation = ElevatorSimulation("", _init_only=True) + +# Create Flask app +app = Flask(__name__) + + +# Configure CORS +@app.after_request +def after_request(response: Response) -> Response: + response.headers.add("Access-Control-Allow-Origin", "*") + response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization") + response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS") + return response + + +@app.route("/api/state", methods=["GET"]) +def get_state() -> Response | tuple[Response, int]: + try: + state = simulation.get_state() + return json_response(state) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/step", methods=["POST"]) +def step_simulation() -> Response | tuple[Response, int]: + try: + data: Dict[str, Any] = request.get_json() or {} + ticks = data.get("ticks", 1) + server_debug_log(f"HTTP /api/step request - ticks: {ticks}") + events = simulation.step(ticks) + server_debug_log(f"HTTP /api/step response - tick: {simulation.tick}, events: {len(events)}") + return json_response( + { + "tick": simulation.tick, + "events": [{"tick": e.tick, "type": e.type.value, "data": e.data} for e in events], + } + ) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/reset", methods=["POST"]) +def reset_simulation() -> Response | tuple[Response, int]: + try: + simulation.reset() + return json_response({"success": True}) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/elevators//go_to_floor", methods=["POST"]) +def elevator_go_to_floor(elevator_id: int) -> Response | tuple[Response, int]: + try: + data: Dict[str, Any] = request.get_json() or {} + floor = data["floor"] + immediate = data.get("immediate", False) + simulation.elevator_go_to_floor(elevator_id, floor, immediate) + return json_response({"success": True}) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/elevators//set_indicators", methods=["POST"]) +def elevator_set_indicators(elevator_id: int) -> Response | tuple[Response, int]: + try: + data: Dict[str, Any] = request.get_json() or {} + up = data.get("up") + down = data.get("down") + simulation.elevator_set_indicators(elevator_id, up, down) + return json_response({"success": True}) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/traffic/next", methods=["POST"]) +def next_traffic_round() -> Response | tuple[Response, int]: + """切换到下一个流量文件""" + try: + success = simulation.next_traffic_round() + if success: + return json_response({"success": True}) + else: + return json_response({"success": False, "error": "No traffic files available"}, 400) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/traffic/info", methods=["GET"]) +def get_traffic_info() -> Response | tuple[Response, int]: + """获取当前流量文件信息""" + try: + info = simulation.get_traffic_info() + return json_response(info) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +@app.route("/api/force_complete", methods=["POST"]) +def force_complete_passengers() -> Response | tuple[Response, int]: + """强制完成所有未完成的乘客""" + try: + completed_count = simulation.force_complete_remaining_passengers() + return json_response({"success": True, "completed_count": completed_count}) + except Exception as e: + return json_response({"error": str(e)}, 500) + + +def main() -> None: + global simulation + + parser = argparse.ArgumentParser(description="Elevator Simulation Server") + parser.add_argument("--host", default="127.0.0.1", help="Server host") + parser.add_argument("--port", type=int, default=8000, help="Server port") + parser.add_argument("--debug", default=True, action="store_true", help="Enable debug logging") + + args = parser.parse_args() + + # Enable debug mode if requested + if args.debug: + set_server_debug_mode(True) + server_debug_log("Server debug mode enabled") + app.config["DEBUG"] = True + + # Create simulation with traffic directory + simulation = ElevatorSimulation(f"{os.path.join(os.path.dirname(__file__), '..', 'traffic')}") + + # Print traffic status + print(f"Elevator simulation server running on http://{args.host}:{args.port}") + + try: + app.run(host=args.host, port=args.port, debug=args.debug, threaded=True) + except KeyboardInterrupt: + print("\nShutting down server...") + + +if __name__ == "__main__": + main() diff --git a/elevator_saga/traffic/__init__.py b/elevator_saga/traffic/__init__.py new file mode 100644 index 0000000..ecaaeed --- /dev/null +++ b/elevator_saga/traffic/__init__.py @@ -0,0 +1,3 @@ +""" +Traffic pattern generators +""" diff --git a/elevator_saga/traffic/generators.py b/elevator_saga/traffic/generators.py new file mode 100644 index 0000000..dc0d8dd --- /dev/null +++ b/elevator_saga/traffic/generators.py @@ -0,0 +1,984 @@ +#!/usr/bin/env python3 +""" +Traffic Pattern Generators for Elevator Simulation +Generate JSON traffic files for different scenarios with scalable building sizes +From small (1 elevator, 3 floors, 10 people) to large (4 elevators, 12 floors, 200 people) +""" +import json +import math +import os.path +import random +from pathlib import Path +from typing import Any, Dict, List, Optional + +# 建筑规模配置 +BUILDING_SCALES = { + "small": { + "floors": (3, 5), + "elevators": (1, 2), + "capacity": (6, 8), + "max_people": (10, 40), + "duration_range": (120, 240), + "description": "小型建筑 - 1-2台电梯,3-5楼,10-40人", + }, + "medium": { + "floors": (6, 9), + "elevators": (2, 3), + "capacity": (8, 12), + "max_people": (40, 120), + "duration_range": (200, 400), + "description": "中型建筑 - 2-3台电梯,6-9楼,40-120人", + }, + "large": { + "floors": (10, 12), + "elevators": (3, 4), + "capacity": (10, 15), + "max_people": (120, 200), + "duration_range": (300, 600), + "description": "大型建筑 - 3-4台电梯,10-12楼,120-200人", + }, +} + + +def calculate_intensity_for_scale(base_intensity: float, floors: int, target_people: int, duration: int) -> float: + """根据建筑规模计算合适的流量强度""" + # 估算每tick平均产生的人数 + estimated_people_per_tick = base_intensity + total_estimated = estimated_people_per_tick * duration + + if total_estimated <= 0: + return base_intensity + + # 调整强度以达到目标人数 + adjustment_factor = target_people / total_estimated + return min(1.0, base_intensity * adjustment_factor) + + +def limit_traffic_count(traffic: List[Dict[str, Any]], max_people: int) -> List[Dict[str, Any]]: + """限制流量中的人数不超过最大值""" + if len(traffic) <= max_people: + return traffic + + # 按时间排序,优先保留早期的乘客 + traffic_sorted = sorted(traffic, key=lambda x: x["tick"]) + return traffic_sorted[:max_people] + + +def generate_up_peak_traffic( + floors: int = 10, duration: int = 300, intensity: float = 0.6, max_people: int = 100, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成上行高峰流量 - 主要从底层到高层""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 根据目标人数调整强度 + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 根据时间调整强度 - 早期高峰 + time_factor = 1.0 + 0.5 * math.sin(tick * math.pi / duration) + current_intensity = adjusted_intensity * time_factor + + if random.random() < current_intensity: + # 针对小建筑调整比例 - 小建筑大厅使用更频繁 + lobby_ratio = 0.95 if floors <= 5 else 0.9 + + if random.random() < lobby_ratio: + origin = 0 + destination = random.randint(1, floors - 1) + else: + # 其他楼层间流量 + if floors > 2: + origin = random.randint(1, min(floors - 2, floors - 1)) + destination = random.randint(origin + 1, floors - 1) + else: + origin = 0 + destination = floors - 1 + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_down_peak_traffic( + floors: int = 10, duration: int = 300, intensity: float = 0.6, max_people: int = 100, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成下行高峰流量 - 主要从高层到底层""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 根据目标人数调整强度 + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 根据时间调整强度 - 后期高峰 + time_factor = 1.0 + 0.5 * math.sin((tick + duration / 2) * math.pi / duration) + current_intensity = adjusted_intensity * time_factor + + if random.random() < current_intensity: + # 针对小建筑调整比例 - 小建筑到大厅更频繁 + lobby_ratio = 0.95 if floors <= 5 else 0.9 + + if random.random() < lobby_ratio: + origin = random.randint(1, floors - 1) + destination = 0 + else: + # 其他楼层间流量 + if floors > 2: + origin = random.randint(2, floors - 1) + destination = random.randint(1, origin - 1) + else: + origin = floors - 1 + destination = 0 + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_inter_floor_traffic( + floors: int = 10, duration: int = 400, intensity: float = 0.4, max_people: int = 80, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成楼层间流量 - 主要楼层间移动,适合小建筑""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 小建筑更适合这种场景,调整强度 + if floors <= 5: + adjusted_intensity = calculate_intensity_for_scale(intensity * 1.2, floors, max_people, duration) + else: + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 平稳的流量,轻微波动 + time_variation = 1.0 + 0.2 * math.sin(tick * 2 * math.pi / duration) + current_intensity = adjusted_intensity * time_variation + + if random.random() < current_intensity: + if floors <= 3: + # 超小建筑,允许包含大厅 + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + else: + # 其他建筑,避免大厅 + origin = random.randint(1, floors - 1) + destination = random.choice([f for f in range(1, floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_lunch_rush_traffic( + floors: int = 10, duration: int = 200, intensity: float = 0.7, max_people: int = 60, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成午餐时间流量 - 双向流量,适合中大型建筑""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 小建筑没有餐厅概念,生成简单的双向流量 + if floors <= 5: + # 小建筑简化为楼层间随机流量 + adjusted_intensity = calculate_intensity_for_scale(intensity * 0.6, floors, max_people, duration) + + for tick in range(duration): + # 高斯分布的流量强度 + peak_center = duration // 2 + peak_width = duration // 4 + distance_from_peak = abs(tick - peak_center) / peak_width + current_intensity = adjusted_intensity * max(0.3, math.exp(-distance_from_peak * distance_from_peak)) + + if random.random() < current_intensity: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + else: + # 中大型建筑,假设1-2楼是餐厅,3+楼是办公室 + restaurant_floors = [1, 2] if floors > 2 else [1] + office_floors = list(range(max(3, len(restaurant_floors) + 1), floors)) + + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 高斯分布的流量强度 + peak_center = duration // 2 + peak_width = duration // 4 + distance_from_peak = abs(tick - peak_center) / peak_width + current_intensity = adjusted_intensity * max(0.2, math.exp(-distance_from_peak * distance_from_peak)) + + if random.random() < current_intensity: + if office_floors and random.random() < 0.5: + # 去餐厅 + origin = random.choice(office_floors) + destination = random.choice(restaurant_floors) + else: + # 回办公室 + origin = random.choice(restaurant_floors) + destination = random.choice(office_floors) if office_floors else 0 + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_random_traffic( + floors: int = 10, duration: int = 500, intensity: float = 0.3, max_people: int = 80, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成随机流量 - 均匀分布,适合所有规模建筑""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 根据目标人数调整强度 + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 添加轻微的时间变化,避免完全平坦 + time_variation = 1.0 + 0.1 * math.sin(tick * 4 * math.pi / duration) + current_intensity = adjusted_intensity * time_variation + + if random.random() < current_intensity: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_fire_evacuation_traffic( + floors: int = 10, duration: int = 150, max_people: int = 120, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成火警疏散流量 - 紧急疏散到大厅""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 正常时间段 + normal_duration = duration // 3 + + # 正常流量 - 较少 + normal_intensity = 0.15 + for tick in range(normal_duration): + if random.random() < normal_intensity: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + # 火警开始 - 大量疏散到大厅 + alarm_tick = normal_duration + + # 根据建筑规模调整每层人数 + if floors <= 5: + people_per_floor = (2, 4) # 小建筑每层2-4人 + elif floors <= 9: + people_per_floor = (3, 6) # 中建筑每层3-6人 + else: + people_per_floor = (4, 8) # 大建筑每层4-8人 + + for floor in range(1, floors): + # 每层随机数量的人需要疏散 + num_people = random.randint(people_per_floor[0], people_per_floor[1]) + for i in range(num_people): + # 在10个tick内陆续到达,模拟疏散的紧急性 + arrival_tick = alarm_tick + random.randint(0, min(10, duration - alarm_tick - 1)) + if arrival_tick < duration: + traffic.append( + {"id": passenger_id, "origin": floor, "destination": 0, "tick": arrival_tick} # 疏散到大厅 + ) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_mixed_scenario_traffic( + floors: int = 10, duration: int = 600, max_people: int = 150, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成混合场景流量 - 包含多种模式,适合中大型建筑""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 根据人数目标调整各阶段强度 + target_per_phase = max_people // 4 + + # 第一阶段:上行高峰 (0-25%) + phase1_end = duration // 4 + phase1_intensity = calculate_intensity_for_scale(0.7, floors, target_per_phase, phase1_end) + + for tick in range(phase1_end): + if random.random() < phase1_intensity: + lobby_ratio = 0.9 if floors > 5 else 0.95 + if random.random() < lobby_ratio: + origin = 0 + destination = random.randint(1, floors - 1) + else: + if floors > 2: + origin = random.randint(0, floors - 2) + destination = random.randint(origin + 1, floors - 1) + else: + origin = 0 + destination = floors - 1 + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + # 第二阶段:正常流量 (25%-50%) + phase2_end = duration // 2 + phase2_intensity = calculate_intensity_for_scale(0.3, floors, target_per_phase, phase2_end - phase1_end) + + for tick in range(phase1_end, phase2_end): + if random.random() < phase2_intensity: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + # 第三阶段:午餐/中峰流量 (50%-67%) + phase3_end = phase2_end + duration // 6 + phase3_intensity = calculate_intensity_for_scale(0.6, floors, target_per_phase, phase3_end - phase2_end) + + for tick in range(phase2_end, phase3_end): + if random.random() < phase3_intensity: + if floors > 5 and random.random() < 0.6: + # 餐厅流量 - 仅适用于大型建筑 + if random.random() < 0.5: + origin = random.randint(3, floors - 1) + destination = random.randint(1, 2) + else: + origin = random.randint(1, 2) + destination = random.randint(3, floors - 1) + else: + # 其他流量 + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + # 第四阶段:下行高峰 (67%-100%) + phase4_intensity = calculate_intensity_for_scale(0.6, floors, target_per_phase, duration - phase3_end) + + for tick in range(phase3_end, duration): + if random.random() < phase4_intensity: + lobby_ratio = 0.85 if floors > 5 else 0.9 + if random.random() < lobby_ratio: + origin = random.randint(1, floors - 1) + destination = 0 + else: + if floors > 2: + origin = random.randint(2, floors - 1) + destination = random.randint(1, origin - 1) + else: + origin = floors - 1 + destination = 0 + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_high_density_traffic( + floors: int = 10, duration: int = 300, intensity: float = 1.2, max_people: int = 200, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成高密度流量 - 压力测试,适合测试电梯系统极限""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 计算目标强度,确保不超过人数限制 + target_people_per_tick = max_people / duration + safe_intensity = min(intensity, target_people_per_tick * 1.5) # 留出一些余量 + + for tick in range(duration): + # 高强度的随机流量,使用高斯分布增加变化 + base_passengers = safe_intensity + variation = random.gauss(0, safe_intensity * 0.3) # 30%变化 + num_passengers = max(0, int(base_passengers + variation)) + + for _ in range(num_passengers): + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + # 提前检查,避免生成过多乘客 + if len(traffic) >= max_people: + break + + if len(traffic) >= max_people: + break + + return limit_traffic_count(traffic, max_people) + + +def generate_small_building_traffic( + floors: int = 4, duration: int = 180, intensity: float = 0.4, max_people: int = 25, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成小建筑专用流量 - 简单楼层间移动,适合3-5层建筑""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 小建筑特点:频繁使用大厅,简单的上下楼 + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + for tick in range(duration): + # 轻微的时间变化 + time_factor = 1.0 + 0.3 * math.sin(tick * 2 * math.pi / duration) + current_intensity = adjusted_intensity * time_factor + + if random.random() < current_intensity: + # 80%涉及大厅的移动 + if random.random() < 0.8: + if random.random() < 0.5: + # 从大厅上楼 + origin = 0 + destination = random.randint(1, floors - 1) + else: + # 下到大厅 + origin = random.randint(1, floors - 1) + destination = 0 + else: + # 楼层间移动 + origin = random.randint(1, floors - 1) + destination = random.choice([f for f in range(1, floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_medical_building_traffic( + floors: int = 8, duration: int = 240, intensity: float = 0.5, max_people: int = 80, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成医疗建筑流量 - 模拟医院/诊所的特殊流量模式""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 医疗建筑特点:大厅使用频繁,某些楼层(如手术室)访问较少 + adjusted_intensity = calculate_intensity_for_scale(intensity, floors, max_people, duration) + + # 定义楼层类型权重 - 大厅和低层更频繁 + floor_weights = [] + for floor in range(floors): + if floor == 0: # 大厅 - 最高权重 + weight = 3.0 + elif floor <= 2: # 急诊、门诊 - 高权重 + weight = 2.0 + elif floor <= floors - 2: # 普通病房 - 中等权重 + weight = 1.0 + else: # 手术室、ICU - 低权重 + weight = 0.3 + floor_weights.append(weight) + + for tick in range(duration): + # 医疗建筑通常有明显的时间模式 + time_factor = 1.0 + 0.4 * math.sin((tick + duration * 0.2) * math.pi / duration) + current_intensity = adjusted_intensity * time_factor + + if random.random() < current_intensity: + # 85%的移动涉及大厅 + if random.random() < 0.85: + if random.random() < 0.6: + # 从大厅到其他楼层 + origin = 0 + # 使用权重选择目标楼层 + destinations = list(range(1, floors)) + weights = floor_weights[1:] + destination = random.choices(destinations, weights=weights)[0] + else: + # 从其他楼层到大厅 + origins = list(range(1, floors)) + weights = floor_weights[1:] + origin = random.choices(origins, weights=weights)[0] + destination = 0 + else: + # 楼层间移动(较少) + floor_candidates = list(range(floors)) + origin = random.choice(floor_candidates) + destination = random.choice([f for f in floor_candidates if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_meeting_event_traffic( + floors: int = 6, duration: int = 150, intensity: float = 0.8, max_people: int = 50, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成会议事件流量 - 模拟大型会议开始和结束的流量模式""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 假设会议在某个楼层举行 + meeting_floor = floors // 2 if floors > 2 else 1 + + # 会议分为三个阶段:到达、中间、离开 + arrival_end = duration // 3 + departure_start = duration * 2 // 3 + + for tick in range(duration): + should_add_passenger = False + origin = 0 + destination = 0 + + if tick < arrival_end: + # 到达阶段 - 大量人员前往会议楼层 + phase_progress = tick / arrival_end + current_intensity = intensity * (1.0 + math.sin(phase_progress * math.pi)) + + if random.random() < current_intensity: + # 主要从大厅到会议楼层 + if random.random() < 0.9: + origin = 0 + destination = meeting_floor + else: + # 少量其他楼层间移动 + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + should_add_passenger = True + + elif tick >= departure_start: + # 离开阶段 - 大量人员从会议楼层离开 + phase_progress = (tick - departure_start) / (duration - departure_start) + current_intensity = intensity * (1.0 + math.sin(phase_progress * math.pi)) + + if random.random() < current_intensity: + # 主要从会议楼层到大厅 + if random.random() < 0.9: + origin = meeting_floor + destination = 0 + else: + # 少量其他移动 + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + should_add_passenger = True + else: + # 中间阶段 - 低流量 + if random.random() < intensity * 0.1: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + should_add_passenger = True + + if should_add_passenger: + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +def generate_progressive_test_traffic( + floors: int = 8, duration: int = 400, max_people: int = 100, seed: int = 42 +) -> List[Dict[str, Any]]: + """生成渐进式测试流量 - 从低强度逐渐增加到高强度""" + random.seed(seed) + traffic = [] + passenger_id = 1 + + # 分为四个阶段,强度逐渐增加 + stage_duration = duration // 4 + + for stage in range(4): + stage_start = stage * stage_duration + stage_end = min((stage + 1) * stage_duration, duration) + stage_intensity = 0.2 + stage * 0.25 # 0.2, 0.45, 0.7, 0.95 + + stage_target = max_people // 4 + adjusted_intensity = calculate_intensity_for_scale(stage_intensity, floors, stage_target, stage_duration) + + for tick in range(stage_start, stage_end): + # 每个阶段内部也有变化 + local_progress = (tick - stage_start) / stage_duration + time_factor = 1.0 + 0.3 * math.sin(local_progress * 2 * math.pi) + current_intensity = adjusted_intensity * time_factor + + if random.random() < current_intensity: + origin = random.randint(0, floors - 1) + destination = random.choice([f for f in range(floors) if f != origin]) + + traffic.append({"id": passenger_id, "origin": origin, "destination": destination, "tick": tick}) + passenger_id += 1 + + return limit_traffic_count(traffic, max_people) + + +# 按建筑规模分类的场景配置 +TRAFFIC_SCENARIOS = { + # 经典场景 - 适用于所有规模,会根据建筑规模自动调整 + "up_peak": { + "generator": generate_up_peak_traffic, + "description": "上行高峰 - 主要从底层到高层", + "scales": { + "small": {"intensity": 0.5, "max_people": 20}, + "medium": {"intensity": 0.6, "max_people": 80}, + "large": {"intensity": 0.7, "max_people": 150}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + "down_peak": { + "generator": generate_down_peak_traffic, + "description": "下行高峰 - 主要从高层到底层", + "scales": { + "small": {"intensity": 0.5, "max_people": 20}, + "medium": {"intensity": 0.6, "max_people": 80}, + "large": {"intensity": 0.7, "max_people": 150}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + "inter_floor": { + "generator": generate_inter_floor_traffic, + "description": "楼层间流量 - 适合小建筑", + "scales": { + "small": {"intensity": 0.6, "max_people": 30}, + "medium": {"intensity": 0.4, "max_people": 60}, + "large": {"intensity": 0.3, "max_people": 80}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + "lunch_rush": { + "generator": generate_lunch_rush_traffic, + "description": "午餐时间流量 - 双向流量,适合中大型建筑", + "scales": { + "small": {"intensity": 0.4, "max_people": 25}, + "medium": {"intensity": 0.7, "max_people": 60}, + "large": {"intensity": 0.8, "max_people": 100}, + }, + "suitable_scales": ["medium", "large"], + }, + "random": { + "generator": generate_random_traffic, + "description": "随机流量 - 均匀分布,适合所有规模", + "scales": { + "small": {"intensity": 0.4, "max_people": 25}, + "medium": {"intensity": 0.3, "max_people": 80}, + "large": {"intensity": 0.25, "max_people": 120}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + "fire_evacuation": { + "generator": generate_fire_evacuation_traffic, + "description": "火警疏散 - 紧急疏散到大厅", + "scales": {"small": {"max_people": 20}, "medium": {"max_people": 70}, "large": {"max_people": 120}}, + "suitable_scales": ["small", "medium", "large"], + }, + "mixed_scenario": { + "generator": generate_mixed_scenario_traffic, + "description": "混合场景 - 包含多种流量模式,适合中大型建筑", + "scales": {"medium": {"max_people": 100}, "large": {"max_people": 180}}, + "suitable_scales": ["medium", "large"], + }, + "high_density": { + "generator": generate_high_density_traffic, + "description": "高密度流量 - 压力测试", + "scales": { + "small": {"intensity": 0.8, "max_people": 35}, + "medium": {"intensity": 1.0, "max_people": 120}, + "large": {"intensity": 1.2, "max_people": 200}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + # 新增的专用场景 + "small_building": { + "generator": generate_small_building_traffic, + "description": "小建筑专用 - 简单楼层间移动", + "scales": {"small": {"intensity": 0.4, "max_people": 25}}, + "suitable_scales": ["small"], + }, + "medical": { + "generator": generate_medical_building_traffic, + "description": "医疗建筑 - 特殊流量模式", + "scales": {"medium": {"intensity": 0.5, "max_people": 80}, "large": {"intensity": 0.6, "max_people": 120}}, + "suitable_scales": ["medium", "large"], + }, + "meeting_event": { + "generator": generate_meeting_event_traffic, + "description": "会议事件 - 集中到达和离开", + "scales": { + "small": {"intensity": 0.6, "max_people": 30}, + "medium": {"intensity": 0.8, "max_people": 50}, + "large": {"intensity": 1.0, "max_people": 80}, + }, + "suitable_scales": ["small", "medium", "large"], + }, + "progressive_test": { + "generator": generate_progressive_test_traffic, + "description": "渐进式测试 - 强度逐渐增加", + "scales": {"small": {"max_people": 40}, "medium": {"max_people": 100}, "large": {"max_people": 150}}, + "suitable_scales": ["small", "medium", "large"], + }, +} + + +def determine_building_scale(floors: int, elevators: int) -> str: + """根据楼层数和电梯数确定建筑规模""" + if floors <= 5 and elevators <= 2: + return "small" + elif floors <= 9 and elevators <= 3: + return "medium" + else: + return "large" + + +def generate_traffic_file(scenario: str, output_file: str, scale: Optional[str] = None, **kwargs) -> int: + """生成单个流量文件,支持规模化配置""" + if scenario not in TRAFFIC_SCENARIOS: + raise ValueError(f"Unknown scenario: {scenario}. Available: {list(TRAFFIC_SCENARIOS.keys())}") + + config = TRAFFIC_SCENARIOS[scenario] + + # 确定建筑规模 + if scale is None: + floors = kwargs.get("floors", 6) # 默认中等规模 + elevators = kwargs.get("elevators", 2) + scale = determine_building_scale(floors, elevators) + + # 检查场景是否适合该规模 + if scale not in config["suitable_scales"]: + print( + f"Warning: Scenario '{scenario}' not recommended for scale '{scale}'. Suitable scales: {config['suitable_scales']}" + ) + # 选择最接近的适合规模 + if "medium" in config["suitable_scales"]: + scale = "medium" + else: + scale = config["suitable_scales"][0] + + # 获取规模特定的参数 + scale_params = config["scales"].get(scale, {}) + + # 合并参数:kwargs > scale_params > building_scale_defaults + building_scale = BUILDING_SCALES[scale] + params = {} + + # 设置默认参数 + params["floors"] = kwargs.get("floors", building_scale["floors"][0]) + params["elevators"] = kwargs.get("elevators", building_scale["elevators"][0]) + params["elevator_capacity"] = kwargs.get("elevator_capacity", building_scale["capacity"][0]) + + # 设置场景相关参数 + params["duration"] = kwargs.get("duration", building_scale["duration_range"][0]) + params["intensity"] = scale_params.get("intensity", 0.5) + params["max_people"] = scale_params.get("max_people", building_scale["max_people"][0]) + params["seed"] = kwargs.get("seed", 42) + + # 允许kwargs完全覆盖 + params.update(kwargs) + + # 生成流量数据 - 只传递生成器函数需要的参数 + import inspect + + generator_signature = inspect.signature(config["generator"]) + generator_params = {k: v for k, v in params.items() if k in generator_signature.parameters} + traffic_data = config["generator"](**generator_params) + + # 准备building配置 + building_config = { + "floors": params["floors"], + "elevators": params["elevators"], + "elevator_capacity": params["elevator_capacity"], + "scenario": scenario, + "scale": scale, + "description": f"{config['description']} ({scale}规模)", + "expected_passengers": len(traffic_data), + "duration": params["duration"], + } + + # 组合完整的数据结构 + complete_data = {"building": building_config, "traffic": traffic_data} + + # 写入文件 + with open(output_file, "w") as f: + json.dump(complete_data, f, indent=2, ensure_ascii=False) + + print(f"Generated {len(traffic_data)} passengers for scenario '{scenario}' ({scale}) -> {output_file}") + return len(traffic_data) + + +def generate_scaled_traffic_files( + output_dir: str, + scale: str = "medium", + seed: int = 42, + generate_all_scales: bool = False, + custom_building: Optional[Dict[str, Any]] = None, +): + """生成按规模分类的流量文件""" + output_path = Path(output_dir) + output_path.mkdir(exist_ok=True) + + if generate_all_scales: + # 生成所有规模的文件 + for scale_name in ["small", "medium", "large"]: + scale_dir = output_path / scale_name + scale_dir.mkdir(exist_ok=True) + _generate_files_for_scale(scale_dir, scale_name, seed) + else: + # 只生成指定规模 + if custom_building: + floors = custom_building.get("floors", BUILDING_SCALES[scale]["floors"][0]) + elevators = custom_building.get("elevators", BUILDING_SCALES[scale]["elevators"][0]) + elevator_capacity = custom_building.get("capacity", BUILDING_SCALES[scale]["capacity"][0]) + + # 重新确定规模 + detected_scale = determine_building_scale(floors, elevators) + if detected_scale != scale: + print(f"Note: Building config suggests {detected_scale} scale, but {scale} was requested") + scale = detected_scale + + _generate_files_for_scale(output_path, scale, seed, custom_building) + + +def _generate_files_for_scale( + output_path: Path, scale: str, seed: int, custom_building: Optional[Dict[str, Any]] = None +): + """为指定规模生成所有适合的场景文件""" + building_config = BUILDING_SCALES[scale] + total_passengers = 0 + files_generated = 0 + + # 确定建筑参数 + if custom_building: + floors = custom_building.get("floors", building_config["floors"][0]) + elevators = custom_building.get("elevators", building_config["elevators"][0]) + elevator_capacity = custom_building.get("capacity", building_config["capacity"][0]) + else: + # 使用规模的默认配置 + floors = building_config["floors"][0] + elevators = building_config["elevators"][0] + elevator_capacity = building_config["capacity"][0] + + print(f"\nGenerating {scale} scale traffic files:") + print(f"Building: {floors} floors, {elevators} elevators, capacity {elevator_capacity}") + + for scenario_name, config in TRAFFIC_SCENARIOS.items(): + # 检查场景是否适合该规模 + if scale not in config["suitable_scales"]: + continue + + filename = f"{scenario_name}.json" + file_path = output_path / filename + + # 准备参数 + params = { + "floors": floors, + "elevators": elevators, + "elevator_capacity": elevator_capacity, + "seed": seed + hash(scenario_name) % 1000, # 为每个场景使用不同的seed + } + + # 生成流量文件 + try: + passenger_count = generate_traffic_file(scenario_name, str(file_path), scale=scale, **params) + total_passengers += passenger_count + files_generated += 1 + except Exception as e: + print(f"Error generating {scenario_name}: {e}") + + print(f"Generated {files_generated} traffic files for {scale} scale in {output_path}") + print(f"Total passengers: {total_passengers}") + print( + f"Average per scenario: {total_passengers/files_generated:.1f}" if files_generated > 0 else "No files generated" + ) + + +def generate_all_traffic_files( + output_dir: str, + floors: int = 6, + elevators: int = 2, + elevator_capacity: int = 8, + seed: int = 42, +): + """生成所有场景的流量文件 - 保持向后兼容""" + scale = determine_building_scale(floors, elevators) + custom_building = {"floors": floors, "elevators": elevators, "capacity": elevator_capacity} + + generate_scaled_traffic_files(output_dir=output_dir, scale=scale, seed=seed, custom_building=custom_building) + + +def main(): + """主函数 - 命令行接口""" + import argparse + + parser = argparse.ArgumentParser(description="Generate scalable elevator traffic files") + parser.add_argument( + "--scale", + type=str, + choices=["small", "medium", "large"], + help="Building scale (overrides individual parameters)", + ) + parser.add_argument( + "--all-scales", action="store_true", help="Generate files for all scales in separate directories" + ) + parser.add_argument("--floors", type=int, help="Number of floors") + parser.add_argument("--elevators", type=int, help="Number of elevators") + parser.add_argument("--elevator-capacity", type=int, help="Elevator capacity") + parser.add_argument("--seed", type=int, default=42, help="Random seed") + parser.add_argument("--output-dir", type=str, default=None, help="Output directory (default: current directory)") + + args = parser.parse_args() + + output_dir = args.output_dir or os.path.dirname(__file__) + + if args.all_scales: + # 生成所有规模的文件 + generate_scaled_traffic_files(output_dir=output_dir, generate_all_scales=True, seed=args.seed) + elif args.scale: + # 生成指定规模的文件 + custom_building = None + if args.floors or args.elevators or args.elevator_capacity: + custom_building = {} + if args.floors: + custom_building["floors"] = args.floors + if args.elevators: + custom_building["elevators"] = args.elevators + if args.elevator_capacity: + custom_building["capacity"] = args.elevator_capacity + + generate_scaled_traffic_files( + output_dir=output_dir, scale=args.scale, seed=args.seed, custom_building=custom_building + ) + else: + # 向后兼容模式:使用旧的参数 + floors = args.floors or 6 + elevators = args.elevators or 2 + elevator_capacity = args.elevator_capacity or 8 + + generate_all_traffic_files( + output_dir=output_dir, + floors=floors, + elevators=elevators, + elevator_capacity=elevator_capacity, + seed=args.seed, + ) + + print("\nUsage examples:") + print(" # Generate all scales:") + print(" python generators.py --all-scales") + print(" # Generate small scale:") + print(" python generators.py --scale small") + print(" # Custom building (auto-detect scale):") + print(" python generators.py --floors 3 --elevators 1") + print(" # Force scale with custom config:") + print(" python generators.py --scale large --floors 12 --elevators 4") + + +if __name__ == "__main__": + main() diff --git a/elevator_saga/traffic/inter_floor.json b/elevator_saga/traffic/inter_floor.json new file mode 100644 index 0000000..614700b --- /dev/null +++ b/elevator_saga/traffic/inter_floor.json @@ -0,0 +1,470 @@ +{ + "building": { + "floors": 12, + "elevators": 4, + "elevator_capacity": 8, + "scenario": "inter_floor", + "scale": "large", + "description": "楼层间流量 - 适合小建筑 (large规模)", + "expected_passengers": 76, + "duration": 300 + }, + "traffic": [ + { + "id": 1, + "origin": 3, + "destination": 8, + "tick": 7 + }, + { + "id": 2, + "origin": 2, + "destination": 6, + "tick": 11 + }, + { + "id": 3, + "origin": 11, + "destination": 3, + "tick": 12 + }, + { + "id": 4, + "origin": 11, + "destination": 10, + "tick": 13 + }, + { + "id": 5, + "origin": 1, + "destination": 4, + "tick": 20 + }, + { + "id": 6, + "origin": 1, + "destination": 2, + "tick": 22 + }, + { + "id": 7, + "origin": 6, + "destination": 3, + "tick": 26 + }, + { + "id": 8, + "origin": 8, + "destination": 9, + "tick": 27 + }, + { + "id": 9, + "origin": 1, + "destination": 8, + "tick": 28 + }, + { + "id": 10, + "origin": 9, + "destination": 3, + "tick": 40 + }, + { + "id": 11, + "origin": 9, + "destination": 3, + "tick": 52 + }, + { + "id": 12, + "origin": 8, + "destination": 10, + "tick": 57 + }, + { + "id": 13, + "origin": 10, + "destination": 9, + "tick": 58 + }, + { + "id": 14, + "origin": 4, + "destination": 1, + "tick": 60 + }, + { + "id": 15, + "origin": 7, + "destination": 8, + "tick": 65 + }, + { + "id": 16, + "origin": 11, + "destination": 1, + "tick": 68 + }, + { + "id": 17, + "origin": 5, + "destination": 7, + "tick": 69 + }, + { + "id": 18, + "origin": 8, + "destination": 9, + "tick": 71 + }, + { + "id": 19, + "origin": 3, + "destination": 9, + "tick": 75 + }, + { + "id": 20, + "origin": 3, + "destination": 8, + "tick": 90 + }, + { + "id": 21, + "origin": 2, + "destination": 4, + "tick": 91 + }, + { + "id": 22, + "origin": 8, + "destination": 9, + "tick": 94 + }, + { + "id": 23, + "origin": 5, + "destination": 8, + "tick": 97 + }, + { + "id": 24, + "origin": 1, + "destination": 3, + "tick": 101 + }, + { + "id": 25, + "origin": 2, + "destination": 4, + "tick": 106 + }, + { + "id": 26, + "origin": 2, + "destination": 10, + "tick": 107 + }, + { + "id": 27, + "origin": 4, + "destination": 7, + "tick": 111 + }, + { + "id": 28, + "origin": 2, + "destination": 7, + "tick": 112 + }, + { + "id": 29, + "origin": 8, + "destination": 6, + "tick": 115 + }, + { + "id": 30, + "origin": 8, + "destination": 9, + "tick": 116 + }, + { + "id": 31, + "origin": 7, + "destination": 3, + "tick": 118 + }, + { + "id": 32, + "origin": 6, + "destination": 7, + "tick": 121 + }, + { + "id": 33, + "origin": 2, + "destination": 11, + "tick": 123 + }, + { + "id": 34, + "origin": 6, + "destination": 1, + "tick": 128 + }, + { + "id": 35, + "origin": 8, + "destination": 7, + "tick": 129 + }, + { + "id": 36, + "origin": 6, + "destination": 10, + "tick": 131 + }, + { + "id": 37, + "origin": 8, + "destination": 4, + "tick": 135 + }, + { + "id": 38, + "origin": 10, + "destination": 6, + "tick": 137 + }, + { + "id": 39, + "origin": 10, + "destination": 5, + "tick": 157 + }, + { + "id": 40, + "origin": 1, + "destination": 8, + "tick": 158 + }, + { + "id": 41, + "origin": 6, + "destination": 11, + "tick": 159 + }, + { + "id": 42, + "origin": 7, + "destination": 5, + "tick": 161 + }, + { + "id": 43, + "origin": 6, + "destination": 4, + "tick": 162 + }, + { + "id": 44, + "origin": 3, + "destination": 8, + "tick": 163 + }, + { + "id": 45, + "origin": 6, + "destination": 9, + "tick": 165 + }, + { + "id": 46, + "origin": 6, + "destination": 5, + "tick": 172 + }, + { + "id": 47, + "origin": 2, + "destination": 8, + "tick": 173 + }, + { + "id": 48, + "origin": 9, + "destination": 10, + "tick": 174 + }, + { + "id": 49, + "origin": 1, + "destination": 10, + "tick": 175 + }, + { + "id": 50, + "origin": 3, + "destination": 5, + "tick": 176 + }, + { + "id": 51, + "origin": 1, + "destination": 8, + "tick": 179 + }, + { + "id": 52, + "origin": 5, + "destination": 4, + "tick": 184 + }, + { + "id": 53, + "origin": 7, + "destination": 10, + "tick": 186 + }, + { + "id": 54, + "origin": 8, + "destination": 2, + "tick": 194 + }, + { + "id": 55, + "origin": 5, + "destination": 1, + "tick": 200 + }, + { + "id": 56, + "origin": 10, + "destination": 2, + "tick": 204 + }, + { + "id": 57, + "origin": 5, + "destination": 2, + "tick": 205 + }, + { + "id": 58, + "origin": 7, + "destination": 11, + "tick": 210 + }, + { + "id": 59, + "origin": 11, + "destination": 4, + "tick": 212 + }, + { + "id": 60, + "origin": 4, + "destination": 7, + "tick": 230 + }, + { + "id": 61, + "origin": 8, + "destination": 6, + "tick": 239 + }, + { + "id": 62, + "origin": 7, + "destination": 3, + "tick": 249 + }, + { + "id": 63, + "origin": 6, + "destination": 1, + "tick": 250 + }, + { + "id": 64, + "origin": 5, + "destination": 4, + "tick": 254 + }, + { + "id": 65, + "origin": 9, + "destination": 10, + "tick": 256 + }, + { + "id": 66, + "origin": 4, + "destination": 7, + "tick": 267 + }, + { + "id": 67, + "origin": 11, + "destination": 4, + "tick": 269 + }, + { + "id": 68, + "origin": 3, + "destination": 8, + "tick": 272 + }, + { + "id": 69, + "origin": 8, + "destination": 10, + "tick": 278 + }, + { + "id": 70, + "origin": 10, + "destination": 3, + "tick": 279 + }, + { + "id": 71, + "origin": 11, + "destination": 2, + "tick": 284 + }, + { + "id": 72, + "origin": 10, + "destination": 11, + "tick": 288 + }, + { + "id": 73, + "origin": 7, + "destination": 2, + "tick": 291 + }, + { + "id": 74, + "origin": 2, + "destination": 9, + "tick": 294 + }, + { + "id": 75, + "origin": 7, + "destination": 9, + "tick": 295 + }, + { + "id": 76, + "origin": 7, + "destination": 8, + "tick": 297 + } + ] +} \ No newline at end of file diff --git a/elevator_saga/utils/__init__.py b/elevator_saga/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/elevator_saga/utils/debug.py b/elevator_saga/utils/debug.py new file mode 100644 index 0000000..46e9d7d --- /dev/null +++ b/elevator_saga/utils/debug.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +""" +Debug utilities for Elevator Saga +调试工具模块 +""" + +# Global debug flag +_debug_enabled: bool = True + + +def set_debug_mode(enabled: bool) -> None: + """启用或禁用调试模式""" + global _debug_enabled + _debug_enabled = enabled + + +def debug_log(message: str) -> None: + """输出调试信息(如果启用了调试模式)""" + if _debug_enabled: + print(f"[DEBUG] {message}", flush=True) + + +def is_debug_enabled() -> bool: + """检查是否启用了调试模式""" + return _debug_enabled diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..958dea9 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,46 @@ +{ + // —— 入口/排除 ——(按你项目结构来) + "include": ["."], + "exclude": [ + "build", + "dist", + "__pycache__", + ".mypy_cache", + ".pytest_cache", + "htmlcov", + ".idea", + ".vscode", + "docs/_build", + "dypymcp.egg-info", + ".venv_mcp", + "logs", + "environments", + "models" + ], + + // —— 语言/环境设置 —— + "pythonVersion": "3.10", + "typeCheckingMode": "strict", + + // 你的源码在仓库内的额外搜索路径(等价于以前 Pylance 的 include) + "executionEnvironments": [ + { + "root": ".", + "extraPaths": ["dypymcp"] + } + ], + + // —— 与你原 mypy/flake8 诉求相匹配的诊断级别 —— + // 忽略三方包的缺失导入(等价于 mypy 的 --ignore-missing-imports) + "reportMissingImports": "none", + // 可按需放宽/收紧: + "reportUnusedImport": "warning", + "reportUnusedVariable": "warning", + "reportUnknownArgumentType": "warning", + "reportUnknownMemberType": "warning", + "reportUnknownVariableType": "warning", + "reportUnknownParameterType": "warning", + "reportPrivateUsage": "warning", + "reportMissingTypeStubs": false + } + \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..cbd979b --- /dev/null +++ b/setup.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +Setup script for Elevator Saga Python Package +""" +from setuptools import setup, find_packages + +with open("README_CN.md", "r", encoding="utf-8") as fh: + long_description = fh.read() + +setup( + name="elevator-saga", + version="1.0.0", + author="Elevator Saga Team", + description="Python implementation of Elevator Saga game with PyEE event system", + long_description=long_description, + long_description_content_type="text/markdown", + packages=find_packages(), + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: Education", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Games/Entertainment :: Simulation", + "Topic :: Software Development :: Libraries :: Python Modules", + ], + python_requires=">=3.8", + install_requires=[ + "pyee>=11.0.0", + "numpy>=1.20.0", + "matplotlib>=3.5.0", + "seaborn>=0.11.0", + "pandas>=1.3.0", + ], + extras_require={ + "dev": [ + "pytest>=6.0", + "pytest-cov", + "black", + "flake8", + ], + }, + entry_points={ + "console_scripts": [ + "elevator-saga=elevator_saga.cli.main:main", + "elevator-server=elevator_saga.cli.main:server_main", + "elevator-client=elevator_saga.cli.main:client_main", + "elevator-grader=elevator_saga.grader.grader:main", + "elevator-batch-test=elevator_saga.grader.batch_runner:main", + ], + }, + include_package_data=True, + zip_safe=False, +)