From 1b1b1383a17e58ee9104e4d0fc17d95450583f60 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Wed, 15 Apr 2026 23:34:47 -0300 Subject: [PATCH] fix Langfuse SDK: use start_as_current_observation API --- agents/fce.py | 24 ++++++++++++++++-------- agents/handover.py | 24 ++++++++++++++++-------- api/main.py | 44 ++++++++++++++++++++++++-------------------- 3 files changed, 56 insertions(+), 36 deletions(-) diff --git a/agents/fce.py b/agents/fce.py index ef4f35c..8bf58f6 100644 --- a/agents/fce.py +++ b/agents/fce.py @@ -18,7 +18,7 @@ async def run_fce( flight_id: str, mcp: MCPMultiClient, on_event: Any = None, - trace: Any = None, + langfuse: Any = None, ) -> dict: run_start = time.time() errors = [] @@ -67,27 +67,35 @@ async def run_fce( async def _call(server, tool, args, is_live=False, timeout=15.0): t = time.time() - span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None + ctx = langfuse.start_as_current_observation( + name=tool, as_type="tool", + input=args, metadata={"server": server, "is_live": is_live}, + ) if langfuse else None + if ctx: + ctx.__enter__() try: result = await asyncio.wait_for( mcp.call_tool(server, tool, args), timeout=timeout, ) lat = int((time.time() - t) * 1000) - if span: - span.end(output=result, metadata={"latency_ms": lat}) + if ctx: + langfuse.update_current_span(output=result, metadata={"latency_ms": lat}) + ctx.__exit__(None, None, None) await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live) return result except asyncio.TimeoutError: lat = int((time.time() - t) * 1000) - if span: - span.end(output={"error": "timeout"}, level="ERROR") + if ctx: + langfuse.update_current_span(output={"error": "timeout"}, level="ERROR") + ctx.__exit__(None, None, None) await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat) errors.append(f"{tool}: timeout after {timeout}s") return None except Exception as e: lat = int((time.time() - t) * 1000) - if span: - span.end(output={"error": str(e)}, level="ERROR") + if ctx: + langfuse.update_current_span(output={"error": str(e)}, level="ERROR") + ctx.__exit__(None, None, None) await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat) errors.append(f"{tool}: {e}") return None diff --git a/agents/handover.py b/agents/handover.py index 535d4cd..110f3e5 100644 --- a/agents/handover.py +++ b/agents/handover.py @@ -26,7 +26,7 @@ async def run_handover( hubs: list[str] | None = None, mcp: MCPMultiClient | None = None, on_event: Any = None, - trace: Any = None, + langfuse: Any = None, ) -> dict: target_hubs = hubs or ALL_HUBS run_start = time.time() @@ -45,27 +45,35 @@ async def run_handover( async def _call(server, tool, args, is_live=False, timeout=15.0): t = time.time() - span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None + ctx = langfuse.start_as_current_observation( + name=tool, as_type="tool", + input=args, metadata={"server": server, "is_live": is_live}, + ) if langfuse else None + if ctx: + ctx.__enter__() try: result = await asyncio.wait_for( mcp.call_tool(server, tool, args), timeout=timeout, ) lat = int((time.time() - t) * 1000) - if span: - span.end(output=result, metadata={"latency_ms": lat}) + if ctx: + langfuse.update_current_span(output=result, metadata={"latency_ms": lat}) + ctx.__exit__(None, None, None) await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live) return result except asyncio.TimeoutError: lat = int((time.time() - t) * 1000) - if span: - span.end(output={"error": "timeout"}, level="ERROR") + if ctx: + langfuse.update_current_span(output={"error": "timeout"}, level="ERROR") + ctx.__exit__(None, None, None) await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat) errors.append(f"{tool}: timeout after {timeout}s") return None except Exception as e: lat = int((time.time() - t) * 1000) - if span: - span.end(output={"error": str(e)}, level="ERROR") + if ctx: + langfuse.update_current_span(output={"error": str(e)}, level="ERROR") + ctx.__exit__(None, None, None) await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat) errors.append(f"{tool}: {e}") return None diff --git a/api/main.py b/api/main.py index 0b3705e..4006d6e 100644 --- a/api/main.py +++ b/api/main.py @@ -154,22 +154,24 @@ async def trigger_fce(req: FCERequest): await event_hub.broadcast({"run_id": run_id, **event}) langfuse = _get_langfuse() - trace = langfuse.trace( - name="fce", id=run_id, - metadata={"flight_id": req.flight_id, "scenario": scenario_manager.active_id}, - ) if langfuse else None try: - async with connect_servers(["shared", "ops", "passenger"]) as mcp: - result = await run_fce(req.flight_id, mcp, on_event=on_event, trace=trace) + if langfuse: + with langfuse.start_as_current_observation( + name="fce", as_type="agent", + input={"flight_id": req.flight_id}, + metadata={"run_id": run_id, "scenario": scenario_manager.active_id}, + ): + async with connect_servers(["shared", "ops", "passenger"]) as mcp: + result = await run_fce(req.flight_id, mcp, on_event=on_event, langfuse=langfuse) + langfuse.set_current_trace_io(output=result) + else: + async with connect_servers(["shared", "ops", "passenger"]) as mcp: + result = await run_fce(req.flight_id, mcp, on_event=on_event) runs[run_id] = {"status": "completed", "agent": "fce", "result": result} - if trace: - trace.update(output={"status": "completed", "type": result.get("type", "")}) logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id) except Exception as e: runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)} - if trace: - trace.update(output={"status": "error", "error": str(e)}) logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e) finally: if langfuse: @@ -200,22 +202,24 @@ async def trigger_handover(req: HandoverRequest): await event_hub.broadcast({"run_id": run_id, **event}) langfuse = _get_langfuse() - trace = langfuse.trace( - name="handover", id=run_id, - metadata={"hubs": req.hubs, "scenario": scenario_manager.active_id}, - ) if langfuse else None try: - async with connect_servers(["shared", "ops"]) as mcp: - result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, trace=trace) + if langfuse: + with langfuse.start_as_current_observation( + name="handover", as_type="agent", + input={"hubs": req.hubs}, + metadata={"run_id": run_id, "scenario": scenario_manager.active_id}, + ): + async with connect_servers(["shared", "ops"]) as mcp: + result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, langfuse=langfuse) + langfuse.set_current_trace_io(output=result) + else: + async with connect_servers(["shared", "ops"]) as mcp: + result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event) runs[run_id] = {"status": "completed", "agent": "handover", "result": result} - if trace: - trace.update(output={"status": "completed"}) logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs) except Exception as e: runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)} - if trace: - trace.update(output={"status": "error", "error": str(e)}) logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e) finally: if langfuse: