fix Langfuse SDK: use start_as_current_observation API

This commit is contained in:
2026-04-15 23:34:47 -03:00
parent f03e47d9ba
commit 1b1b1383a1
3 changed files with 56 additions and 36 deletions

View File

@@ -18,7 +18,7 @@ async def run_fce(
flight_id: str, flight_id: str,
mcp: MCPMultiClient, mcp: MCPMultiClient,
on_event: Any = None, on_event: Any = None,
trace: Any = None, langfuse: Any = None,
) -> dict: ) -> dict:
run_start = time.time() run_start = time.time()
errors = [] errors = []
@@ -67,27 +67,35 @@ async def run_fce(
async def _call(server, tool, args, is_live=False, timeout=15.0): async def _call(server, tool, args, is_live=False, timeout=15.0):
t = time.time() t = time.time()
span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None ctx = langfuse.start_as_current_observation(
name=tool, as_type="tool",
input=args, metadata={"server": server, "is_live": is_live},
) if langfuse else None
if ctx:
ctx.__enter__()
try: try:
result = await asyncio.wait_for( result = await asyncio.wait_for(
mcp.call_tool(server, tool, args), timeout=timeout, mcp.call_tool(server, tool, args), timeout=timeout,
) )
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output=result, metadata={"latency_ms": lat}) langfuse.update_current_span(output=result, metadata={"latency_ms": lat})
ctx.__exit__(None, None, None)
await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live) await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live)
return result return result
except asyncio.TimeoutError: except asyncio.TimeoutError:
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output={"error": "timeout"}, level="ERROR") langfuse.update_current_span(output={"error": "timeout"}, level="ERROR")
ctx.__exit__(None, None, None)
await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat) await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat)
errors.append(f"{tool}: timeout after {timeout}s") errors.append(f"{tool}: timeout after {timeout}s")
return None return None
except Exception as e: except Exception as e:
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output={"error": str(e)}, level="ERROR") langfuse.update_current_span(output={"error": str(e)}, level="ERROR")
ctx.__exit__(None, None, None)
await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat) await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat)
errors.append(f"{tool}: {e}") errors.append(f"{tool}: {e}")
return None return None

View File

@@ -26,7 +26,7 @@ async def run_handover(
hubs: list[str] | None = None, hubs: list[str] | None = None,
mcp: MCPMultiClient | None = None, mcp: MCPMultiClient | None = None,
on_event: Any = None, on_event: Any = None,
trace: Any = None, langfuse: Any = None,
) -> dict: ) -> dict:
target_hubs = hubs or ALL_HUBS target_hubs = hubs or ALL_HUBS
run_start = time.time() run_start = time.time()
@@ -45,27 +45,35 @@ async def run_handover(
async def _call(server, tool, args, is_live=False, timeout=15.0): async def _call(server, tool, args, is_live=False, timeout=15.0):
t = time.time() t = time.time()
span = trace.span(name=tool, input=args, metadata={"server": server, "is_live": is_live}) if trace else None ctx = langfuse.start_as_current_observation(
name=tool, as_type="tool",
input=args, metadata={"server": server, "is_live": is_live},
) if langfuse else None
if ctx:
ctx.__enter__()
try: try:
result = await asyncio.wait_for( result = await asyncio.wait_for(
mcp.call_tool(server, tool, args), timeout=timeout, mcp.call_tool(server, tool, args), timeout=timeout,
) )
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output=result, metadata={"latency_ms": lat}) langfuse.update_current_span(output=result, metadata={"latency_ms": lat})
ctx.__exit__(None, None, None)
await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live) await emit("tool_call_end", tool=tool, latency_ms=lat, is_live=is_live)
return result return result
except asyncio.TimeoutError: except asyncio.TimeoutError:
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output={"error": "timeout"}, level="ERROR") langfuse.update_current_span(output={"error": "timeout"}, level="ERROR")
ctx.__exit__(None, None, None)
await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat) await emit("tool_call_error", tool=tool, error="timeout", latency_ms=lat)
errors.append(f"{tool}: timeout after {timeout}s") errors.append(f"{tool}: timeout after {timeout}s")
return None return None
except Exception as e: except Exception as e:
lat = int((time.time() - t) * 1000) lat = int((time.time() - t) * 1000)
if span: if ctx:
span.end(output={"error": str(e)}, level="ERROR") langfuse.update_current_span(output={"error": str(e)}, level="ERROR")
ctx.__exit__(None, None, None)
await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat) await emit("tool_call_error", tool=tool, error=str(e), latency_ms=lat)
errors.append(f"{tool}: {e}") errors.append(f"{tool}: {e}")
return None return None

View File

@@ -154,22 +154,24 @@ async def trigger_fce(req: FCERequest):
await event_hub.broadcast({"run_id": run_id, **event}) await event_hub.broadcast({"run_id": run_id, **event})
langfuse = _get_langfuse() langfuse = _get_langfuse()
trace = langfuse.trace(
name="fce", id=run_id,
metadata={"flight_id": req.flight_id, "scenario": scenario_manager.active_id},
) if langfuse else None
try: try:
if langfuse:
with langfuse.start_as_current_observation(
name="fce", as_type="agent",
input={"flight_id": req.flight_id},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
):
async with connect_servers(["shared", "ops", "passenger"]) as mcp: async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event, trace=trace) result = await run_fce(req.flight_id, mcp, on_event=on_event, langfuse=langfuse)
langfuse.set_current_trace_io(output=result)
else:
async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event)
runs[run_id] = {"status": "completed", "agent": "fce", "result": result} runs[run_id] = {"status": "completed", "agent": "fce", "result": result}
if trace:
trace.update(output={"status": "completed", "type": result.get("type", "")})
logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id) logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id)
except Exception as e: except Exception as e:
runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)} runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)}
if trace:
trace.update(output={"status": "error", "error": str(e)})
logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e) logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e)
finally: finally:
if langfuse: if langfuse:
@@ -200,22 +202,24 @@ async def trigger_handover(req: HandoverRequest):
await event_hub.broadcast({"run_id": run_id, **event}) await event_hub.broadcast({"run_id": run_id, **event})
langfuse = _get_langfuse() langfuse = _get_langfuse()
trace = langfuse.trace(
name="handover", id=run_id,
metadata={"hubs": req.hubs, "scenario": scenario_manager.active_id},
) if langfuse else None
try: try:
if langfuse:
with langfuse.start_as_current_observation(
name="handover", as_type="agent",
input={"hubs": req.hubs},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
):
async with connect_servers(["shared", "ops"]) as mcp: async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, trace=trace) result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, langfuse=langfuse)
langfuse.set_current_trace_io(output=result)
else:
async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event)
runs[run_id] = {"status": "completed", "agent": "handover", "result": result} runs[run_id] = {"status": "completed", "agent": "handover", "result": result}
if trace:
trace.update(output={"status": "completed"})
logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs) logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs)
except Exception as e: except Exception as e:
runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)} runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)}
if trace:
trace.update(output={"status": "error", "error": str(e)})
logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e) logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e)
finally: finally:
if langfuse: if langfuse: