phase 8
This commit is contained in:
111
detect/graph.py
111
detect/graph.py
@@ -20,6 +20,7 @@ from detect.stages.scene_filter import scene_filter
|
||||
from detect.stages.yolo_detector import detect_objects
|
||||
from detect.stages.ocr_stage import run_ocr
|
||||
from detect.stages.brand_resolver import resolve_brands
|
||||
from detect.tracing import trace_node, flush as flush_traces
|
||||
|
||||
INFERENCE_URL = os.environ.get("INFERENCE_URL") # None = local mode
|
||||
|
||||
@@ -66,9 +67,11 @@ def _emit_transition(state: DetectState, node: str, status: str):
|
||||
def node_extract_frames(state: DetectState) -> dict:
|
||||
_emit_transition(state, "extract_frames", "running")
|
||||
|
||||
profile = _get_profile(state)
|
||||
config = profile.frame_extraction_config()
|
||||
frames = extract_frames(state["video_path"], config, job_id=state.get("job_id"))
|
||||
with trace_node(state, "extract_frames") as span:
|
||||
profile = _get_profile(state)
|
||||
config = profile.frame_extraction_config()
|
||||
frames = extract_frames(state["video_path"], config, job_id=state.get("job_id"))
|
||||
span.set_output({"frames_extracted": len(frames)})
|
||||
|
||||
_emit_transition(state, "extract_frames", "done")
|
||||
return {"frames": frames, "stats": PipelineStats(frames_extracted=len(frames))}
|
||||
@@ -77,10 +80,12 @@ def node_extract_frames(state: DetectState) -> dict:
|
||||
def node_filter_scenes(state: DetectState) -> dict:
|
||||
_emit_transition(state, "filter_scenes", "running")
|
||||
|
||||
profile = _get_profile(state)
|
||||
config = profile.scene_filter_config()
|
||||
frames = state.get("frames", [])
|
||||
kept = scene_filter(frames, config, job_id=state.get("job_id"))
|
||||
with trace_node(state, "filter_scenes") as span:
|
||||
profile = _get_profile(state)
|
||||
config = profile.scene_filter_config()
|
||||
frames = state.get("frames", [])
|
||||
kept = scene_filter(frames, config, job_id=state.get("job_id"))
|
||||
span.set_output({"frames_in": len(frames), "frames_kept": len(kept)})
|
||||
|
||||
stats = state.get("stats", PipelineStats())
|
||||
stats.frames_after_scene_filter = len(kept)
|
||||
@@ -92,15 +97,18 @@ def node_filter_scenes(state: DetectState) -> dict:
|
||||
def node_detect_objects(state: DetectState) -> dict:
|
||||
_emit_transition(state, "detect_objects", "running")
|
||||
|
||||
profile = _get_profile(state)
|
||||
config = profile.detection_config()
|
||||
frames = state.get("filtered_frames", [])
|
||||
job_id = state.get("job_id")
|
||||
with trace_node(state, "detect_objects") as span:
|
||||
profile = _get_profile(state)
|
||||
config = profile.detection_config()
|
||||
frames = state.get("filtered_frames", [])
|
||||
job_id = state.get("job_id")
|
||||
|
||||
all_boxes = detect_objects(frames, config, inference_url=INFERENCE_URL, job_id=job_id)
|
||||
all_boxes = detect_objects(frames, config, inference_url=INFERENCE_URL, job_id=job_id)
|
||||
total_regions = sum(len(boxes) for boxes in all_boxes.values())
|
||||
span.set_output({"frames": len(frames), "regions_detected": total_regions})
|
||||
|
||||
stats = state.get("stats", PipelineStats())
|
||||
stats.regions_detected = sum(len(boxes) for boxes in all_boxes.values())
|
||||
stats.regions_detected = total_regions
|
||||
|
||||
_emit_transition(state, "detect_objects", "done")
|
||||
return {"boxes_by_frame": all_boxes, "stats": stats}
|
||||
@@ -109,13 +117,15 @@ def node_detect_objects(state: DetectState) -> dict:
|
||||
def node_run_ocr(state: DetectState) -> dict:
|
||||
_emit_transition(state, "run_ocr", "running")
|
||||
|
||||
profile = _get_profile(state)
|
||||
config = profile.ocr_config()
|
||||
frames = state.get("filtered_frames", [])
|
||||
boxes = state.get("boxes_by_frame", {})
|
||||
job_id = state.get("job_id")
|
||||
with trace_node(state, "run_ocr") as span:
|
||||
profile = _get_profile(state)
|
||||
config = profile.ocr_config()
|
||||
frames = state.get("filtered_frames", [])
|
||||
boxes = state.get("boxes_by_frame", {})
|
||||
job_id = state.get("job_id")
|
||||
|
||||
candidates = run_ocr(frames, boxes, config, inference_url=INFERENCE_URL, job_id=job_id)
|
||||
candidates = run_ocr(frames, boxes, config, inference_url=INFERENCE_URL, job_id=job_id)
|
||||
span.set_output({"regions_in": sum(len(b) for b in boxes.values()), "text_candidates": len(candidates)})
|
||||
|
||||
stats = state.get("stats", PipelineStats())
|
||||
stats.regions_resolved_by_ocr = len(candidates)
|
||||
@@ -127,16 +137,18 @@ def node_run_ocr(state: DetectState) -> dict:
|
||||
def node_match_brands(state: DetectState) -> dict:
|
||||
_emit_transition(state, "match_brands", "running")
|
||||
|
||||
profile = _get_profile(state)
|
||||
dictionary = profile.brand_dictionary()
|
||||
resolver_config = profile.resolver_config()
|
||||
candidates = state.get("text_candidates", [])
|
||||
job_id = state.get("job_id")
|
||||
with trace_node(state, "match_brands") as span:
|
||||
profile = _get_profile(state)
|
||||
dictionary = profile.brand_dictionary()
|
||||
resolver_config = profile.resolver_config()
|
||||
candidates = state.get("text_candidates", [])
|
||||
job_id = state.get("job_id")
|
||||
|
||||
matched, unresolved = resolve_brands(
|
||||
candidates, dictionary, resolver_config,
|
||||
content_type=profile.name, job_id=job_id,
|
||||
)
|
||||
matched, unresolved = resolve_brands(
|
||||
candidates, dictionary, resolver_config,
|
||||
content_type=profile.name, job_id=job_id,
|
||||
)
|
||||
span.set_output({"matched": len(matched), "unresolved": len(unresolved)})
|
||||
|
||||
_emit_transition(state, "match_brands", "done")
|
||||
return {"detections": matched, "unresolved_candidates": unresolved}
|
||||
@@ -144,37 +156,48 @@ def node_match_brands(state: DetectState) -> dict:
|
||||
|
||||
def node_escalate_vlm(state: DetectState) -> dict:
|
||||
_emit_transition(state, "escalate_vlm", "running")
|
||||
job_id = state.get("job_id")
|
||||
emit.log(job_id, "VLMLocal", "INFO", "Stub: VLM escalation not yet implemented")
|
||||
|
||||
with trace_node(state, "escalate_vlm") as span:
|
||||
job_id = state.get("job_id")
|
||||
emit.log(job_id, "VLMLocal", "INFO", "Stub: VLM escalation not yet implemented")
|
||||
span.set_output({"stub": True})
|
||||
|
||||
_emit_transition(state, "escalate_vlm", "done")
|
||||
return {}
|
||||
|
||||
|
||||
def node_escalate_cloud(state: DetectState) -> dict:
|
||||
_emit_transition(state, "escalate_cloud", "running")
|
||||
job_id = state.get("job_id")
|
||||
emit.log(job_id, "CloudLLM", "INFO", "Stub: cloud LLM escalation not yet implemented")
|
||||
|
||||
with trace_node(state, "escalate_cloud") as span:
|
||||
job_id = state.get("job_id")
|
||||
emit.log(job_id, "CloudLLM", "INFO", "Stub: cloud LLM escalation not yet implemented")
|
||||
span.set_output({"stub": True})
|
||||
|
||||
_emit_transition(state, "escalate_cloud", "done")
|
||||
return {}
|
||||
|
||||
|
||||
def node_compile_report(state: DetectState) -> dict:
|
||||
_emit_transition(state, "compile_report", "running")
|
||||
job_id = state.get("job_id")
|
||||
|
||||
profile = _get_profile(state)
|
||||
detections = state.get("detections", [])
|
||||
report = profile.aggregate(detections)
|
||||
report.video_source = state.get("video_path", "")
|
||||
with trace_node(state, "compile_report") as span:
|
||||
job_id = state.get("job_id")
|
||||
profile = _get_profile(state)
|
||||
detections = state.get("detections", [])
|
||||
report = profile.aggregate(detections)
|
||||
report.video_source = state.get("video_path", "")
|
||||
|
||||
emit.log(job_id, "Aggregator", "INFO",
|
||||
f"Report: {len(report.brands)} brands, {len(report.timeline)} detections")
|
||||
emit.job_complete(job_id, {
|
||||
"video_source": report.video_source,
|
||||
"content_type": report.content_type,
|
||||
"brands": {k: {"total_appearances": v.total_appearances} for k, v in report.brands.items()},
|
||||
})
|
||||
emit.log(job_id, "Aggregator", "INFO",
|
||||
f"Report: {len(report.brands)} brands, {len(report.timeline)} detections")
|
||||
emit.job_complete(job_id, {
|
||||
"video_source": report.video_source,
|
||||
"content_type": report.content_type,
|
||||
"brands": {k: {"total_appearances": v.total_appearances} for k, v in report.brands.items()},
|
||||
})
|
||||
span.set_output({"brands": len(report.brands), "detections": len(report.timeline)})
|
||||
|
||||
flush_traces()
|
||||
_emit_transition(state, "compile_report", "done")
|
||||
return {"report": report}
|
||||
|
||||
|
||||
131
detect/tracing.py
Normal file
131
detect/tracing.py
Normal file
@@ -0,0 +1,131 @@
|
||||
"""
|
||||
Langfuse tracing for the detection pipeline.
|
||||
|
||||
Provides span helpers that graph nodes use to record timing, frame counts,
|
||||
and stage-level metadata. The Langfuse client is optional — if not configured
|
||||
(no LANGFUSE_SECRET_KEY), tracing is a no-op.
|
||||
|
||||
Usage in graph nodes:
|
||||
from detect.tracing import trace_node
|
||||
|
||||
def node_extract_frames(state):
|
||||
with trace_node(state, "extract_frames") as span:
|
||||
...
|
||||
span.set_output({"frames": len(frames)})
|
||||
return {...}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_client = None
|
||||
_enabled: bool | None = None
|
||||
|
||||
|
||||
def _get_client():
|
||||
"""Lazy-init Langfuse client. Returns None if not configured."""
|
||||
global _client, _enabled
|
||||
if _enabled is False:
|
||||
return None
|
||||
if _client is not None:
|
||||
return _client
|
||||
|
||||
secret = os.environ.get("LANGFUSE_SECRET_KEY", "")
|
||||
if not secret:
|
||||
_enabled = False
|
||||
logger.info("Langfuse not configured (no LANGFUSE_SECRET_KEY), tracing disabled")
|
||||
return None
|
||||
|
||||
try:
|
||||
from langfuse import Langfuse
|
||||
_client = Langfuse()
|
||||
_enabled = True
|
||||
logger.info("Langfuse tracing enabled")
|
||||
return _client
|
||||
except Exception as e:
|
||||
_enabled = False
|
||||
logger.warning("Langfuse init failed: %s — tracing disabled", e)
|
||||
return None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SpanContext:
|
||||
"""Wraps a Langfuse span with convenience methods."""
|
||||
_span: object | None = None
|
||||
_start: float = field(default_factory=time.monotonic)
|
||||
metadata: dict = field(default_factory=dict)
|
||||
|
||||
def set_output(self, output: dict) -> None:
|
||||
self.metadata.update(output)
|
||||
|
||||
def set_error(self, error: str) -> None:
|
||||
self.metadata["error"] = error
|
||||
|
||||
def _finish(self, status: str = "ok") -> None:
|
||||
elapsed = time.monotonic() - self._start
|
||||
self.metadata["duration_seconds"] = round(elapsed, 3)
|
||||
self.metadata["status"] = status
|
||||
|
||||
if self._span is not None:
|
||||
try:
|
||||
self._span.update(
|
||||
output=self.metadata,
|
||||
level="ERROR" if status == "error" else "DEFAULT",
|
||||
)
|
||||
self._span.end()
|
||||
except Exception as e:
|
||||
logger.debug("Failed to end Langfuse span: %s", e)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def trace_node(state: dict, node_name: str):
|
||||
"""
|
||||
Context manager that creates a Langfuse span for a pipeline node.
|
||||
|
||||
Usage:
|
||||
with trace_node(state, "extract_frames") as span:
|
||||
frames = do_work()
|
||||
span.set_output({"frames": len(frames)})
|
||||
"""
|
||||
job_id = state.get("job_id", "unknown")
|
||||
profile = state.get("profile_name", "")
|
||||
client = _get_client()
|
||||
|
||||
span_obj = None
|
||||
if client is not None:
|
||||
try:
|
||||
trace = client.trace(
|
||||
name=f"detect:{job_id}",
|
||||
session_id=job_id,
|
||||
metadata={"profile": profile},
|
||||
)
|
||||
span_obj = trace.span(
|
||||
name=node_name,
|
||||
input={"job_id": job_id, "profile": profile},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to create Langfuse span: %s", e)
|
||||
|
||||
ctx = SpanContext(_span=span_obj)
|
||||
try:
|
||||
yield ctx
|
||||
ctx._finish("ok")
|
||||
except Exception:
|
||||
ctx._finish("error")
|
||||
raise
|
||||
|
||||
|
||||
def flush():
|
||||
"""Flush pending Langfuse events. Call at pipeline end."""
|
||||
if _client is not None:
|
||||
try:
|
||||
_client.flush()
|
||||
except Exception as e:
|
||||
logger.debug("Langfuse flush failed: %s", e)
|
||||
Reference in New Issue
Block a user