183 lines
6.4 KiB
Python
183 lines
6.4 KiB
Python
"""FCE agent — "Behind Every Departure" (Flight Context Engine) passenger notification agent.

Connects to: shared + passenger MCP servers.
When a flight is disrupted, gathers all operational context and generates
an empathetic passenger notification.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from agents.shared.mcp_client import MCPMultiClient
|
|
from agents.shared.tool_runner import build_tool_caller
|
|
|
|
|
|
async def run_fce(
    flight_id: str,
    mcp: "MCPMultiClient",
    on_event: Any = None,
    lf: Any = None,
) -> dict:
    """Run the four-node FCE pipeline for one flight.

    Nodes, in order: triage (decide whether the disruption warrants a
    notification), gather_context (fan out the context tool calls),
    synthesize (ask the passenger server for the notification text) and
    format_output (assemble the result dict).

    Args:
        flight_id: Identifier passed to every flight-scoped tool call.
        mcp: Client exposing an awaitable ``call_tool(server, tool, args)``.
        on_event: Optional async callable awaited with every progress event
            dict (``type``, ``timestamp`` plus event-specific fields).
        lf: Passed through unchanged to ``build_tool_caller``.

    Returns:
        One of three dict shapes:
        * ``{"error", "flight_id"}`` when the flight status lookup reports an
          error or does not return a dict;
        * a ``NO_NOTIFICATION`` dict (with ``reason`` and ``duration_ms``)
          when the flight is below the notification threshold;
        * the full notification dict otherwise.
    """
    # Monotonic clock: durations must not be affected by wall-clock jumps.
    run_start = time.monotonic()
    errors: list = []

    async def emit(event_type: str, **data):
        """Build a timestamped event and forward it to ``on_event`` if set."""
        event = {
            "type": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **data,
        }
        if on_event:
            await on_event(event)

    # ── Node 1: Triage ──
    await emit("node_enter", node="triage")

    t0 = time.monotonic()
    flight_status = await mcp.call_tool(
        "shared", "get_flight_status", {"flight_id": flight_id}
    )
    latency = int((time.monotonic() - t0) * 1000)
    await emit(
        "tool_call_end", tool="get_flight_status", latency_ms=latency, is_live=False
    )

    # Everything below reads flight_status with .get(); refuse anything that
    # is not a dict instead of failing later with AttributeError.
    if not isinstance(flight_status, dict):
        return {
            "error": (
                "Unexpected get_flight_status response of type "
                f"{type(flight_status).__name__}"
            ),
            "flight_id": flight_id,
        }
    if "error" in flight_status:
        return {"error": flight_status["error"], "flight_id": flight_id}

    status = flight_status.get("status", "")
    # A null or non-numeric delay must not break the threshold comparison.
    delay_minutes = _coerce_delay_minutes(flight_status.get("delay_minutes", 0))
    should_notify = status in ("CANCELLED", "DIVERTED") or (
        status == "DELAYED" and delay_minutes >= 10
    )

    await emit("node_exit", node="triage", result={"should_notify": should_notify})

    if not should_notify:
        return {
            "flight_id": flight_id,
            "type": "NO_NOTIFICATION",
            "reason": f"Status {status}, delay {delay_minutes}min — below threshold",
            "duration_ms": int((time.monotonic() - run_start) * 1000),
        }

    # ── Node 2: Gather Context (parallel tool calls) ──
    await emit("node_enter", node="gather_context")

    origin = flight_status.get("origin", "")
    destination = flight_status.get("destination", "")

    _call = build_tool_caller(mcp, emit=emit, errors=errors, lf=lf)

    # The five lookups are independent of each other, so run them concurrently.
    # NOTE(review): crew notes are requested from the "ops" server, which the
    # module docstring does not list — confirm the client is connected to it.
    ops_data, weather, airport_status, airport_congestion, crew_notes = (
        await asyncio.gather(
            _call("shared", "get_flight_details", {"flight_id": flight_id}),
            _call(
                "shared",
                "get_route_weather",
                {"origin": origin, "destination": destination},
                is_live=True,
            ),
            _call("shared", "get_airport_status", {"airport_code": origin}, is_live=True),
            _call(
                "shared", "get_airport_congestion", {"airport_code": origin}, is_live=True
            ),
            _call("ops", "get_crew_notes", {"flight_id": flight_id}),
        )
    )

    await emit("node_exit", node="gather_context")

    # ── Node 3: Synthesize ──
    await emit("node_enter", node="synthesize")

    context = {
        "flight_id": flight_id,
        "origin": origin,
        "destination": destination,
        "status": status,
        "delay_minutes": delay_minutes,
        # Prefer the cause from the status lookup; fall back to flight details.
        "delay_cause": flight_status.get("delay_cause")
        or (ops_data.get("delay_cause") if isinstance(ops_data, dict) else None),
        "gate": flight_status.get("gate", ""),
        "weather_summary": _summarize_weather(weather),
        "crew_notes_summary": _summarize_crew_notes(crew_notes),
        "get_airport_status": airport_status,
        "get_airport_congestion": airport_congestion,
    }

    t0 = time.monotonic()
    raw_result = await mcp.call_tool(
        "passenger", "generate_notification", {"context": context}
    )
    latency = int((time.monotonic() - t0) * 1000)
    await emit(
        "tool_call_end", tool="generate_notification", latency_ms=latency, is_live=False
    )

    # Parse structured response (text + provider)
    notification_text, llm_provider = _parse_notification_result(raw_result)

    await emit("node_exit", node="synthesize")

    # ── Node 4: Format Output ──
    await emit("node_enter", node="format_output")

    # NOTE(review): these are truthiness checks only — if the tool caller
    # returns an error payload rather than a falsy value, the source is still
    # listed. Confirm against build_tool_caller.
    data_sources = ["flight_ops"]
    if weather:
        data_sources.append("weather_live")
    if airport_status:
        data_sources.append("faa_status_live")
    if crew_notes:
        data_sources.append("crew_notes")

    notification = {
        "flight_id": flight_id,
        "type": "DELAY_NOTIFICATION" if status == "DELAYED" else f"{status}_NOTIFICATION",
        "status": status,
        "delay_minutes": delay_minutes,
        "notification_text": notification_text,
        "llm_provider": llm_provider,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "data_sources": data_sources,
        "human_approved": True,  # auto-approve in demo
        "errors": errors,
        "duration_ms": int((time.monotonic() - run_start) * 1000),
    }

    await emit("node_exit", node="format_output")
    await emit("agent_end", output_summary=f"{status} notification for {flight_id}")

    return notification


def _coerce_delay_minutes(value: Any) -> Any:
    """Return ``value`` as a number; 0 when it is missing or not numeric."""
    if isinstance(value, (int, float)):
        return value
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def _summarize_weather(weather: Any) -> str:
    """Condense a route-weather payload into one short condition string.

    Uses the ``significant_events`` conditions when any are present, otherwise
    the origin waypoint's condition. Returns "" for anything unusable.
    """
    if not weather or not isinstance(weather, dict):
        return ""

    events = weather.get("significant_events", [])
    if events:
        # Skip malformed entries and blank conditions so the joined text has
        # no "None" items or empty segments.
        conditions = [
            str(event["condition"])
            for event in events
            if isinstance(event, dict) and event.get("condition")
        ]
        return ", ".join(conditions)

    waypoints = weather.get("waypoints", {})
    origin_wp = waypoints.get("origin", {}) if isinstance(waypoints, dict) else None
    if isinstance(origin_wp, dict) and isinstance(origin_wp.get("weather"), dict):
        condition = origin_wp["weather"].get("condition", "")
        return str(condition) if condition else ""
    return ""


def _summarize_crew_notes(crew_notes: Any) -> str:
    """Join the first two crew notes that do not start with "CANCELLED"."""
    if not crew_notes or not isinstance(crew_notes, list):
        return ""
    relevant = [
        note
        for note in crew_notes
        if isinstance(note, str) and not note.startswith("CANCELLED")
    ][:2]
    return " ".join(relevant)


def _parse_notification_result(raw_result: Any) -> tuple:
    """Extract ``(notification_text, llm_provider)`` from the tool response.

    Accepts a dict with a ``text`` key, a JSON string encoding such a dict, or
    plain text. The provider defaults to "template" when the response carries
    no structure and to "unknown" when a structured response omits it.
    """
    if isinstance(raw_result, dict) and "text" in raw_result:
        return raw_result["text"], raw_result.get("provider", "unknown")

    if isinstance(raw_result, str):
        try:
            parsed = json.loads(raw_result)
        except (json.JSONDecodeError, TypeError):
            return raw_result, "template"
        if isinstance(parsed, dict):
            return parsed.get("text", raw_result), parsed.get("provider", "unknown")
        # Valid JSON that is not an object (number, list, null, quoted
        # string): treat the raw string as the notification text.
        return raw_result, "template"

    return str(raw_result), "template"
|