132 lines
3.6 KiB
Python
132 lines
3.6 KiB
Python
"""
|
|
Langfuse tracing for the detection pipeline.
|
|
|
|
Provides span helpers that graph nodes use to record timing, frame counts,
|
|
and stage-level metadata. The Langfuse client is optional — if not configured
|
|
(no LANGFUSE_SECRET_KEY), tracing is a no-op.
|
|
|
|
Usage in graph nodes:
|
|
from core.detect.tracing import trace_node
|
|
|
|
def node_extract_frames(state):
|
|
with trace_node(state, "extract_frames") as span:
|
|
...
|
|
span.set_output({"frames": len(frames)})
|
|
return {...}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass, field
|
|
|
|
# Module-scoped logger; every tracing diagnostic in this file goes through it.
logger = logging.getLogger(__name__)

# Cached Langfuse client. Stays None until _get_client() builds one.
_client = None

# Tri-state switch maintained by _get_client():
#   None  -> configuration has not been probed yet
#   True  -> a client was created successfully
#   False -> tracing is disabled (missing key or failed init)
_enabled: bool | None = None
|
|
|
|
|
|
def _get_client():
    """Return the shared Langfuse client, creating it on first use.

    The outcome of the first attempt is remembered in the module globals
    ``_client`` and ``_enabled``: once tracing has been marked disabled, or a
    client has been built, later calls return immediately.

    Returns:
        The cached client, or ``None`` when ``LANGFUSE_SECRET_KEY`` is unset
        or empty, or when importing/constructing the client raised.
    """
    global _client, _enabled

    # A previous call already settled the answer.
    if _enabled is False or _client is not None:
        return None if _enabled is False else _client

    # No secret key in the environment means tracing is switched off.
    if not os.environ.get("LANGFUSE_SECRET_KEY", ""):
        _enabled = False
        logger.info("Langfuse not configured (no LANGFUSE_SECRET_KEY), tracing disabled")
        return None

    try:
        # Imported lazily so the dependency is only needed when tracing is on.
        from langfuse import Langfuse

        _client = Langfuse()
        _enabled = True
        logger.info("Langfuse tracing enabled")
        return _client
    except Exception as exc:
        # Import or construction failure: disable tracing for later calls.
        _enabled = False
        logger.warning("Langfuse init failed: %s — tracing disabled", exc)
        return None
|
|
|
|
|
|
@dataclass
class SpanContext:
    """Wraps a Langfuse span with convenience methods.

    Collects output/error metadata while a pipeline node runs and pushes it
    to the wrapped span when the node finishes. When ``_span`` is ``None``
    the metadata and timing are still collected, but nothing is sent.
    """

    # Wrapped span object, or None when no span could be created.
    _span: object | None = None
    # Monotonic timestamp taken at construction; basis for duration_seconds.
    _start: float = field(default_factory=time.monotonic)
    # Key/value payload reported as the span output on finish.
    metadata: dict = field(default_factory=dict)

    def set_output(self, output: dict) -> None:
        """Merge ``output`` into the metadata reported at finish time."""
        self.metadata.update(output)

    def set_error(self, error: str) -> None:
        """Store ``error`` under the ``"error"`` metadata key."""
        self.metadata["error"] = error

    def _finish(self, status: str = "ok") -> None:
        """Stamp duration and status, then update and close the span.

        Args:
            status: Final status string; ``"error"`` maps to span level
                ``"ERROR"``, anything else to ``"DEFAULT"``.

        Failures raised by the span object are logged at debug level and
        never propagated, so tracing cannot break the pipeline.
        """
        elapsed = time.monotonic() - self._start
        self.metadata["duration_seconds"] = round(elapsed, 3)
        self.metadata["status"] = status

        if self._span is None:
            return

        # update() and end() are guarded separately: a failing update must
        # not prevent the span from being closed.
        try:
            self._span.update(
                output=self.metadata,
                level="ERROR" if status == "error" else "DEFAULT",
            )
        except Exception as e:
            logger.debug("Failed to update Langfuse span: %s", e)

        try:
            self._span.end()
        except Exception as e:
            logger.debug("Failed to end Langfuse span: %s", e)
|
|
|
|
|
|
@contextmanager
def trace_node(state: dict, node_name: str):
    """
    Context manager that creates a Langfuse span for a pipeline node.

    Reads ``job_id`` (default ``"unknown"``) and ``profile_name`` (default
    ``""``) from ``state``. When a client is available, opens a trace named
    ``detect:<job_id>`` with a child span named ``node_name``. If no client is
    available or span creation fails, the yielded context still collects
    metadata but sends nothing.

    Yields:
        SpanContext: use ``set_output`` / ``set_error`` on it inside the block.

    Raises:
        Whatever the wrapped block raises. The span is finished with status
        ``"error"`` first and the exception is always re-raised.

    Usage:
        with trace_node(state, "extract_frames") as span:
            frames = do_work()
            span.set_output({"frames": len(frames)})
    """
    job_id = state.get("job_id", "unknown")
    profile = state.get("profile_name", "")
    client = _get_client()

    span_obj = None
    if client is not None:
        try:
            trace = client.trace(
                name=f"detect:{job_id}",
                session_id=job_id,
                metadata={"profile": profile},
            )
            span_obj = trace.span(
                name=node_name,
                input={"job_id": job_id, "profile": profile},
            )
        except Exception as e:
            # Tracing is best-effort: fall back to a span-less context.
            logger.debug("Failed to create Langfuse span: %s", e)

    ctx = SpanContext(_span=span_obj)
    try:
        yield ctx
    except BaseException as exc:
        # BaseException so KeyboardInterrupt/SystemExit also close the span.
        # Keep an error message the node set itself; otherwise record the
        # exception so the error span is not left without a message.
        if "error" not in ctx.metadata:
            ctx.set_error(f"{type(exc).__name__}: {exc}")
        ctx._finish("error")
        raise
    else:
        # In `else` so the success-path finish is outside the handler above.
        ctx._finish("ok")
|
|
|
|
|
|
def flush():
    """Send any buffered Langfuse events; intended for the end of a pipeline run.

    Does nothing when no client has been created. A failure raised by the
    client's ``flush()`` is logged at debug level and not propagated.
    """
    if _client is None:
        return

    try:
        _client.flush()
    except Exception as exc:
        logger.debug("Langfuse flush failed: %s", exc)
|