split each MCP server into tools/resources/prompts modules

This commit is contained in:
2026-04-16 02:30:03 -03:00
parent 8645adb3d6
commit 6856d09986
12 changed files with 569 additions and 563 deletions

View File

@@ -0,0 +1,25 @@
"""Prompts for the ops MCP server."""
from mcp_servers.ops.server import mcp
@mcp.prompt()
def handover_brief(hub: str = "ALL", shift_time: str = "21:00 CT") -> str:
"""Template for generating a shift handover brief.
Defines the structured format: IMMEDIATE / MONITOR / FYI sections,
priority scoring expectations, and detail level per item.
"""
return (
f"Generate a shift handover brief for {hub} at {shift_time}. "
"Structure the output as follows:\n\n"
"HEADER: Hub, time, outgoing/incoming manager names.\n\n"
"IMMEDIATE ACTION: Items requiring action within the next 2 hours. "
"Each item: flight/issue ID, what's happening, time until critical, "
"specific action needed, and any pre-staged resources.\n\n"
"MONITOR: Items that could escalate. Each item: what to watch, "
"trigger conditions for escalation, current trajectory.\n\n"
"FYI: Resolved items or low-risk situations the incoming shift should know about.\n\n"
"Be concise. Ops managers scan, they don't read paragraphs. "
"Use the data provided — do not invent details."
)

View File

@@ -0,0 +1,39 @@
"""Resources for the ops MCP server."""
import json
from mcp_servers.data.scenarios.manager import scenario_manager
from mcp_servers.ops import server
from mcp_servers.ops.server import mcp
@mcp.resource("ops://crew/roster")
def crew_roster() -> str:
"""Full crew roster for the current scenario.
Returns per crew: crew_id, name, role, base_hub, duty status summary.
"""
roster = []
for c in scenario_manager.crew:
hours_remaining = c.duty_hours_limit - c.duty_hours_elapsed
roster.append({
"crew_id": c.crew_id,
"name": c.name,
"role": c.role.value,
"base_hub": c.base_hub,
"duty_status": "AT_RISK" if hours_remaining <= 2.0 else "OK",
"hours_until_limit": round(hours_remaining, 2),
"next_flight": c.next_scheduled_flight,
})
return json.dumps(roster)
@mcp.resource("ops://handover/latest")
def latest_handover() -> str:
"""The most recently generated shift handover brief.
Returns null if no brief has been generated yet.
"""
if server._last_handover is None:
return json.dumps(None)
return json.dumps(server._last_handover)

View File

@@ -4,13 +4,10 @@ Covers: crew duty status, crew notes, pending rebookings, ops narrative,
crew roster, last handover brief, and handover brief prompt template. crew roster, last handover brief, and handover brief prompt template.
""" """
import json
from datetime import datetime, timezone from datetime import datetime, timezone
from fastmcp import FastMCP from fastmcp import FastMCP
from mcp_servers.data.scenarios.manager import scenario_manager
mcp = FastMCP( mcp = FastMCP(
"stellar-ops-internal", "stellar-ops-internal",
instructions=( instructions=(
@@ -19,7 +16,6 @@ mcp = FastMCP(
), ),
) )
# In-memory store for the last generated handover brief
_last_handover: dict | None = None _last_handover: dict | None = None
@@ -32,191 +28,4 @@ def store_handover_brief(brief: dict) -> None:
} }
# ── Tools ────────────────────────────────────────────────────────── from mcp_servers.ops import tools, resources, prompts # noqa: E402, F401
@mcp.tool()
def get_crew_notes(flight_id: str) -> list[str]:
"""Free-text notes logged by crew or ops team for a flight.
Includes: maintenance write-ups, catering delays, gate conflicts,
passenger incidents, operational remarks.
"""
return scenario_manager.crew_notes.get(flight_id, [])
@mcp.tool()
def get_crew_duty_status(crew_ids: list[str]) -> list[dict]:
"""Duty hours and rest state for each crew member.
Returns per crew: crew_id, name, role, duty_hours_elapsed,
duty_hours_limit, hours_until_limit, rest_hours_since_last,
at_risk (true if within 2h of FAA Part 117 limit).
"""
results = []
crew_map = {c.crew_id: c for c in scenario_manager.crew}
for crew_id in crew_ids:
crew = crew_map.get(crew_id)
if not crew:
results.append({"crew_id": crew_id, "error": "not_found"})
continue
hours_until_limit = crew.duty_hours_limit - crew.duty_hours_elapsed
results.append({
"crew_id": crew.crew_id,
"name": crew.name,
"role": crew.role.value,
"duty_hours_elapsed": crew.duty_hours_elapsed,
"duty_hours_limit": crew.duty_hours_limit,
"hours_until_limit": round(hours_until_limit, 2),
"rest_hours_since_last": crew.rest_hours_since_last,
"at_risk": hours_until_limit <= 2.0,
"base_hub": crew.base_hub,
"next_scheduled_flight": crew.next_scheduled_flight,
})
return results
@mcp.tool()
def get_pending_rebookings(hub: str, limit: int = 20) -> list[dict]:
"""Passengers needing rebooking at a hub, sorted by urgency.
Returns: pax_id, name, mileage_plus_status, original_flight,
destination, next_available_option, urgency.
"""
# Filter rebookings by flights originating from this hub
hub_flights = {f.flight_id for f in scenario_manager.flights if f.origin == hub.upper()}
rebookings = [
r for r in scenario_manager.rebookings
if r.original_flight in hub_flights
]
urgency_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
rebookings.sort(key=lambda r: urgency_order.get(r.urgency, 3))
return [r.model_dump(mode="json") for r in rebookings[:limit]]
@mcp.tool()
async def generate_narrative(context: dict) -> str:
"""Synthesizes aggregated operational context into a structured
handover brief for ops managers.
Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true).
Output: prioritized, concise, structured by IMMEDIATE / MONITOR / FYI.
Falls back to template if no API key is configured.
"""
try:
from mcp_servers.shared_llm import generate, _get_provider
hub = context.get("hub", "ALL")
shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC"))
system_prompt = (
f"You are an airline operations shift handover briefing system. "
f"Generate a concise handover brief for {hub} at {shift_time}. "
f"Structure as: HEADER, then IMMEDIATE ACTION (items needing action within 2h), "
f"MONITOR (items that could escalate), FYI (resolved or low-risk). "
f"Be concise — ops managers scan, they don't read paragraphs. "
f"Use the data provided. Do not invent details."
)
text = await generate(system_prompt, json.dumps(context, indent=2))
return json.dumps({"text": text, "provider": _get_provider()})
except Exception:
return json.dumps({"text": _template_narrative(context), "provider": "template"})
def _template_narrative(context: dict) -> str:
"""Structured template fallback when LLM is unavailable."""
sections = []
immediate = context.get("immediate", [])
monitor = context.get("monitor", [])
fyi = context.get("fyi", [])
hub = context.get("hub", "ALL")
shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC"))
sections.append(f"SHIFT HANDOVER BRIEF — {hub} / {shift_time}")
sections.append(f"Generated: {datetime.now(timezone.utc).strftime('%H:%M UTC')}")
sections.append("")
if immediate:
sections.append("━━━ IMMEDIATE ACTION ━━━")
for item in immediate:
sections.append(f"{item}")
sections.append("")
if monitor:
sections.append("━━━ MONITOR ━━━")
for item in monitor:
sections.append(f"{item}")
sections.append("")
if fyi:
sections.append("━━━ FYI ━━━")
for item in fyi:
sections.append(f" {item}")
return "\n".join(sections)
# ── Resources ──────────────────────────────────────────────────────
@mcp.resource("ops://crew/roster")
def crew_roster() -> str:
"""Full crew roster for the current scenario.
Returns per crew: crew_id, name, role, base_hub, duty status summary.
"""
roster = []
for c in scenario_manager.crew:
hours_remaining = c.duty_hours_limit - c.duty_hours_elapsed
roster.append({
"crew_id": c.crew_id,
"name": c.name,
"role": c.role.value,
"base_hub": c.base_hub,
"duty_status": "AT_RISK" if hours_remaining <= 2.0 else "OK",
"hours_until_limit": round(hours_remaining, 2),
"next_flight": c.next_scheduled_flight,
})
return json.dumps(roster)
@mcp.resource("ops://handover/latest")
def latest_handover() -> str:
"""The most recently generated shift handover brief.
Returns null if no brief has been generated yet.
"""
if _last_handover is None:
return json.dumps(None)
return json.dumps(_last_handover)
# ── Prompts ────────────────────────────────────────────────────────
@mcp.prompt()
def handover_brief(hub: str = "ALL", shift_time: str = "21:00 CT") -> str:
"""Template for generating a shift handover brief.
Defines the structured format: IMMEDIATE / MONITOR / FYI sections,
priority scoring expectations, and detail level per item.
"""
return (
f"Generate a shift handover brief for {hub} at {shift_time}. "
"Structure the output as follows:\n\n"
"HEADER: Hub, time, outgoing/incoming manager names.\n\n"
"IMMEDIATE ACTION: Items requiring action within the next 2 hours. "
"Each item: flight/issue ID, what's happening, time until critical, "
"specific action needed, and any pre-staged resources.\n\n"
"MONITOR: Items that could escalate. Each item: what to watch, "
"trigger conditions for escalation, current trajectory.\n\n"
"FYI: Resolved items or low-risk situations the incoming shift should know about.\n\n"
"Be concise. Ops managers scan, they don't read paragraphs. "
"Use the data provided — do not invent details."
)

133
mcp_servers/ops/tools.py Normal file
View File

@@ -0,0 +1,133 @@
"""Tools for the ops MCP server."""
import json
from datetime import datetime, timezone
from mcp_servers.data.scenarios.manager import scenario_manager
from mcp_servers.ops.server import mcp
@mcp.tool()
def get_crew_notes(flight_id: str) -> list[str]:
"""Free-text notes logged by crew or ops team for a flight.
Includes: maintenance write-ups, catering delays, gate conflicts,
passenger incidents, operational remarks.
"""
return scenario_manager.crew_notes.get(flight_id, [])
@mcp.tool()
def get_crew_duty_status(crew_ids: list[str]) -> list[dict]:
"""Duty hours and rest state for each crew member.
Returns per crew: crew_id, name, role, duty_hours_elapsed,
duty_hours_limit, hours_until_limit, rest_hours_since_last,
at_risk (true if within 2h of FAA Part 117 limit).
"""
results = []
crew_map = {c.crew_id: c for c in scenario_manager.crew}
for crew_id in crew_ids:
crew = crew_map.get(crew_id)
if not crew:
results.append({"crew_id": crew_id, "error": "not_found"})
continue
hours_until_limit = crew.duty_hours_limit - crew.duty_hours_elapsed
results.append({
"crew_id": crew.crew_id,
"name": crew.name,
"role": crew.role.value,
"duty_hours_elapsed": crew.duty_hours_elapsed,
"duty_hours_limit": crew.duty_hours_limit,
"hours_until_limit": round(hours_until_limit, 2),
"rest_hours_since_last": crew.rest_hours_since_last,
"at_risk": hours_until_limit <= 2.0,
"base_hub": crew.base_hub,
"next_scheduled_flight": crew.next_scheduled_flight,
})
return results
@mcp.tool()
def get_pending_rebookings(hub: str, limit: int = 20) -> list[dict]:
"""Passengers needing rebooking at a hub, sorted by urgency.
Returns: pax_id, name, mileage_plus_status, original_flight,
destination, next_available_option, urgency.
"""
hub_flights = {f.flight_id for f in scenario_manager.flights if f.origin == hub.upper()}
rebookings = [
r for r in scenario_manager.rebookings
if r.original_flight in hub_flights
]
urgency_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
rebookings.sort(key=lambda r: urgency_order.get(r.urgency, 3))
return [r.model_dump(mode="json") for r in rebookings[:limit]]
@mcp.tool()
async def generate_narrative(context: dict) -> str:
"""Synthesizes aggregated operational context into a structured
handover brief for ops managers.
Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true).
Output: prioritized, concise, structured by IMMEDIATE / MONITOR / FYI.
Falls back to template if no API key is configured.
"""
try:
from mcp_servers.shared_llm import generate, _get_provider
hub = context.get("hub", "ALL")
shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC"))
system_prompt = (
f"You are an airline operations shift handover briefing system. "
f"Generate a concise handover brief for {hub} at {shift_time}. "
f"Structure as: HEADER, then IMMEDIATE ACTION (items needing action within 2h), "
f"MONITOR (items that could escalate), FYI (resolved or low-risk). "
f"Be concise — ops managers scan, they don't read paragraphs. "
f"Use the data provided. Do not invent details."
)
text = await generate(system_prompt, json.dumps(context, indent=2))
return json.dumps({"text": text, "provider": _get_provider()})
except Exception:
return json.dumps({"text": _template_narrative(context), "provider": "template"})
def _template_narrative(context: dict) -> str:
"""Structured template fallback when LLM is unavailable."""
sections = []
immediate = context.get("immediate", [])
monitor = context.get("monitor", [])
fyi = context.get("fyi", [])
hub = context.get("hub", "ALL")
shift_time = context.get("shift_time", datetime.now(timezone.utc).strftime("%H:%M UTC"))
sections.append(f"SHIFT HANDOVER BRIEF — {hub} / {shift_time}")
sections.append(f"Generated: {datetime.now(timezone.utc).strftime('%H:%M UTC')}")
sections.append("")
if immediate:
sections.append("━━━ IMMEDIATE ACTION ━━━")
for item in immediate:
sections.append(f"{item}")
sections.append("")
if monitor:
sections.append("━━━ MONITOR ━━━")
for item in monitor:
sections.append(f"{item}")
sections.append("")
if fyi:
sections.append("━━━ FYI ━━━")
for item in fyi:
sections.append(f" {item}")
return "\n".join(sections)

View File

@@ -0,0 +1,36 @@
"""Prompts for the passenger MCP server."""
from mcp_servers.passenger.server import mcp
TONE_INSTRUCTIONS = {
"empathetic": (
"Write a passenger notification about this flight disruption. "
"Be empathetic and human. Explain WHY the delay/cancellation happened "
"using the weather, maintenance, or operational data provided. "
"Tell the passenger what's happening next: new boarding time, gate, "
"rebooking options. End with reassurance. No jargon."
),
"factual": (
"Write a brief, factual notification. Include: flight number, "
"route, status, delay duration, cause (one phrase), new departure time, "
"gate. No editorial. No reassurance. Just facts."
),
"brief_sms": (
"Write an SMS-length notification (under 160 characters). "
"Format: UA{flight} {route}: {status}. {cause}. New dep: {time}. Gate {gate}."
),
}
@mcp.prompt()
def passenger_notification(tone: str = "empathetic") -> str:
"""Template for generating passenger delay/cancellation notifications.
tone: empathetic (default) | factual | brief_sms
"""
instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["empathetic"])
return (
f"{instruction}\n\n"
"Use the operational data provided below. Do not invent details. "
"If data is missing for a section, omit that section."
)

View File

@@ -0,0 +1,49 @@
"""Resources for the passenger MCP server."""
import json
from mcp_servers.data.scenarios.manager import scenario_manager
from mcp_servers.passenger.server import mcp
@mcp.resource("ops://flights/{flight_id}/manifest")
def flight_manifest(flight_id: str) -> str:
"""Passenger manifest summary for a flight.
Returns: flight_id, passenger count, count by MP status tier,
special needs count, connection count.
Summary-level — no PII exposed through this resource.
"""
flight = None
for f in scenario_manager.flights:
if f.flight_id == flight_id:
flight = f
break
if not flight:
return json.dumps({"error": f"Flight {flight_id} not found"})
pax_on_flight = [
p for p in scenario_manager.passengers
if p.flight_id == flight_id
]
status_counts = {}
special_needs_count = 0
connection_count = 0
for p in pax_on_flight:
status = p.mileage_plus_status.value
status_counts[status] = status_counts.get(status, 0) + 1
if p.special_needs:
special_needs_count += 1
if p.connection_flight:
connection_count += 1
return json.dumps({
"flight_id": flight_id,
"total_passengers": flight.passenger_count,
"by_status": status_counts,
"special_needs_count": special_needs_count,
"connection_count": connection_count,
})

View File

@@ -4,13 +4,8 @@ Covers: passenger notification generation, flight manifest, and
notification prompt template (multi-tone). notification prompt template (multi-tone).
""" """
import json
from fastmcp import FastMCP from fastmcp import FastMCP
from mcp_servers.data.models import MPStatus
from mcp_servers.data.scenarios.manager import scenario_manager
mcp = FastMCP( mcp = FastMCP(
"stellar-ops-passenger", "stellar-ops-passenger",
instructions=( instructions=(
@@ -19,151 +14,4 @@ mcp = FastMCP(
), ),
) )
from mcp_servers.passenger import tools, resources, prompts # noqa: E402, F401
# ── Tools ──────────────────────────────────────────────────────────
@mcp.tool()
async def generate_notification(context: dict) -> str:
"""Synthesizes flight disruption context into an empathetic,
actionable passenger notification.
Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true).
Output: clear, human, no jargon, includes gate/time/status.
Falls back to template if no API key is configured.
"""
try:
from mcp_servers.shared_llm import generate, _get_provider
system_prompt = (
"You are a passenger notification system for Stellar Air. "
"Write a clear, empathetic notification about this flight disruption. "
"Explain WHY the delay or cancellation happened using the operational data provided. "
"Tell the passenger what's happening next: new boarding time, gate, status. "
"Be human and reassuring. No aviation jargon. No speculation. "
"If data is missing for a section, omit it — don't make things up."
)
text = await generate(system_prompt, json.dumps(context, indent=2))
return json.dumps({"text": text, "provider": _get_provider()})
except Exception:
return json.dumps({"text": _template_notification(context), "provider": "template"})
def _template_notification(context: dict) -> str:
"""Structured template fallback when LLM is unavailable."""
flight_id = context.get("flight_id", "")
origin = context.get("origin", "")
destination = context.get("destination", "")
status = context.get("status", "DELAYED")
delay_minutes = context.get("delay_minutes", 0)
delay_cause = context.get("delay_cause", "")
gate = context.get("gate", "")
weather_summary = context.get("weather_summary", "")
crew_notes_summary = context.get("crew_notes_summary", "")
lines = [
f"{flight_id}{origin}{destination}",
f"Status: {status}" + (f" {delay_minutes} minutes" if delay_minutes else ""),
"",
]
if delay_cause:
cause_text = {
"WEATHER": "weather conditions along your route",
"MAINTENANCE": "a routine maintenance check on your aircraft",
"CREW": "ensuring your crew is fully rested for safe operation",
"ATC": "air traffic control restrictions",
"LATE_AIRCRAFT": "the late arrival of your inbound aircraft",
}.get(delay_cause, f"{delay_cause.lower()}")
lines.append(f"Your flight is delayed due to {cause_text}.")
if weather_summary:
lines.append(f"Current conditions: {weather_summary}.")
if gate:
lines.append(f"\nGate {gate} — no gate change expected.")
return "\n".join(lines)
# ── Resources ──────────────────────────────────────────────────────
@mcp.resource("ops://flights/{flight_id}/manifest")
def flight_manifest(flight_id: str) -> str:
"""Passenger manifest summary for a flight.
Returns: flight_id, passenger count, count by MP status tier,
special needs count, connection count.
Summary-level — no PII exposed through this resource.
"""
flight = None
for f in scenario_manager.flights:
if f.flight_id == flight_id:
flight = f
break
if not flight:
return json.dumps({"error": f"Flight {flight_id} not found"})
# Count passengers by status from scenario data
pax_on_flight = [
p for p in scenario_manager.passengers
if p.flight_id == flight_id
]
status_counts = {}
special_needs_count = 0
connection_count = 0
for p in pax_on_flight:
status = p.mileage_plus_status.value
status_counts[status] = status_counts.get(status, 0) + 1
if p.special_needs:
special_needs_count += 1
if p.connection_flight:
connection_count += 1
return json.dumps({
"flight_id": flight_id,
"total_passengers": flight.passenger_count,
"by_status": status_counts,
"special_needs_count": special_needs_count,
"connection_count": connection_count,
})
# ── Prompts ────────────────────────────────────────────────────────
TONE_INSTRUCTIONS = {
"empathetic": (
"Write a passenger notification about this flight disruption. "
"Be empathetic and human. Explain WHY the delay/cancellation happened "
"using the weather, maintenance, or operational data provided. "
"Tell the passenger what's happening next: new boarding time, gate, "
"rebooking options. End with reassurance. No jargon."
),
"factual": (
"Write a brief, factual notification. Include: flight number, "
"route, status, delay duration, cause (one phrase), new departure time, "
"gate. No editorial. No reassurance. Just facts."
),
"brief_sms": (
"Write an SMS-length notification (under 160 characters). "
"Format: UA{flight} {route}: {status}. {cause}. New dep: {time}. Gate {gate}."
),
}
@mcp.prompt()
def passenger_notification(tone: str = "empathetic") -> str:
"""Template for generating passenger delay/cancellation notifications.
tone: empathetic (default) | factual | brief_sms
"""
instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["empathetic"])
return (
f"{instruction}\n\n"
"Use the operational data provided below. Do not invent details. "
"If data is missing for a section, omit that section."
)

View File

@@ -0,0 +1,66 @@
"""Tools for the passenger MCP server."""
import json
from mcp_servers.passenger.server import mcp
@mcp.tool()
async def generate_notification(context: dict) -> str:
"""Synthesizes flight disruption context into an empathetic,
actionable passenger notification.
Uses Claude via Anthropic SDK (or Bedrock when USE_BEDROCK=true).
Output: clear, human, no jargon, includes gate/time/status.
Falls back to template if no API key is configured.
"""
try:
from mcp_servers.shared_llm import generate, _get_provider
system_prompt = (
"You are a passenger notification system for Stellar Air. "
"Write a clear, empathetic notification about this flight disruption. "
"Explain WHY the delay or cancellation happened using the operational data provided. "
"Tell the passenger what's happening next: new boarding time, gate, status. "
"Be human and reassuring. No aviation jargon. No speculation. "
"If data is missing for a section, omit it — don't make things up."
)
text = await generate(system_prompt, json.dumps(context, indent=2))
return json.dumps({"text": text, "provider": _get_provider()})
except Exception:
return json.dumps({"text": _template_notification(context), "provider": "template"})
def _template_notification(context: dict) -> str:
"""Structured template fallback when LLM is unavailable."""
flight_id = context.get("flight_id", "")
origin = context.get("origin", "")
destination = context.get("destination", "")
status = context.get("status", "DELAYED")
delay_minutes = context.get("delay_minutes", 0)
delay_cause = context.get("delay_cause", "")
gate = context.get("gate", "")
weather_summary = context.get("weather_summary", "")
lines = [
f"{flight_id}{origin}{destination}",
f"Status: {status}" + (f" {delay_minutes} minutes" if delay_minutes else ""),
"",
]
if delay_cause:
cause_text = {
"WEATHER": "weather conditions along your route",
"MAINTENANCE": "a routine maintenance check on your aircraft",
"CREW": "ensuring your crew is fully rested for safe operation",
"ATC": "air traffic control restrictions",
"LATE_AIRCRAFT": "the late arrival of your inbound aircraft",
}.get(delay_cause, f"{delay_cause.lower()}")
lines.append(f"Your flight is delayed due to {cause_text}.")
if weather_summary:
lines.append(f"Current conditions: {weather_summary}.")
if gate:
lines.append(f"\nGate {gate} — no gate change expected.")
return "\n".join(lines)

View File

@@ -0,0 +1,54 @@
"""Prompts for the shared MCP server."""
from mcp_servers.shared.server import mcp
DELAY_TEMPLATES = {
("WEATHER", "passenger"): (
"Explain this flight delay to a passenger. The cause is weather-related. "
"Be empathetic and transparent. Include what's happening with the weather, "
"what the airline is doing about it, and what the passenger should expect next. "
"Do not use aviation jargon."
),
("WEATHER", "ops_manager"): (
"Summarize this weather-caused delay for an ops manager. "
"Include: delay cause code, affected systems, GDP/ground stop status if applicable, "
"forecast window, and recommended next actions. Be concise."
),
("MAINTENANCE", "passenger"): (
"Explain this maintenance-caused delay to a passenger. "
"Be reassuring — emphasize safety. Describe what's being checked "
"without technical jargon. Give expected timeline if available."
),
("MAINTENANCE", "ops_manager"): (
"Summarize this maintenance delay for an ops manager. "
"Include: MEL item, system affected, restriction details, "
"estimated release time, and downstream impact."
),
("CREW", "passenger"): (
"Explain this crew-related delay to a passenger. "
"Frame it as 'ensuring your crew is fully rested for safe operation.' "
"Do not mention specific regulations or duty limits."
),
("CREW", "ops_manager"): (
"Summarize this crew-caused delay for an ops manager. "
"Include: Part 117 status, hours remaining, swap options, "
"backup crew availability, and timeline."
),
}
@mcp.prompt()
def delay_explainer(cause_code: str, audience: str) -> str:
"""Turns a raw delay cause code into a human-readable explanation prompt.
cause_code: WEATHER | MAINTENANCE | CREW | ATC | LATE_AIRCRAFT
audience: passenger | ops_manager
"""
key = (cause_code.upper(), audience.lower())
default_key = ("WEATHER", audience.lower())
template = DELAY_TEMPLATES.get(key, DELAY_TEMPLATES.get(default_key, ""))
return (
f"{template}\n\n"
"Use the operational data provided below. Do not invent details. "
"If data is missing for a section, omit that section."
)

View File

@@ -0,0 +1,25 @@
"""Resources for the shared MCP server."""
import json
from mcp_servers.data.models import HUBS
from mcp_servers.data.scenarios.manager import scenario_manager
from mcp_servers.shared.server import mcp
@mcp.resource("ops://hubs/{hub_code}")
def hub_info(hub_code: str) -> str:
"""Static reference data for a hub.
Returns: full name, codes, coordinates, timezone, terminal/gate/runway count.
"""
hub = HUBS.get(hub_code.upper())
if not hub:
return f"Unknown hub: {hub_code}. Known: {', '.join(HUBS.keys())}"
return hub.model_dump_json()
@mcp.resource("ops://scenarios/active")
def active_scenario() -> str:
"""Current active scenario metadata."""
return json.dumps(scenario_manager.get_metadata())

View File

@@ -6,10 +6,6 @@ maintenance flags, hub reference data, and delay explanation prompts.
from fastmcp import FastMCP from fastmcp import FastMCP
from mcp_servers.data.models import HUBS, FlightStatus
from mcp_servers.data.real import faa, openmeteo
from mcp_servers.data.scenarios.manager import scenario_manager
mcp = FastMCP( mcp = FastMCP(
"stellar-ops-shared", "stellar-ops-shared",
instructions=( instructions=(
@@ -19,217 +15,4 @@ mcp = FastMCP(
), ),
) )
from mcp_servers.shared import tools, resources, prompts # noqa: E402, F401
# ── Tools ──────────────────────────────────────────────────────────
@mcp.tool()
def get_flight_status(flight_id: str) -> dict:
"""Lightweight status check for a single flight.
Returns: flight_id, status (ON_TIME/DELAYED/CANCELLED/DIVERTED),
delay_minutes, gate, last_updated.
Use this for quick triage before pulling full operational data.
"""
for f in scenario_manager.flights:
if f.flight_id == flight_id:
return {
"flight_id": f.flight_id,
"status": f.status.value,
"delay_minutes": f.delay_minutes,
"gate": f.gate,
"origin": f.origin,
"destination": f.destination,
"source": "scenario_mock",
}
return {"error": f"Flight {flight_id} not found in active scenario"}
@mcp.tool()
def get_flight_details(flight_id: str) -> dict:
"""Full operational context for a flight.
Returns: scheduled vs actual times, delay cause code, inbound aircraft
tail number, gate assignment, crew IDs, passenger count.
"""
for f in scenario_manager.flights:
if f.flight_id == flight_id:
return f.model_dump(mode="json")
return {"error": f"Flight {flight_id} not found in active scenario"}
@mcp.tool()
def get_irregular_ops(hub: str) -> list[dict]:
"""All flights in irregular operations state at a hub.
hub: ORD | EWR | IAH | SFO | DEN
Returns list of: flight_id, irrop_type, affected_pax_count,
delay_cause, delay_minutes.
"""
results = []
for f in scenario_manager.flights:
if f.origin == hub.upper() and f.status != FlightStatus.ON_TIME:
results.append({
"flight_id": f.flight_id,
"irrop_type": f.status.value,
"affected_pax_count": f.passenger_count,
"delay_cause": f.delay_cause.value if f.delay_cause else None,
"delay_minutes": f.delay_minutes,
"origin": f.origin,
"destination": f.destination,
"gate": f.gate,
})
return results
@mcp.tool()
async def get_route_weather(origin: str, destination: str) -> dict:
"""Live weather at origin, destination, and en-route waypoints.
Source: OpenMeteo API (real-time, no API key).
Returns per waypoint: conditions, temperature, wind, visibility,
precipitation, significant events.
"""
return await openmeteo.get_weather_along_route(origin, destination)
@mcp.tool()
async def get_hub_forecasts() -> dict:
"""4-hour weather forecast for all United hubs (ORD, EWR, IAH, SFO, DEN).
Source: OpenMeteo API.
Returns per hub: hourly forecast with conditions, wind, visibility,
and a risk_flag if convective activity or low visibility expected.
"""
return await openmeteo.get_weather_forecast_hubs()
@mcp.tool()
async def get_airport_status(airport_code: str) -> dict:
"""Live FAA airport status.
Source: FAA NASSTATUS API (real-time, no API key).
Returns: delay_type, delay_reason, average_delay_minutes,
ground_delay_program active/inactive, ground_stop active/inactive.
Falls back to 'status_unavailable' if FAA API is unreachable.
"""
return await faa.get_airport_status(airport_code)
@mcp.tool()
async def get_airport_congestion(airport_code: str) -> dict:
"""Gate availability and taxi times at an airport.
Combines: FAA live status (real) + gate/taxi mock data from scenario.
Returns: faa_status, gates_available, gates_total,
avg_taxi_out_minutes, avg_taxi_in_minutes.
"""
faa_status = await faa.get_airport_status(airport_code)
hub = HUBS.get(airport_code.upper())
total_gates = hub.gates if hub else 100
# Mock congestion based on disrupted flights in scenario
disrupted = sum(
1 for f in scenario_manager.flights
if f.origin == airport_code.upper() and f.status != FlightStatus.ON_TIME
)
gates_occupied = min(total_gates, int(total_gates * 0.7) + disrupted * 3)
return {
"airport": airport_code.upper(),
"faa_status": faa_status,
"gates_available": total_gates - gates_occupied,
"gates_total": total_gates,
"avg_taxi_out_minutes": 12 + disrupted * 4,
"avg_taxi_in_minutes": 8 + disrupted * 2,
"source": "faa_live+scenario_mock",
}
@mcp.tool()
def get_maintenance_flags(aircraft_tail: str) -> list[dict]:
"""Active MEL (Minimum Equipment List) items for an aircraft.
Returns per item: mel_id, system, description, restriction
(route/ops limitations), expiry date.
"""
items = scenario_manager.maintenance.get(aircraft_tail, [])
return [item.model_dump(mode="json") for item in items]
# ── Resources ──────────────────────────────────────────────────────
@mcp.resource("ops://hubs/{hub_code}")
def hub_info(hub_code: str) -> str:
"""Static reference data for a hub.
Returns: full name, codes, coordinates, timezone, terminal/gate/runway count.
"""
hub = HUBS.get(hub_code.upper())
if not hub:
return f"Unknown hub: {hub_code}. Known: {', '.join(HUBS.keys())}"
return hub.model_dump_json()
@mcp.resource("ops://scenarios/active")
def active_scenario() -> str:
"""Current active scenario metadata."""
import json
return json.dumps(scenario_manager.get_metadata())
# ── Prompts ────────────────────────────────────────────────────────
DELAY_TEMPLATES = {
("WEATHER", "passenger"): (
"Explain this flight delay to a passenger. The cause is weather-related. "
"Be empathetic and transparent. Include what's happening with the weather, "
"what the airline is doing about it, and what the passenger should expect next. "
"Do not use aviation jargon."
),
("WEATHER", "ops_manager"): (
"Summarize this weather-caused delay for an ops manager. "
"Include: delay cause code, affected systems, GDP/ground stop status if applicable, "
"forecast window, and recommended next actions. Be concise."
),
("MAINTENANCE", "passenger"): (
"Explain this maintenance-caused delay to a passenger. "
"Be reassuring — emphasize safety. Describe what's being checked "
"without technical jargon. Give expected timeline if available."
),
("MAINTENANCE", "ops_manager"): (
"Summarize this maintenance delay for an ops manager. "
"Include: MEL item, system affected, restriction details, "
"estimated release time, and downstream impact."
),
("CREW", "passenger"): (
"Explain this crew-related delay to a passenger. "
"Frame it as 'ensuring your crew is fully rested for safe operation.' "
"Do not mention specific regulations or duty limits."
),
("CREW", "ops_manager"): (
"Summarize this crew-caused delay for an ops manager. "
"Include: Part 117 status, hours remaining, swap options, "
"backup crew availability, and timeline."
),
}
@mcp.prompt()
def delay_explainer(cause_code: str, audience: str) -> str:
"""Turns a raw delay cause code into a human-readable explanation prompt.
cause_code: WEATHER | MAINTENANCE | CREW | ATC | LATE_AIRCRAFT
audience: passenger | ops_manager
"""
key = (cause_code.upper(), audience.lower())
default_key = ("WEATHER", audience.lower())
template = DELAY_TEMPLATES.get(key, DELAY_TEMPLATES.get(default_key, ""))
return (
f"{template}\n\n"
"Use the operational data provided below. Do not invent details. "
"If data is missing for a section, omit that section."
)

139
mcp_servers/shared/tools.py Normal file
View File

@@ -0,0 +1,139 @@
"""Tools for the shared MCP server."""
from mcp_servers.data.models import HUBS, FlightStatus
from mcp_servers.data.real import faa, openmeteo
from mcp_servers.data.scenarios.manager import scenario_manager
from mcp_servers.shared.server import mcp
@mcp.tool()
def get_flight_status(flight_id: str) -> dict:
"""Lightweight status check for a single flight.
Returns: flight_id, status (ON_TIME/DELAYED/CANCELLED/DIVERTED),
delay_minutes, gate, last_updated.
Use this for quick triage before pulling full operational data.
"""
for f in scenario_manager.flights:
if f.flight_id == flight_id:
return {
"flight_id": f.flight_id,
"status": f.status.value,
"delay_minutes": f.delay_minutes,
"gate": f.gate,
"origin": f.origin,
"destination": f.destination,
"source": "scenario_mock",
}
return {"error": f"Flight {flight_id} not found in active scenario"}
@mcp.tool()
def get_flight_details(flight_id: str) -> dict:
"""Full operational context for a flight.
Returns: scheduled vs actual times, delay cause code, inbound aircraft
tail number, gate assignment, crew IDs, passenger count.
"""
for f in scenario_manager.flights:
if f.flight_id == flight_id:
return f.model_dump(mode="json")
return {"error": f"Flight {flight_id} not found in active scenario"}
@mcp.tool()
def get_irregular_ops(hub: str) -> list[dict]:
"""All flights in irregular operations state at a hub.
hub: ORD | EWR | IAH | SFO | DEN
Returns list of: flight_id, irrop_type, affected_pax_count,
delay_cause, delay_minutes.
"""
results = []
for f in scenario_manager.flights:
if f.origin == hub.upper() and f.status != FlightStatus.ON_TIME:
results.append({
"flight_id": f.flight_id,
"irrop_type": f.status.value,
"affected_pax_count": f.passenger_count,
"delay_cause": f.delay_cause.value if f.delay_cause else None,
"delay_minutes": f.delay_minutes,
"origin": f.origin,
"destination": f.destination,
"gate": f.gate,
})
return results
@mcp.tool()
async def get_route_weather(origin: str, destination: str) -> dict:
"""Live weather at origin, destination, and en-route waypoints.
Source: OpenMeteo API (real-time, no API key).
Returns per waypoint: conditions, temperature, wind, visibility,
precipitation, significant events.
"""
return await openmeteo.get_weather_along_route(origin, destination)
@mcp.tool()
async def get_hub_forecasts() -> dict:
"""4-hour weather forecast for all United hubs (ORD, EWR, IAH, SFO, DEN).
Source: OpenMeteo API.
Returns per hub: hourly forecast with conditions, wind, visibility,
and a risk_flag if convective activity or low visibility expected.
"""
return await openmeteo.get_weather_forecast_hubs()
@mcp.tool()
async def get_airport_status(airport_code: str) -> dict:
"""Live FAA airport status.
Source: FAA NASSTATUS API (real-time, no API key).
Returns: delay_type, delay_reason, average_delay_minutes,
ground_delay_program active/inactive, ground_stop active/inactive.
Falls back to 'status_unavailable' if FAA API is unreachable.
"""
return await faa.get_airport_status(airport_code)
@mcp.tool()
async def get_airport_congestion(airport_code: str) -> dict:
"""Gate availability and taxi times at an airport.
Combines: FAA live status (real) + gate/taxi mock data from scenario.
Returns: faa_status, gates_available, gates_total,
avg_taxi_out_minutes, avg_taxi_in_minutes.
"""
faa_status = await faa.get_airport_status(airport_code)
hub = HUBS.get(airport_code.upper())
total_gates = hub.gates if hub else 100
disrupted = sum(
1 for f in scenario_manager.flights
if f.origin == airport_code.upper() and f.status != FlightStatus.ON_TIME
)
gates_occupied = min(total_gates, int(total_gates * 0.7) + disrupted * 3)
return {
"airport": airport_code.upper(),
"faa_status": faa_status,
"gates_available": total_gates - gates_occupied,
"gates_total": total_gates,
"avg_taxi_out_minutes": 12 + disrupted * 4,
"avg_taxi_in_minutes": 8 + disrupted * 2,
"source": "faa_live+scenario_mock",
}
@mcp.tool()
def get_maintenance_flags(aircraft_tail: str) -> list[dict]:
"""Active MEL (Minimum Equipment List) items for an aircraft.
Returns per item: mel_id, system, description, restriction
(route/ops limitations), expiry date.
"""
items = scenario_manager.maintenance.get(aircraft_tail, [])
return [item.model_dump(mode="json") for item in items]