"""FCE agent — "Behind Every Departure" (Flight Context Engine) passenger notification agent. Connects to: shared + passenger MCP servers. When a flight is disrupted, gathers all operational context and generates an empathetic passenger notification. """ import asyncio import json import time from datetime import datetime, timezone from typing import Any from agents.shared.mcp_client import MCPMultiClient from agents.shared.tool_runner import build_tool_caller async def run_fce( flight_id: str, mcp: MCPMultiClient, on_event: Any = None, lf: Any = None, ) -> dict: run_start = time.time() errors = [] tool_calls = [] async def emit(event_type: str, **data): event = {"type": event_type, "timestamp": datetime.now(timezone.utc).isoformat(), **data} tool_calls.append(event) if on_event: await on_event(event) # ── Node 1: Triage ── await emit("node_enter", node="triage") t0 = time.time() flight_status = await mcp.call_tool("shared", "get_flight_status", {"flight_id": flight_id}) latency = int((time.time() - t0) * 1000) await emit("tool_call_end", tool="get_flight_status", latency_ms=latency, is_live=False) if isinstance(flight_status, dict) and "error" in flight_status: return {"error": flight_status["error"], "flight_id": flight_id} status = flight_status.get("status", "") delay_minutes = flight_status.get("delay_minutes", 0) should_notify = status in ("CANCELLED", "DIVERTED") or ( status == "DELAYED" and delay_minutes >= 10 ) await emit("node_exit", node="triage", result={"should_notify": should_notify}) if not should_notify: return { "flight_id": flight_id, "type": "NO_NOTIFICATION", "reason": f"Status {status}, delay {delay_minutes}min — below threshold", "duration_ms": int((time.time() - run_start) * 1000), } # ── Node 2: Gather Context (parallel tool calls) ── await emit("node_enter", node="gather_context") origin = flight_status.get("origin", "") destination = flight_status.get("destination", "") _call = build_tool_caller(mcp, emit=emit, errors=errors, lf=lf) # Fire all independent calls in parallel ops_data_task = asyncio.create_task( _call("shared", "get_flight_details", {"flight_id": flight_id}) ) weather_task = asyncio.create_task( _call("shared", "get_route_weather", {"origin": origin, "destination": destination}, is_live=True) ) airport_status_task = asyncio.create_task( _call("shared", "get_airport_status", {"airport_code": origin}, is_live=True) ) airport_congestion_task = asyncio.create_task( _call("shared", "get_airport_congestion", {"airport_code": origin}, is_live=True) ) crew_notes_task = asyncio.create_task( _call("ops", "get_crew_notes", {"flight_id": flight_id}) ) ops_data, weather, airport_status, airport_congestion, crew_notes = await asyncio.gather( ops_data_task, weather_task, airport_status_task, airport_congestion_task, crew_notes_task ) await emit("node_exit", node="gather_context") # ── Node 3: Synthesize ── await emit("node_enter", node="synthesize") # Build weather summary weather_summary = "" if weather and isinstance(weather, dict): events = weather.get("significant_events", []) if events: weather_summary = ", ".join(e.get("condition", "") for e in events) else: origin_wp = weather.get("waypoints", {}).get("origin", {}) if isinstance(origin_wp, dict) and "weather" in origin_wp: weather_summary = origin_wp["weather"].get("condition", "") # Build crew notes summary crew_summary = "" if crew_notes and isinstance(crew_notes, list): # Take first 2 notes for the notification relevant = [n for n in crew_notes if not n.startswith("CANCELLED")][:2] crew_summary = " ".join(relevant) context = { "flight_id": flight_id, "origin": origin, "destination": destination, "status": status, "delay_minutes": delay_minutes, "delay_cause": flight_status.get("delay_cause") or (ops_data.get("delay_cause") if isinstance(ops_data, dict) else None), "gate": flight_status.get("gate", ""), "weather_summary": weather_summary, "crew_notes_summary": crew_summary, "get_airport_status": airport_status, "get_airport_congestion": airport_congestion, } t0 = time.time() raw_result = await mcp.call_tool("passenger", "generate_notification", {"context": context}) latency = int((time.time() - t0) * 1000) await emit("tool_call_end", tool="generate_notification", latency_ms=latency, is_live=False) # Parse structured response (text + provider) llm_provider = "template" if isinstance(raw_result, dict) and "text" in raw_result: notification_text = raw_result["text"] llm_provider = raw_result.get("provider", "unknown") elif isinstance(raw_result, str): try: parsed = json.loads(raw_result) notification_text = parsed.get("text", raw_result) llm_provider = parsed.get("provider", "unknown") except (json.JSONDecodeError, TypeError): notification_text = raw_result else: notification_text = str(raw_result) await emit("node_exit", node="synthesize") # ── Node 4: Format Output ── await emit("node_enter", node="format_output") data_sources = ["flight_ops"] if weather: data_sources.append("weather_live") if airport_status: data_sources.append("faa_status_live") if crew_notes: data_sources.append("crew_notes") notification = { "flight_id": flight_id, "type": "DELAY_NOTIFICATION" if status == "DELAYED" else f"{status}_NOTIFICATION", "status": status, "delay_minutes": delay_minutes, "notification_text": notification_text, "llm_provider": llm_provider, "generated_at": datetime.now(timezone.utc).isoformat(), "data_sources": data_sources, "human_approved": True, # auto-approve in demo "errors": errors, "duration_ms": int((time.time() - run_start) * 1000), } await emit("node_exit", node="format_output") await emit("agent_end", output_summary=f"{status} notification for {flight_id}") return notification