diff --git a/agents/fce.py b/agents/fce.py
index 1a8b747..ef4f35c 100644
--- a/agents/fce.py
+++ b/agents/fce.py
@@ -18,17 +18,20 @@ async def run_fce(
     flight_id: str,
     mcp: MCPMultiClient,
     on_event: Any = None,
+    trace: Any = None,
 ) -> dict:
     """Run the FCE agent for a single flight.
 
     Args:
         flight_id: The flight to generate a notification for.
         mcp: Connected MCPMultiClient (shared + passenger).
         on_event: Optional async callback for real-time events.
+        trace: Optional tracing handle; when given, every tool call is
+            recorded as a child span on it.
 
     Returns:
         Notification result dict.
     """
     run_start = time.time()
     errors = []
     tool_calls = []
@@ -76,20 +79,27 @@ async def run_fce(
 
     async def _call(server, tool, args, is_live=False, timeout=15.0):
         t = time.time()
+        span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None
         try:
             result = await asyncio.wait_for(
                 mcp.call_tool(server, tool, args), timeout=timeout,
             )
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output=result, metadata={"latency_ms": lat})
             await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live)
             return result
         except asyncio.TimeoutError:
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output={"error": "timeout"}, level="ERROR", metadata={"latency_ms": lat})
             await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat)
             errors.append(f"{tool}: timeout after {timeout}s")
             return None
         except Exception as e:
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output={"error": str(e)}, level="ERROR", metadata={"latency_ms": lat})
             await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat)
             errors.append(f"{tool}: {e}")
             return None
diff --git a/agents/handover.py b/agents/handover.py
index 6efbb16..535d4cd 100644
--- a/agents/handover.py
+++ b/agents/handover.py
@@ -26,17 +26,20 @@ async def run_handover(
     hubs: list[str] | None = None,
     mcp: MCPMultiClient | None = None,
     on_event: Any = None,
+    trace: Any = None,
 ) -> dict:
     """Run the Shift Handover agent.
 
     Args:
         hubs: Hubs to cover (default: all 5).
         mcp: Connected MCPMultiClient (shared + ops).
         on_event: Optional async callback for real-time events.
+        trace: Optional tracing handle; when given, every tool call is
+            recorded as a child span on it.
 
     Returns:
         Handover brief result dict.
     """
     target_hubs = hubs or ALL_HUBS
     run_start = time.time()
     errors = []
@@ -54,20 +57,27 @@ async def run_handover(
 
     async def _call(server, tool, args, is_live=False, timeout=15.0):
         t = time.time()
+        span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None
         try:
             result = await asyncio.wait_for(
                 mcp.call_tool(server, tool, args), timeout=timeout,
             )
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output=result, metadata={"latency_ms": lat})
             await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live)
             return result
         except asyncio.TimeoutError:
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output={"error": "timeout"}, level="ERROR", metadata={"latency_ms": lat})
             await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat)
             errors.append(f"{tool}: timeout after {timeout}s")
             return None
         except Exception as e:
             lat = int((time.time() - t) * 1000)
+            if span:
+                span.end(output={"error": str(e)}, level="ERROR", metadata={"latency_ms": lat})
             await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat)
             errors.append(f"{tool}: {e}")
             return None
diff --git a/api/config.py b/api/config.py
index f154726..e124968 100644
--- a/api/config.py
+++ b/api/config.py
@@ -19,6 +19,9 @@ class Settings(BaseSettings):
     aws_default_region: str = "us-east-1"
     bedrock_model_id: str = "anthropic.claude-sonnet-4-20250514-v1:0"
     kong_proxy_url: str = ""
+    langfuse_public_key: str = ""
+    langfuse_secret_key: str = ""
+    langfuse_host: str = "http://langfuse:3000"
 
     model_config = {"env_prefix": "", "case_sensitive": False}
 
diff --git a/api/main.py b/api/main.py
index 58888ac..0b3705e 100644
--- a/api/main.py
+++ b/api/main.py
@@ -23,6 +23,37 @@ logging.basicConfig(
 )
 
 
+_langfuse_client = None
+
+
+def _get_langfuse():
+    """Return the shared Langfuse client, or None when tracing is unavailable.
+
+    The client is built on first use and reused afterwards so that a new
+    client (with its own background export workers) is not created for
+    every request. Tracing is optional: missing keys or any setup failure
+    yield None, and failures are logged instead of being silently dropped.
+    """
+    global _langfuse_client
+    if _langfuse_client is not None:
+        return _langfuse_client
+    try:
+        from api.config import get_settings
+        s = get_settings()
+        if not s.langfuse_public_key or not s.langfuse_secret_key:
+            return None
+        from langfuse import Langfuse
+        _langfuse_client = Langfuse(
+            public_key=s.langfuse_public_key,
+            secret_key=s.langfuse_secret_key,
+            host=s.langfuse_host,
+        )
+    except Exception:
+        logging.getLogger(__name__).warning("langfuse_init_failed", exc_info=True)
+        return None
+    return _langfuse_client
+
+
 # ── WebSocket event hub ──
 
 class EventHub:
@@ -136,14 +167,27 @@ async def trigger_fce(req: FCERequest):
         async def on_event(event):
             await event_hub.broadcast({"run_id": run_id, **event})
 
+        langfuse = _get_langfuse()
+        trace = langfuse.trace(
+            name="fce", id=run_id,
+            metadata={"flight_id": req.flight_id, "scenario": scenario_manager.active_id},
+        ) if langfuse else None
+
         try:
             async with connect_servers(["shared", "ops", "passenger"]) as mcp:
-                result = await run_fce(req.flight_id, mcp, on_event=on_event)
+                result = await run_fce(req.flight_id, mcp, on_event=on_event, trace=trace)
             runs[run_id] = {"status": "completed", "agent": "fce", "result": result}
+            if trace:
+                trace.update(output={"status": "completed", "type": result.get("type", "")})
             logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id)
         except Exception as e:
             runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)}
+            if trace:
+                trace.update(output={"status": "error", "error": str(e)})
             logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e)
+        finally:
+            if langfuse:
+                await asyncio.to_thread(langfuse.flush)  # flush() blocks; keep it off the event loop
 
     logger.info("agent_start agent=fce run_id=%s flight=%s", run_id, req.flight_id)
     asyncio.create_task(_run())
@@ -169,14 +213,27 @@ async def trigger_handover(req: HandoverRequest):
         async def on_event(event):
             await event_hub.broadcast({"run_id": run_id, **event})
 
+        langfuse = _get_langfuse()
+        trace = langfuse.trace(
+            name="handover", id=run_id,
+            metadata={"hubs": req.hubs, "scenario": scenario_manager.active_id},
+        ) if langfuse else None
+
         try:
             async with connect_servers(["shared", "ops"]) as mcp:
-                result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event)
+                result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, trace=trace)
             runs[run_id] = {"status": "completed", "agent": "handover", "result": result}
+            if trace:
+                trace.update(output={"status": "completed"})
             logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs)
         except Exception as e:
             runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)}
+            if trace:
+                trace.update(output={"status": "error", "error": str(e)})
             logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e)
+        finally:
+            if langfuse:
+                await asyncio.to_thread(langfuse.flush)  # flush() blocks; keep it off the event loop
 
     logger.info("agent_start agent=handover run_id=%s hubs=%s", run_id, req.hubs)
     asyncio.create_task(_run())
diff --git a/ctrl/k8s/base/configmap.yaml b/ctrl/k8s/base/configmap.yaml
index d13a1e2..5f2a8c5 100644
--- a/ctrl/k8s/base/configmap.yaml
+++ b/ctrl/k8s/base/configmap.yaml
@@ -9,4 +9,7 @@ data:
   GROQ_API_KEY: "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"
   GROQ_MODEL: "llama-3.3-70b-versatile"
   LANGFUSE_HOST: "http://langfuse:3000"
+  LANGFUSE_PUBLIC_KEY: "pk-lf-34928c5c-8525-4e1f-98be-d46c5bda6785"
+  # LANGFUSE_SECRET_KEY is a credential: supply it from a Kubernetes Secret
+  # (secretKeyRef / envFrom secretRef), never from this ConfigMap.
   KONG_PROXY_URL: ""