move irrop to def, upgrade Langfuse to v3 with ClickHouse, fix SDK v4 instrumentation

This commit is contained in:
2026-04-16 00:01:21 -03:00
parent 1b1b1383a1
commit 004c4a9e65
14 changed files with 131 additions and 156 deletions

View File

@@ -153,29 +153,29 @@ async def trigger_fce(req: FCERequest):
async def on_event(event):
await event_hub.broadcast({"run_id": run_id, **event})
langfuse = _get_langfuse()
lf = _get_langfuse()
try:
if langfuse:
with langfuse.start_as_current_observation(
name="fce", as_type="agent",
input={"flight_id": req.flight_id},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
):
async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event, langfuse=langfuse)
langfuse.set_current_trace_io(output=result)
else:
async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event)
ctx = lf.start_as_current_observation(
name="fce", as_type="agent",
input={"flight_id": req.flight_id},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
) if lf else None
if ctx:
ctx.__enter__()
async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event, lf=lf)
runs[run_id] = {"status": "completed", "agent": "fce", "result": result}
if ctx:
lf.set_current_trace_io(output=result)
ctx.__exit__(None, None, None)
logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id)
except Exception as e:
runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)}
logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e)
finally:
if langfuse:
langfuse.flush()
if lf:
lf.shutdown()
logger.info("agent_start agent=fce run_id=%s flight=%s", run_id, req.flight_id)
asyncio.create_task(_run())
@@ -201,29 +201,29 @@ async def trigger_handover(req: HandoverRequest):
async def on_event(event):
await event_hub.broadcast({"run_id": run_id, **event})
langfuse = _get_langfuse()
lf = _get_langfuse()
try:
if langfuse:
with langfuse.start_as_current_observation(
name="handover", as_type="agent",
input={"hubs": req.hubs},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
):
async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, langfuse=langfuse)
langfuse.set_current_trace_io(output=result)
else:
async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event)
ctx = lf.start_as_current_observation(
name="handover", as_type="agent",
input={"hubs": req.hubs},
metadata={"run_id": run_id, "scenario": scenario_manager.active_id},
) if lf else None
if ctx:
ctx.__enter__()
async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event, lf=lf)
runs[run_id] = {"status": "completed", "agent": "handover", "result": result}
if ctx:
lf.set_current_trace_io(output=result)
ctx.__exit__(None, None, None)
logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs)
except Exception as e:
runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)}
logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e)
finally:
if langfuse:
langfuse.flush()
if lf:
lf.shutdown()
logger.info("agent_start agent=handover run_id=%s hubs=%s", run_id, req.hubs)
asyncio.create_task(_run())