init commit

This commit is contained in:
2026-04-12 07:19:48 -03:00
commit 9dbf89da02
111 changed files with 14925 additions and 0 deletions

View File

@@ -0,0 +1,85 @@
"""Scenario manager — loads and switches between operational scenarios."""
from importlib import import_module
from typing import Any
SCENARIO_MODULES = {
"normal_ops": "mcp_servers.data.scenarios.normal_ops",
"weather_disruption_ord": "mcp_servers.data.scenarios.weather_disruption_ord",
"maintenance_delay_sfo": "mcp_servers.data.scenarios.maintenance_delay_sfo",
"crew_swap_ewr": "mcp_servers.data.scenarios.crew_swap_ewr",
}
class ScenarioManager:
"""Manages the active scenario. Singleton — all MCP servers share one instance."""
def __init__(self) -> None:
self._active_id: str = "weather_disruption_ord"
self._cache: dict[str, Any] = {}
@property
def active_id(self) -> str:
return self._active_id
def set_active(self, scenario_id: str) -> dict:
if scenario_id not in SCENARIO_MODULES:
raise ValueError(
f"Unknown scenario: {scenario_id}. "
f"Available: {', '.join(SCENARIO_MODULES.keys())}"
)
self._active_id = scenario_id
return self.get_metadata()
def get_metadata(self, scenario_id: str | None = None) -> dict:
mod = self._load(scenario_id or self._active_id)
return {
"scenario_id": mod.SCENARIO_ID,
"name": mod.SCENARIO_NAME,
"description": mod.SCENARIO_DESCRIPTION,
"hubs": mod.SCENARIO_HUBS,
"flight_count": len(mod.FLIGHTS),
"disrupted_flights": sum(
1 for f in mod.FLIGHTS if f.status != "ON_TIME"
),
}
def list_scenarios(self) -> list[dict]:
return [self.get_metadata(sid) for sid in SCENARIO_MODULES]
def _load(self, scenario_id: str) -> Any:
if scenario_id not in self._cache:
self._cache[scenario_id] = import_module(SCENARIO_MODULES[scenario_id])
return self._cache[scenario_id]
@property
def _module(self) -> Any:
return self._load(self._active_id)
@property
def flights(self):
return self._module.FLIGHTS
@property
def crew(self):
return self._module.CREW
@property
def crew_notes(self) -> dict[str, list[str]]:
return self._module.CREW_NOTES
@property
def maintenance(self):
return self._module.MAINTENANCE
@property
def rebookings(self):
return self._module.REBOOKINGS
@property
def passengers(self):
return self._module.PASSENGERS
# Singleton
scenario_manager = ScenarioManager()