diff --git a/mcp_servers/ops/prompts.py b/mcp_servers/ops/prompts.py new file mode 100644 index 0000000..19abb04 --- /dev/null +++ b/mcp_servers/ops/prompts.py @@ -0,0 +1,25 @@ +"""Prompts for the ops MCP server.""" + +from mcp_servers.ops.server import mcp + + +@mcp.prompt() +def handover_brief(hub: str = "ALL", shift_time: str = "21:00 CT") -> str: + """Template for generating a shift handover brief. + + Defines the structured format: IMMEDIATE / MONITOR / FYI sections, + priority scoring expectations, and detail level per item. + """ + return ( + f"Generate a shift handover brief for {hub} at {shift_time}. " + "Structure the output as follows:\n\n" + "HEADER: Hub, time, outgoing/incoming manager names.\n\n" + "IMMEDIATE ACTION: Items requiring action within the next 2 hours. " + "Each item: flight/issue ID, what's happening, time until critical, " + "specific action needed, and any pre-staged resources.\n\n" + "MONITOR: Items that could escalate. Each item: what to watch, " + "trigger conditions for escalation, current trajectory.\n\n" + "FYI: Resolved items or low-risk situations the incoming shift should know about.\n\n" + "Be concise. Ops managers scan, they don't read paragraphs. " + "Use the data provided — do not invent details." + ) diff --git a/mcp_servers/ops/resources.py b/mcp_servers/ops/resources.py new file mode 100644 index 0000000..f569292 --- /dev/null +++ b/mcp_servers/ops/resources.py @@ -0,0 +1,39 @@ +"""Resources for the ops MCP server.""" + +import json + +from mcp_servers.data.scenarios.manager import scenario_manager +from mcp_servers.ops import server +from mcp_servers.ops.server import mcp + + +@mcp.resource("ops://crew/roster") +def crew_roster() -> str: + """Full crew roster for the current scenario. + + Returns per crew: crew_id, name, role, base_hub, duty status summary. + """ + roster = [] + for c in scenario_manager.crew: + hours_remaining = c.duty_hours_limit - c.duty_hours_elapsed + roster.append({ + "crew_id": c.crew_id, + "name": c.name, + "role": c.role.value, + "base_hub": c.base_hub, + "duty_status": "AT_RISK" if hours_remaining <= 2.0 else "OK", + "hours_until_limit": round(hours_remaining, 2), + "next_flight": c.next_scheduled_flight, + }) + return json.dumps(roster) + + +@mcp.resource("ops://handover/latest") +def latest_handover() -> str: + """The most recently generated shift handover brief. + + Returns null if no brief has been generated yet. + """ + if server._last_handover is None: + return json.dumps(None) + return json.dumps(server._last_handover) diff --git a/mcp_servers/ops/server.py b/mcp_servers/ops/server.py index db65f83..6a0fb84 100644 --- a/mcp_servers/ops/server.py +++ b/mcp_servers/ops/server.py @@ -4,13 +4,10 @@ Covers: crew duty status, crew notes, pending rebookings, ops narrative, crew roster, last handover brief, and handover brief prompt template. """ -import json from datetime import datetime, timezone from fastmcp import FastMCP -from mcp_servers.data.scenarios.manager import scenario_manager - mcp = FastMCP( "stellar-ops-internal", instructions=( @@ -19,7 +16,6 @@ mcp = FastMCP( ), ) -# In-memory store for the last generated handover brief _last_handover: dict | None = None @@ -32,191 +28,4 @@ def store_handover_brief(brief: dict) -> None: } -# ── Tools ────────────────────────────────────────────────────────── - - -@mcp.tool() -def get_crew_notes(flight_id: str) -> list[str]: - """Free-text notes logged by crew or ops team for a flight. - - Includes: maintenance write-ups, catering delays, gate conflicts, - passenger incidents, operational remarks. - """ - return scenario_manager.crew_notes.get(flight_id, []) - - -@mcp.tool() -def get_crew_duty_status(crew_ids: list[str]) -> list[dict]: - """Duty hours and rest state for each crew member. - - Returns per crew: crew_id, name, role, duty_hours_elapsed, - duty_hours_limit, hours_until_limit, rest_hours_since_last, - at_risk (true if within 2h of FAA Part 117 limit). - """ - results = [] - crew_map = {c.crew_id: c for c in scenario_manager.crew} - - for crew_id in crew_ids: - crew = crew_map.get(crew_id) - if not crew: - results.append({"crew_id": crew_id, "error": "not_found"}) - continue - - hours_until_limit = crew.duty_hours_limit - crew.duty_hours_elapsed - results.append({ - "crew_id": crew.crew_id, - "name": crew.name, - "role": crew.role.value, - "duty_hours_elapsed": crew.duty_hours_elapsed, - "duty_hours_limit": crew.duty_hours_limit, - "hours_until_limit": round(hours_until_limit, 2), - "rest_hours_since_last": crew.rest_hours_since_last, - "at_risk": hours_until_limit <= 2.0, - "base_hub": crew.base_hub, - "next_scheduled_flight": crew.next_scheduled_flight, - }) - - return results - - -@mcp.tool() -def get_pending_rebookings(hub: str, limit: int = 20) -> list[dict]: - """Passengers needing rebooking at a hub, sorted by urgency. - - Returns: pax_id, name, mileage_plus_status, original_flight, - destination, next_available_option, urgency. - """ - # Filter rebookings by flights originating from this hub - hub_flights = {f.flight_id for f in scenario_manager.flights if f.origin == hub.upper()} - rebookings = [ - r for r in scenario_manager.rebookings - if r.original_flight in hub_flights - ] - - urgency_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2} - rebookings.sort(key=lambda r: urgency_order.get(r.urgency, 3)) - - return [r.model_dump(mode="json") for r in rebookings[:limit]] - - -@mcp.tool() -async def generate_narrative(context: dict) -> str: - """Synthesizes aggregated operational context into a structured - handover brief for ops managers. - - Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true). - Output: prioritized, concise, structured by IMMEDIATE / MONITOR / FYI. - Falls back to template if no API key is configured. - """ - try: - from mcp_servers.shared_llm import generate, _get_provider - - hub = context.get("hub", "ALL") - shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC")) - - system_prompt = ( - f"You are an airline operations shift handover briefing system. " - f"Generate a concise handover brief for {hub} at {shift_time}. " - f"Structure as: HEADER, then IMMEDIATE ACTION (items needing action within 2h), " - f"MONITOR (items that could escalate), FYI (resolved or low-risk). " - f"Be concise — ops managers scan, they don't read paragraphs. " - f"Use the data provided. Do not invent details." - ) - text = await generate(system_prompt, json.dumps(context, indent=2)) - return json.dumps({"text": text, "provider": _get_provider()}) - except Exception: - return json.dumps({"text": _template_narrative(context), "provider": "template"}) - - -def _template_narrative(context: dict) -> str: - """Structured template fallback when LLM is unavailable.""" - sections = [] - immediate = context.get("immediate", []) - monitor = context.get("monitor", []) - fyi = context.get("fyi", []) - - hub = context.get("hub", "ALL") - shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC")) - - sections.append(f"SHIFT HANDOVER BRIEF — {hub} / {shift_time}") - sections.append(f"Generated: {datetime.now(timezone.utc).strftime('%H:%M UTC')}") - sections.append("") - - if immediate: - sections.append("━━━ IMMEDIATE ACTION ━━━") - for item in immediate: - sections.append(f"▶ {item}") - sections.append("") - - if monitor: - sections.append("━━━ MONITOR ━━━") - for item in monitor: - sections.append(f"⚠ {item}") - sections.append("") - - if fyi: - sections.append("━━━ FYI ━━━") - for item in fyi: - sections.append(f"ℹ {item}") - - return "\n".join(sections) - - -# ── Resources ────────────────────────────────────────────────────── - - -@mcp.resource("ops://crew/roster") -def crew_roster() -> str: - """Full crew roster for the current scenario. - - Returns per crew: crew_id, name, role, base_hub, duty status summary. - """ - roster = [] - for c in scenario_manager.crew: - hours_remaining = c.duty_hours_limit - c.duty_hours_elapsed - roster.append({ - "crew_id": c.crew_id, - "name": c.name, - "role": c.role.value, - "base_hub": c.base_hub, - "duty_status": "AT_RISK" if hours_remaining <= 2.0 else "OK", - "hours_until_limit": round(hours_remaining, 2), - "next_flight": c.next_scheduled_flight, - }) - return json.dumps(roster) - - -@mcp.resource("ops://handover/latest") -def latest_handover() -> str: - """The most recently generated shift handover brief. - - Returns null if no brief has been generated yet. - """ - if _last_handover is None: - return json.dumps(None) - return json.dumps(_last_handover) - - -# ── Prompts ──────────────────────────────────────────────────────── - - -@mcp.prompt() -def handover_brief(hub: str = "ALL", shift_time: str = "21:00 CT") -> str: - """Template for generating a shift handover brief. - - Defines the structured format: IMMEDIATE / MONITOR / FYI sections, - priority scoring expectations, and detail level per item. - """ - return ( - f"Generate a shift handover brief for {hub} at {shift_time}. " - "Structure the output as follows:\n\n" - "HEADER: Hub, time, outgoing/incoming manager names.\n\n" - "IMMEDIATE ACTION: Items requiring action within the next 2 hours. " - "Each item: flight/issue ID, what's happening, time until critical, " - "specific action needed, and any pre-staged resources.\n\n" - "MONITOR: Items that could escalate. Each item: what to watch, " - "trigger conditions for escalation, current trajectory.\n\n" - "FYI: Resolved items or low-risk situations the incoming shift should know about.\n\n" - "Be concise. Ops managers scan, they don't read paragraphs. " - "Use the data provided — do not invent details." - ) +from mcp_servers.ops import tools, resources, prompts # noqa: E402, F401 diff --git a/mcp_servers/ops/tools.py b/mcp_servers/ops/tools.py new file mode 100644 index 0000000..38fd5ce --- /dev/null +++ b/mcp_servers/ops/tools.py @@ -0,0 +1,133 @@ +"""Tools for the ops MCP server.""" + +import json +from datetime import datetime, timezone + +from mcp_servers.data.scenarios.manager import scenario_manager +from mcp_servers.ops.server import mcp + + +@mcp.tool() +def get_crew_notes(flight_id: str) -> list[str]: + """Free-text notes logged by crew or ops team for a flight. + + Includes: maintenance write-ups, catering delays, gate conflicts, + passenger incidents, operational remarks. + """ + return scenario_manager.crew_notes.get(flight_id, []) + + +@mcp.tool() +def get_crew_duty_status(crew_ids: list[str]) -> list[dict]: + """Duty hours and rest state for each crew member. + + Returns per crew: crew_id, name, role, duty_hours_elapsed, + duty_hours_limit, hours_until_limit, rest_hours_since_last, + at_risk (true if within 2h of FAA Part 117 limit). + """ + results = [] + crew_map = {c.crew_id: c for c in scenario_manager.crew} + + for crew_id in crew_ids: + crew = crew_map.get(crew_id) + if not crew: + results.append({"crew_id": crew_id, "error": "not_found"}) + continue + + hours_until_limit = crew.duty_hours_limit - crew.duty_hours_elapsed + results.append({ + "crew_id": crew.crew_id, + "name": crew.name, + "role": crew.role.value, + "duty_hours_elapsed": crew.duty_hours_elapsed, + "duty_hours_limit": crew.duty_hours_limit, + "hours_until_limit": round(hours_until_limit, 2), + "rest_hours_since_last": crew.rest_hours_since_last, + "at_risk": hours_until_limit <= 2.0, + "base_hub": crew.base_hub, + "next_scheduled_flight": crew.next_scheduled_flight, + }) + + return results + + +@mcp.tool() +def get_pending_rebookings(hub: str, limit: int = 20) -> list[dict]: + """Passengers needing rebooking at a hub, sorted by urgency. + + Returns: pax_id, name, mileage_plus_status, original_flight, + destination, next_available_option, urgency. + """ + hub_flights = {f.flight_id for f in scenario_manager.flights if f.origin == hub.upper()} + rebookings = [ + r for r in scenario_manager.rebookings + if r.original_flight in hub_flights + ] + + urgency_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2} + rebookings.sort(key=lambda r: urgency_order.get(r.urgency, 3)) + + return [r.model_dump(mode="json") for r in rebookings[:limit]] + + +@mcp.tool() +async def generate_narrative(context: dict) -> str: + """Synthesizes aggregated operational context into a structured + handover brief for ops managers. + + Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true). + Output: prioritized, concise, structured by IMMEDIATE / MONITOR / FYI. + Falls back to template if no API key is configured. + """ + try: + from mcp_servers.shared_llm import generate, _get_provider + + hub = context.get("hub", "ALL") + shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC")) + + system_prompt = ( + f"You are an airline operations shift handover briefing system. " + f"Generate a concise handover brief for {hub} at {shift_time}. " + f"Structure as: HEADER, then IMMEDIATE ACTION (items needing action within 2h), " + f"MONITOR (items that could escalate), FYI (resolved or low-risk). " + f"Be concise — ops managers scan, they don't read paragraphs. " + f"Use the data provided. Do not invent details." + ) + text = await generate(system_prompt, json.dumps(context, indent=2)) + return json.dumps({"text": text, "provider": _get_provider()}) + except Exception: + return json.dumps({"text": _template_narrative(context), "provider": "template"}) + + +def _template_narrative(context: dict) -> str: + """Structured template fallback when LLM is unavailable.""" + sections = [] + immediate = context.get("immediate", []) + monitor = context.get("monitor", []) + fyi = context.get("fyi", []) + + hub = context.get("hub", "ALL") + shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC")) + + sections.append(f"SHIFT HANDOVER BRIEF — {hub} / {shift_time}") + sections.append(f"Generated: {datetime.now(timezone.utc).strftime('%H:%M UTC')}") + sections.append("") + + if immediate: + sections.append("━━━ IMMEDIATE ACTION ━━━") + for item in immediate: + sections.append(f"▶ {item}") + sections.append("") + + if monitor: + sections.append("━━━ MONITOR ━━━") + for item in monitor: + sections.append(f"⚠ {item}") + sections.append("") + + if fyi: + sections.append("━━━ FYI ━━━") + for item in fyi: + sections.append(f"ℹ {item}") + + return "\n".join(sections) diff --git a/mcp_servers/passenger/prompts.py b/mcp_servers/passenger/prompts.py new file mode 100644 index 0000000..e92b929 --- /dev/null +++ b/mcp_servers/passenger/prompts.py @@ -0,0 +1,36 @@ +"""Prompts for the passenger MCP server.""" + +from mcp_servers.passenger.server import mcp + +TONE_INSTRUCTIONS = { + "empathetic": ( + "Write a passenger notification about this flight disruption. " + "Be empathetic and human. Explain WHY the delay/cancellation happened " + "using the weather, maintenance, or operational data provided. " + "Tell the passenger what's happening next: new boarding time, gate, " + "rebooking options. End with reassurance. No jargon." + ), + "factual": ( + "Write a brief, factual notification. Include: flight number, " + "route, status, delay duration, cause (one phrase), new departure time, " + "gate. No editorial. No reassurance. Just facts." + ), + "brief_sms": ( + "Write an SMS-length notification (under 160 characters). " + "Format: UA{flight} {route}: {status}. {cause}. New dep: {time}. Gate {gate}." + ), +} + + +@mcp.prompt() +def passenger_notification(tone: str = "empathetic") -> str: + """Template for generating passenger delay/cancellation notifications. + + tone: empathetic (default) | factual | brief_sms + """ + instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["empathetic"]) + return ( + f"{instruction}\n\n" + "Use the operational data provided below. Do not invent details. " + "If data is missing for a section, omit that section." + ) diff --git a/mcp_servers/passenger/resources.py b/mcp_servers/passenger/resources.py new file mode 100644 index 0000000..c3615f8 --- /dev/null +++ b/mcp_servers/passenger/resources.py @@ -0,0 +1,49 @@ +"""Resources for the passenger MCP server.""" + +import json + +from mcp_servers.data.scenarios.manager import scenario_manager +from mcp_servers.passenger.server import mcp + + +@mcp.resource("ops://flights/{flight_id}/manifest") +def flight_manifest(flight_id: str) -> str: + """Passenger manifest summary for a flight. + + Returns: flight_id, passenger count, count by MP status tier, + special needs count, connection count. + Summary-level — no PII exposed through this resource. + """ + flight = None + for f in scenario_manager.flights: + if f.flight_id == flight_id: + flight = f + break + + if not flight: + return json.dumps({"error": f"Flight {flight_id} not found"}) + + pax_on_flight = [ + p for p in scenario_manager.passengers + if p.flight_id == flight_id + ] + + status_counts = {} + special_needs_count = 0 + connection_count = 0 + + for p in pax_on_flight: + status = p.mileage_plus_status.value + status_counts[status] = status_counts.get(status, 0) + 1 + if p.special_needs: + special_needs_count += 1 + if p.connection_flight: + connection_count += 1 + + return json.dumps({ + "flight_id": flight_id, + "total_passengers": flight.passenger_count, + "by_status": status_counts, + "special_needs_count": special_needs_count, + "connection_count": connection_count, + }) diff --git a/mcp_servers/passenger/server.py b/mcp_servers/passenger/server.py index 8627348..705a336 100644 --- a/mcp_servers/passenger/server.py +++ b/mcp_servers/passenger/server.py @@ -4,13 +4,8 @@ Covers: passenger notification generation, flight manifest, and notification prompt template (multi-tone). """ -import json - from fastmcp import FastMCP -from mcp_servers.data.models import MPStatus -from mcp_servers.data.scenarios.manager import scenario_manager - mcp = FastMCP( "stellar-ops-passenger", instructions=( @@ -19,151 +14,4 @@ mcp = FastMCP( ), ) - -# ── Tools ────────────────────────────────────────────────────────── - - -@mcp.tool() -async def generate_notification(context: dict) -> str: - """Synthesizes flight disruption context into an empathetic, - actionable passenger notification. - - Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true). - Output: clear, human, no jargon, includes gate/time/status. - Falls back to template if no API key is configured. - """ - try: - from mcp_servers.shared_llm import generate, _get_provider - - system_prompt = ( - "You are a passenger notification system for Stellar Air. " - "Write a clear, empathetic notification about this flight disruption. " - "Explain WHY the delay or cancellation happened using the operational data provided. " - "Tell the passenger what's happening next: new boarding time, gate, status. " - "Be human and reassuring. No aviation jargon. No speculation. " - "If data is missing for a section, omit it — don't make things up." - ) - text = await generate(system_prompt, json.dumps(context, indent=2)) - return json.dumps({"text": text, "provider": _get_provider()}) - except Exception: - return json.dumps({"text": _template_notification(context), "provider": "template"}) - - -def _template_notification(context: dict) -> str: - """Structured template fallback when LLM is unavailable.""" - flight_id = context.get("flight_id", "") - origin = context.get("origin", "") - destination = context.get("destination", "") - status = context.get("status", "DELAYED") - delay_minutes = context.get("delay_minutes", 0) - delay_cause = context.get("delay_cause", "") - gate = context.get("gate", "") - weather_summary = context.get("weather_summary", "") - crew_notes_summary = context.get("crew_notes_summary", "") - - lines = [ - f"{flight_id} — {origin} → {destination}", - f"Status: {status}" + (f" {delay_minutes} minutes" if delay_minutes else ""), - "", - ] - - if delay_cause: - cause_text = { - "WEATHER": "weather conditions along your route", - "MAINTENANCE": "a routine maintenance check on your aircraft", - "CREW": "ensuring your crew is fully rested for safe operation", - "ATC": "air traffic control restrictions", - "LATE_AIRCRAFT": "the late arrival of your inbound aircraft", - }.get(delay_cause, f"{delay_cause.lower()}") - lines.append(f"Your flight is delayed due to {cause_text}.") - if weather_summary: - lines.append(f"Current conditions: {weather_summary}.") - - if gate: - lines.append(f"\nGate {gate} — no gate change expected.") - - return "\n".join(lines) - - -# ── Resources ────────────────────────────────────────────────────── - - -@mcp.resource("ops://flights/{flight_id}/manifest") -def flight_manifest(flight_id: str) -> str: - """Passenger manifest summary for a flight. - - Returns: flight_id, passenger count, count by MP status tier, - special needs count, connection count. - Summary-level — no PII exposed through this resource. - """ - flight = None - for f in scenario_manager.flights: - if f.flight_id == flight_id: - flight = f - break - - if not flight: - return json.dumps({"error": f"Flight {flight_id} not found"}) - - # Count passengers by status from scenario data - pax_on_flight = [ - p for p in scenario_manager.passengers - if p.flight_id == flight_id - ] - - status_counts = {} - special_needs_count = 0 - connection_count = 0 - - for p in pax_on_flight: - status = p.mileage_plus_status.value - status_counts[status] = status_counts.get(status, 0) + 1 - if p.special_needs: - special_needs_count += 1 - if p.connection_flight: - connection_count += 1 - - return json.dumps({ - "flight_id": flight_id, - "total_passengers": flight.passenger_count, - "by_status": status_counts, - "special_needs_count": special_needs_count, - "connection_count": connection_count, - }) - - -# ── Prompts ──────────────────────────────────────────────────────── - - -TONE_INSTRUCTIONS = { - "empathetic": ( - "Write a passenger notification about this flight disruption. " - "Be empathetic and human. Explain WHY the delay/cancellation happened " - "using the weather, maintenance, or operational data provided. " - "Tell the passenger what's happening next: new boarding time, gate, " - "rebooking options. End with reassurance. No jargon." - ), - "factual": ( - "Write a brief, factual notification. Include: flight number, " - "route, status, delay duration, cause (one phrase), new departure time, " - "gate. No editorial. No reassurance. Just facts." - ), - "brief_sms": ( - "Write an SMS-length notification (under 160 characters). " - "Format: UA{flight} {route}: {status}. {cause}. New dep: {time}. Gate {gate}." - ), -} - - -@mcp.prompt() -def passenger_notification(tone: str = "empathetic") -> str: - """Template for generating passenger delay/cancellation notifications. - - tone: empathetic (default) | factual | brief_sms - """ - instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["empathetic"]) - return ( - f"{instruction}\n\n" - "Use the operational data provided below. Do not invent details. " - "If data is missing for a section, omit that section." - ) +from mcp_servers.passenger import tools, resources, prompts # noqa: E402, F401 diff --git a/mcp_servers/passenger/tools.py b/mcp_servers/passenger/tools.py new file mode 100644 index 0000000..b9e809f --- /dev/null +++ b/mcp_servers/passenger/tools.py @@ -0,0 +1,66 @@ +"""Tools for the passenger MCP server.""" + +import json + +from mcp_servers.passenger.server import mcp + + +@mcp.tool() +async def generate_notification(context: dict) -> str: + """Synthesizes flight disruption context into an empathetic, + actionable passenger notification. + + Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true). + Output: clear, human, no jargon, includes gate/time/status. + Falls back to template if no API key is configured. + """ + try: + from mcp_servers.shared_llm import generate, _get_provider + + system_prompt = ( + "You are a passenger notification system for Stellar Air. " + "Write a clear, empathetic notification about this flight disruption. " + "Explain WHY the delay or cancellation happened using the operational data provided. " + "Tell the passenger what's happening next: new boarding time, gate, status. " + "Be human and reassuring. No aviation jargon. No speculation. " + "If data is missing for a section, omit it — don't make things up." + ) + text = await generate(system_prompt, json.dumps(context, indent=2)) + return json.dumps({"text": text, "provider": _get_provider()}) + except Exception: + return json.dumps({"text": _template_notification(context), "provider": "template"}) + + +def _template_notification(context: dict) -> str: + """Structured template fallback when LLM is unavailable.""" + flight_id = context.get("flight_id", "") + origin = context.get("origin", "") + destination = context.get("destination", "") + status = context.get("status", "DELAYED") + delay_minutes = context.get("delay_minutes", 0) + delay_cause = context.get("delay_cause", "") + gate = context.get("gate", "") + weather_summary = context.get("weather_summary", "") + + lines = [ + f"{flight_id} — {origin} → {destination}", + f"Status: {status}" + (f" {delay_minutes} minutes" if delay_minutes else ""), + "", + ] + + if delay_cause: + cause_text = { + "WEATHER": "weather conditions along your route", + "MAINTENANCE": "a routine maintenance check on your aircraft", + "CREW": "ensuring your crew is fully rested for safe operation", + "ATC": "air traffic control restrictions", + "LATE_AIRCRAFT": "the late arrival of your inbound aircraft", + }.get(delay_cause, f"{delay_cause.lower()}") + lines.append(f"Your flight is delayed due to {cause_text}.") + if weather_summary: + lines.append(f"Current conditions: {weather_summary}.") + + if gate: + lines.append(f"\nGate {gate} — no gate change expected.") + + return "\n".join(lines) diff --git a/mcp_servers/shared/prompts.py b/mcp_servers/shared/prompts.py new file mode 100644 index 0000000..57cb193 --- /dev/null +++ b/mcp_servers/shared/prompts.py @@ -0,0 +1,54 @@ +"""Prompts for the shared MCP server.""" + +from mcp_servers.shared.server import mcp + +DELAY_TEMPLATES = { + ("WEATHER", "passenger"): ( + "Explain this flight delay to a passenger. The cause is weather-related. " + "Be empathetic and transparent. Include what's happening with the weather, " + "what the airline is doing about it, and what the passenger should expect next. " + "Do not use aviation jargon." + ), + ("WEATHER", "ops_manager"): ( + "Summarize this weather-caused delay for an ops manager. " + "Include: delay cause code, affected systems, GDP/ground stop status if applicable, " + "forecast window, and recommended next actions. Be concise." + ), + ("MAINTENANCE", "passenger"): ( + "Explain this maintenance-caused delay to a passenger. " + "Be reassuring — emphasize safety. Describe what's being checked " + "without technical jargon. Give expected timeline if available." + ), + ("MAINTENANCE", "ops_manager"): ( + "Summarize this maintenance delay for an ops manager. " + "Include: MEL item, system affected, restriction details, " + "estimated release time, and downstream impact." + ), + ("CREW", "passenger"): ( + "Explain this crew-related delay to a passenger. " + "Frame it as 'ensuring your crew is fully rested for safe operation.' " + "Do not mention specific regulations or duty limits." + ), + ("CREW", "ops_manager"): ( + "Summarize this crew-caused delay for an ops manager. " + "Include: Part 117 status, hours remaining, swap options, " + "backup crew availability, and timeline." + ), +} + + +@mcp.prompt() +def delay_explainer(cause_code: str, audience: str) -> str: + """Turns a raw delay cause code into a human-readable explanation prompt. + + cause_code: WEATHER | MAINTENANCE | CREW | ATC | LATE_AIRCRAFT + audience: passenger | ops_manager + """ + key = (cause_code.upper(), audience.lower()) + default_key = ("WEATHER", audience.lower()) + template = DELAY_TEMPLATES.get(key, DELAY_TEMPLATES.get(default_key, "")) + return ( + f"{template}\n\n" + "Use the operational data provided below. Do not invent details. " + "If data is missing for a section, omit that section." + ) diff --git a/mcp_servers/shared/resources.py b/mcp_servers/shared/resources.py new file mode 100644 index 0000000..3f9754c --- /dev/null +++ b/mcp_servers/shared/resources.py @@ -0,0 +1,25 @@ +"""Resources for the shared MCP server.""" + +import json + +from mcp_servers.data.models import HUBS +from mcp_servers.data.scenarios.manager import scenario_manager +from mcp_servers.shared.server import mcp + + +@mcp.resource("ops://hubs/{hub_code}") +def hub_info(hub_code: str) -> str: + """Static reference data for a hub. + + Returns: full name, codes, coordinates, timezone, terminal/gate/runway count. + """ + hub = HUBS.get(hub_code.upper()) + if not hub: + return f"Unknown hub: {hub_code}. Known: {', '.join(HUBS.keys())}" + return hub.model_dump_json() + + +@mcp.resource("ops://scenarios/active") +def active_scenario() -> str: + """Current active scenario metadata.""" + return json.dumps(scenario_manager.get_metadata()) diff --git a/mcp_servers/shared/server.py b/mcp_servers/shared/server.py index 614b714..eefc9c5 100644 --- a/mcp_servers/shared/server.py +++ b/mcp_servers/shared/server.py @@ -6,10 +6,6 @@ maintenance flags, hub reference data, and delay explanation prompts. from fastmcp import FastMCP -from mcp_servers.data.models import HUBS, FlightStatus -from mcp_servers.data.real import faa, openmeteo -from mcp_servers.data.scenarios.manager import scenario_manager - mcp = FastMCP( "stellar-ops-shared", instructions=( @@ -19,217 +15,4 @@ mcp = FastMCP( ), ) - -# ── Tools ────────────────────────────────────────────────────────── - - -@mcp.tool() -def get_flight_status(flight_id: str) -> dict: - """Lightweight status check for a single flight. - - Returns: flight_id, status (ON_TIME/DELAYED/CANCELLED/DIVERTED), - delay_minutes, gate, last_updated. - Use this for quick triage before pulling full operational data. - """ - for f in scenario_manager.flights: - if f.flight_id == flight_id: - return { - "flight_id": f.flight_id, - "status": f.status.value, - "delay_minutes": f.delay_minutes, - "gate": f.gate, - "origin": f.origin, - "destination": f.destination, - "source": "scenario_mock", - } - return {"error": f"Flight {flight_id} not found in active scenario"} - - -@mcp.tool() -def get_flight_details(flight_id: str) -> dict: - """Full operational context for a flight. - - Returns: scheduled vs actual times, delay cause code, inbound aircraft - tail number, gate assignment, crew IDs, passenger count. - """ - for f in scenario_manager.flights: - if f.flight_id == flight_id: - return f.model_dump(mode="json") - return {"error": f"Flight {flight_id} not found in active scenario"} - - -@mcp.tool() -def get_irregular_ops(hub: str) -> list[dict]: - """All flights in irregular operations state at a hub. - - hub: ORD | EWR | IAH | SFO | DEN - Returns list of: flight_id, irrop_type, affected_pax_count, - delay_cause, delay_minutes. - """ - results = [] - for f in scenario_manager.flights: - if f.origin == hub.upper() and f.status != FlightStatus.ON_TIME: - results.append({ - "flight_id": f.flight_id, - "irrop_type": f.status.value, - "affected_pax_count": f.passenger_count, - "delay_cause": f.delay_cause.value if f.delay_cause else None, - "delay_minutes": f.delay_minutes, - "origin": f.origin, - "destination": f.destination, - "gate": f.gate, - }) - return results - - -@mcp.tool() -async def get_route_weather(origin: str, destination: str) -> dict: - """Live weather at origin, destination, and en-route waypoints. - - Source: OpenMeteo API (real-time, no API key). - Returns per waypoint: conditions, temperature, wind, visibility, - precipitation, significant events. - """ - return await openmeteo.get_weather_along_route(origin, destination) - - -@mcp.tool() -async def get_hub_forecasts() -> dict: - """4-hour weather forecast for all United hubs (ORD, EWR, IAH, SFO, DEN). - - Source: OpenMeteo API. - Returns per hub: hourly forecast with conditions, wind, visibility, - and a risk_flag if convective activity or low visibility expected. - """ - return await openmeteo.get_weather_forecast_hubs() - - -@mcp.tool() -async def get_airport_status(airport_code: str) -> dict: - """Live FAA airport status. - - Source: FAA NASSTATUS API (real-time, no API key). - Returns: delay_type, delay_reason, average_delay_minutes, - ground_delay_program active/inactive, ground_stop active/inactive. - Falls back to 'status_unavailable' if FAA API is unreachable. - """ - return await faa.get_airport_status(airport_code) - - -@mcp.tool() -async def get_airport_congestion(airport_code: str) -> dict: - """Gate availability and taxi times at an airport. - - Combines: FAA live status (real) + gate/taxi mock data from scenario. - Returns: faa_status, gates_available, gates_total, - avg_taxi_out_minutes, avg_taxi_in_minutes. - """ - faa_status = await faa.get_airport_status(airport_code) - hub = HUBS.get(airport_code.upper()) - total_gates = hub.gates if hub else 100 - - # Mock congestion based on disrupted flights in scenario - disrupted = sum( - 1 for f in scenario_manager.flights - if f.origin == airport_code.upper() and f.status != FlightStatus.ON_TIME - ) - gates_occupied = min(total_gates, int(total_gates * 0.7) + disrupted * 3) - - return { - "airport": airport_code.upper(), - "faa_status": faa_status, - "gates_available": total_gates - gates_occupied, - "gates_total": total_gates, - "avg_taxi_out_minutes": 12 + disrupted * 4, - "avg_taxi_in_minutes": 8 + disrupted * 2, - "source": "faa_live+scenario_mock", - } - - -@mcp.tool() -def get_maintenance_flags(aircraft_tail: str) -> list[dict]: - """Active MEL (Minimum Equipment List) items for an aircraft. - - Returns per item: mel_id, system, description, restriction - (route/ops limitations), expiry date. - """ - items = scenario_manager.maintenance.get(aircraft_tail, []) - return [item.model_dump(mode="json") for item in items] - - -# ── Resources ────────────────────────────────────────────────────── - - -@mcp.resource("ops://hubs/{hub_code}") -def hub_info(hub_code: str) -> str: - """Static reference data for a hub. - - Returns: full name, codes, coordinates, timezone, terminal/gate/runway count. - """ - hub = HUBS.get(hub_code.upper()) - if not hub: - return f"Unknown hub: {hub_code}. Known: {', '.join(HUBS.keys())}" - return hub.model_dump_json() - - -@mcp.resource("ops://scenarios/active") -def active_scenario() -> str: - """Current active scenario metadata.""" - import json - - return json.dumps(scenario_manager.get_metadata()) - - -# ── Prompts ──────────────────────────────────────────────────────── - - -DELAY_TEMPLATES = { - ("WEATHER", "passenger"): ( - "Explain this flight delay to a passenger. The cause is weather-related. " - "Be empathetic and transparent. Include what's happening with the weather, " - "what the airline is doing about it, and what the passenger should expect next. " - "Do not use aviation jargon." - ), - ("WEATHER", "ops_manager"): ( - "Summarize this weather-caused delay for an ops manager. " - "Include: delay cause code, affected systems, GDP/ground stop status if applicable, " - "forecast window, and recommended next actions. Be concise." - ), - ("MAINTENANCE", "passenger"): ( - "Explain this maintenance-caused delay to a passenger. " - "Be reassuring — emphasize safety. Describe what's being checked " - "without technical jargon. Give expected timeline if available." - ), - ("MAINTENANCE", "ops_manager"): ( - "Summarize this maintenance delay for an ops manager. " - "Include: MEL item, system affected, restriction details, " - "estimated release time, and downstream impact." - ), - ("CREW", "passenger"): ( - "Explain this crew-related delay to a passenger. " - "Frame it as 'ensuring your crew is fully rested for safe operation.' " - "Do not mention specific regulations or duty limits." - ), - ("CREW", "ops_manager"): ( - "Summarize this crew-caused delay for an ops manager. " - "Include: Part 117 status, hours remaining, swap options, " - "backup crew availability, and timeline." - ), -} - - -@mcp.prompt() -def delay_explainer(cause_code: str, audience: str) -> str: - """Turns a raw delay cause code into a human-readable explanation prompt. - - cause_code: WEATHER | MAINTENANCE | CREW | ATC | LATE_AIRCRAFT - audience: passenger | ops_manager - """ - key = (cause_code.upper(), audience.lower()) - default_key = ("WEATHER", audience.lower()) - template = DELAY_TEMPLATES.get(key, DELAY_TEMPLATES.get(default_key, "")) - return ( - f"{template}\n\n" - "Use the operational data provided below. Do not invent details. " - "If data is missing for a section, omit that section." - ) +from mcp_servers.shared import tools, resources, prompts # noqa: E402, F401 diff --git a/mcp_servers/shared/tools.py b/mcp_servers/shared/tools.py new file mode 100644 index 0000000..896e75f --- /dev/null +++ b/mcp_servers/shared/tools.py @@ -0,0 +1,139 @@ +"""Tools for the shared MCP server.""" + +from mcp_servers.data.models import HUBS, FlightStatus +from mcp_servers.data.real import faa, openmeteo +from mcp_servers.data.scenarios.manager import scenario_manager +from mcp_servers.shared.server import mcp + + +@mcp.tool() +def get_flight_status(flight_id: str) -> dict: + """Lightweight status check for a single flight. + + Returns: flight_id, status (ON_TIME/DELAYED/CANCELLED/DIVERTED), + delay_minutes, gate, last_updated. + Use this for quick triage before pulling full operational data. + """ + for f in scenario_manager.flights: + if f.flight_id == flight_id: + return { + "flight_id": f.flight_id, + "status": f.status.value, + "delay_minutes": f.delay_minutes, + "gate": f.gate, + "origin": f.origin, + "destination": f.destination, + "source": "scenario_mock", + } + return {"error": f"Flight {flight_id} not found in active scenario"} + + +@mcp.tool() +def get_flight_details(flight_id: str) -> dict: + """Full operational context for a flight. + + Returns: scheduled vs actual times, delay cause code, inbound aircraft + tail number, gate assignment, crew IDs, passenger count. + """ + for f in scenario_manager.flights: + if f.flight_id == flight_id: + return f.model_dump(mode="json") + return {"error": f"Flight {flight_id} not found in active scenario"} + + +@mcp.tool() +def get_irregular_ops(hub: str) -> list[dict]: + """All flights in irregular operations state at a hub. + + hub: ORD | EWR | IAH | SFO | DEN + Returns list of: flight_id, irrop_type, affected_pax_count, + delay_cause, delay_minutes. + """ + results = [] + for f in scenario_manager.flights: + if f.origin == hub.upper() and f.status != FlightStatus.ON_TIME: + results.append({ + "flight_id": f.flight_id, + "irrop_type": f.status.value, + "affected_pax_count": f.passenger_count, + "delay_cause": f.delay_cause.value if f.delay_cause else None, + "delay_minutes": f.delay_minutes, + "origin": f.origin, + "destination": f.destination, + "gate": f.gate, + }) + return results + + +@mcp.tool() +async def get_route_weather(origin: str, destination: str) -> dict: + """Live weather at origin, destination, and en-route waypoints. + + Source: OpenMeteo API (real-time, no API key). + Returns per waypoint: conditions, temperature, wind, visibility, + precipitation, significant events. + """ + return await openmeteo.get_weather_along_route(origin, destination) + + +@mcp.tool() +async def get_hub_forecasts() -> dict: + """4-hour weather forecast for all United hubs (ORD, EWR, IAH, SFO, DEN). + + Source: OpenMeteo API. + Returns per hub: hourly forecast with conditions, wind, visibility, + and a risk_flag if convective activity or low visibility expected. + """ + return await openmeteo.get_weather_forecast_hubs() + + +@mcp.tool() +async def get_airport_status(airport_code: str) -> dict: + """Live FAA airport status. + + Source: FAA NASSTATUS API (real-time, no API key). + Returns: delay_type, delay_reason, average_delay_minutes, + ground_delay_program active/inactive, ground_stop active/inactive. + Falls back to 'status_unavailable' if FAA API is unreachable. + """ + return await faa.get_airport_status(airport_code) + + +@mcp.tool() +async def get_airport_congestion(airport_code: str) -> dict: + """Gate availability and taxi times at an airport. + + Combines: FAA live status (real) + gate/taxi mock data from scenario. + Returns: faa_status, gates_available, gates_total, + avg_taxi_out_minutes, avg_taxi_in_minutes. + """ + faa_status = await faa.get_airport_status(airport_code) + hub = HUBS.get(airport_code.upper()) + total_gates = hub.gates if hub else 100 + + disrupted = sum( + 1 for f in scenario_manager.flights + if f.origin == airport_code.upper() and f.status != FlightStatus.ON_TIME + ) + gates_occupied = min(total_gates, int(total_gates * 0.7) + disrupted * 3) + + return { + "airport": airport_code.upper(), + "faa_status": faa_status, + "gates_available": total_gates - gates_occupied, + "gates_total": total_gates, + "avg_taxi_out_minutes": 12 + disrupted * 4, + "avg_taxi_in_minutes": 8 + disrupted * 2, + "source": "faa_live+scenario_mock", + } + + +@mcp.tool() +def get_maintenance_flags(aircraft_tail: str) -> list[dict]: + """Active MEL (Minimum Equipment List) items for an aircraft. + + Returns per item: mel_id, system, description, restriction + (route/ops limitations), expiry date. + """ + items = scenario_manager.maintenance.get(aircraft_tail, []) + return [item.model_dump(mode="json") for item in items]