From dfa3c125148724fecc942bdd8306081d084ef6e2 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 26 Mar 2026 01:30:26 -0300 Subject: [PATCH] phase 8 --- detect/graph.py | 111 ++++++---- detect/tracing.py | 131 ++++++++++++ requirements.txt | 3 + tests/detect/manual/test_timeline_cost.py | 196 ++++++++++++++++++ tests/detect/test_tracing.py | 34 +++ ui/detection-app/src/App.vue | 6 + .../src/panels/CostStatsPanel.vue | 123 +++++++++++ ui/detection-app/src/panels/TimelinePanel.vue | 180 ++++++++++++++++ 8 files changed, 740 insertions(+), 44 deletions(-) create mode 100644 detect/tracing.py create mode 100644 tests/detect/manual/test_timeline_cost.py create mode 100644 tests/detect/test_tracing.py create mode 100644 ui/detection-app/src/panels/CostStatsPanel.vue create mode 100644 ui/detection-app/src/panels/TimelinePanel.vue diff --git a/detect/graph.py b/detect/graph.py index ef91009..a60a91d 100644 --- a/detect/graph.py +++ b/detect/graph.py @@ -20,6 +20,7 @@ from detect.stages.scene_filter import scene_filter from detect.stages.yolo_detector import detect_objects from detect.stages.ocr_stage import run_ocr from detect.stages.brand_resolver import resolve_brands +from detect.tracing import trace_node, flush as flush_traces INFERENCE_URL = os.environ.get("INFERENCE_URL") # None = local mode @@ -66,9 +67,11 @@ def _emit_transition(state: DetectState, node: str, status: str): def node_extract_frames(state: DetectState) -> dict: _emit_transition(state, "extract_frames", "running") - profile = _get_profile(state) - config = profile.frame_extraction_config() - frames = extract_frames(state["video_path"], config, job_id=state.get("job_id")) + with trace_node(state, "extract_frames") as span: + profile = _get_profile(state) + config = profile.frame_extraction_config() + frames = extract_frames(state["video_path"], config, job_id=state.get("job_id")) + span.set_output({"frames_extracted": len(frames)}) _emit_transition(state, "extract_frames", "done") return {"frames": frames, "stats": PipelineStats(frames_extracted=len(frames))} @@ -77,10 +80,12 @@ def node_extract_frames(state: DetectState) -> dict: def node_filter_scenes(state: DetectState) -> dict: _emit_transition(state, "filter_scenes", "running") - profile = _get_profile(state) - config = profile.scene_filter_config() - frames = state.get("frames", []) - kept = scene_filter(frames, config, job_id=state.get("job_id")) + with trace_node(state, "filter_scenes") as span: + profile = _get_profile(state) + config = profile.scene_filter_config() + frames = state.get("frames", []) + kept = scene_filter(frames, config, job_id=state.get("job_id")) + span.set_output({"frames_in": len(frames), "frames_kept": len(kept)}) stats = state.get("stats", PipelineStats()) stats.frames_after_scene_filter = len(kept) @@ -92,15 +97,18 @@ def node_filter_scenes(state: DetectState) -> dict: def node_detect_objects(state: DetectState) -> dict: _emit_transition(state, "detect_objects", "running") - profile = _get_profile(state) - config = profile.detection_config() - frames = state.get("filtered_frames", []) - job_id = state.get("job_id") + with trace_node(state, "detect_objects") as span: + profile = _get_profile(state) + config = profile.detection_config() + frames = state.get("filtered_frames", []) + job_id = state.get("job_id") - all_boxes = detect_objects(frames, config, inference_url=INFERENCE_URL, job_id=job_id) + all_boxes = detect_objects(frames, config, inference_url=INFERENCE_URL, job_id=job_id) + total_regions = sum(len(boxes) for boxes in all_boxes.values()) + span.set_output({"frames": len(frames), "regions_detected": total_regions}) stats = state.get("stats", PipelineStats()) - stats.regions_detected = sum(len(boxes) for boxes in all_boxes.values()) + stats.regions_detected = total_regions _emit_transition(state, "detect_objects", "done") return {"boxes_by_frame": all_boxes, "stats": stats} @@ -109,13 +117,15 @@ def node_detect_objects(state: DetectState) -> dict: def node_run_ocr(state: DetectState) -> dict: _emit_transition(state, "run_ocr", "running") - profile = _get_profile(state) - config = profile.ocr_config() - frames = state.get("filtered_frames", []) - boxes = state.get("boxes_by_frame", {}) - job_id = state.get("job_id") + with trace_node(state, "run_ocr") as span: + profile = _get_profile(state) + config = profile.ocr_config() + frames = state.get("filtered_frames", []) + boxes = state.get("boxes_by_frame", {}) + job_id = state.get("job_id") - candidates = run_ocr(frames, boxes, config, inference_url=INFERENCE_URL, job_id=job_id) + candidates = run_ocr(frames, boxes, config, inference_url=INFERENCE_URL, job_id=job_id) + span.set_output({"regions_in": sum(len(b) for b in boxes.values()), "text_candidates": len(candidates)}) stats = state.get("stats", PipelineStats()) stats.regions_resolved_by_ocr = len(candidates) @@ -127,16 +137,18 @@ def node_run_ocr(state: DetectState) -> dict: def node_match_brands(state: DetectState) -> dict: _emit_transition(state, "match_brands", "running") - profile = _get_profile(state) - dictionary = profile.brand_dictionary() - resolver_config = profile.resolver_config() - candidates = state.get("text_candidates", []) - job_id = state.get("job_id") + with trace_node(state, "match_brands") as span: + profile = _get_profile(state) + dictionary = profile.brand_dictionary() + resolver_config = profile.resolver_config() + candidates = state.get("text_candidates", []) + job_id = state.get("job_id") - matched, unresolved = resolve_brands( - candidates, dictionary, resolver_config, - content_type=profile.name, job_id=job_id, - ) + matched, unresolved = resolve_brands( + candidates, dictionary, resolver_config, + content_type=profile.name, job_id=job_id, + ) + span.set_output({"matched": len(matched), "unresolved": len(unresolved)}) _emit_transition(state, "match_brands", "done") return {"detections": matched, "unresolved_candidates": unresolved} @@ -144,37 +156,48 @@ def node_match_brands(state: DetectState) -> dict: def node_escalate_vlm(state: DetectState) -> dict: _emit_transition(state, "escalate_vlm", "running") - job_id = state.get("job_id") - emit.log(job_id, "VLMLocal", "INFO", "Stub: VLM escalation not yet implemented") + + with trace_node(state, "escalate_vlm") as span: + job_id = state.get("job_id") + emit.log(job_id, "VLMLocal", "INFO", "Stub: VLM escalation not yet implemented") + span.set_output({"stub": True}) + _emit_transition(state, "escalate_vlm", "done") return {} def node_escalate_cloud(state: DetectState) -> dict: _emit_transition(state, "escalate_cloud", "running") - job_id = state.get("job_id") - emit.log(job_id, "CloudLLM", "INFO", "Stub: cloud LLM escalation not yet implemented") + + with trace_node(state, "escalate_cloud") as span: + job_id = state.get("job_id") + emit.log(job_id, "CloudLLM", "INFO", "Stub: cloud LLM escalation not yet implemented") + span.set_output({"stub": True}) + _emit_transition(state, "escalate_cloud", "done") return {} def node_compile_report(state: DetectState) -> dict: _emit_transition(state, "compile_report", "running") - job_id = state.get("job_id") - profile = _get_profile(state) - detections = state.get("detections", []) - report = profile.aggregate(detections) - report.video_source = state.get("video_path", "") + with trace_node(state, "compile_report") as span: + job_id = state.get("job_id") + profile = _get_profile(state) + detections = state.get("detections", []) + report = profile.aggregate(detections) + report.video_source = state.get("video_path", "") - emit.log(job_id, "Aggregator", "INFO", - f"Report: {len(report.brands)} brands, {len(report.timeline)} detections") - emit.job_complete(job_id, { - "video_source": report.video_source, - "content_type": report.content_type, - "brands": {k: {"total_appearances": v.total_appearances} for k, v in report.brands.items()}, - }) + emit.log(job_id, "Aggregator", "INFO", + f"Report: {len(report.brands)} brands, {len(report.timeline)} detections") + emit.job_complete(job_id, { + "video_source": report.video_source, + "content_type": report.content_type, + "brands": {k: {"total_appearances": v.total_appearances} for k, v in report.brands.items()}, + }) + span.set_output({"brands": len(report.brands), "detections": len(report.timeline)}) + flush_traces() _emit_transition(state, "compile_report", "done") return {"report": report} diff --git a/detect/tracing.py b/detect/tracing.py new file mode 100644 index 0000000..dda8053 --- /dev/null +++ b/detect/tracing.py @@ -0,0 +1,131 @@ +""" +Langfuse tracing for the detection pipeline. + +Provides span helpers that graph nodes use to record timing, frame counts, +and stage-level metadata. The Langfuse client is optional — if not configured +(no LANGFUSE_SECRET_KEY), tracing is a no-op. + +Usage in graph nodes: + from detect.tracing import trace_node + + def node_extract_frames(state): + with trace_node(state, "extract_frames") as span: + ... + span.set_output({"frames": len(frames)}) + return {...} +""" + +from __future__ import annotations + +import logging +import os +import time +from contextlib import contextmanager +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +_client = None +_enabled: bool | None = None + + +def _get_client(): + """Lazy-init Langfuse client. Returns None if not configured.""" + global _client, _enabled + if _enabled is False: + return None + if _client is not None: + return _client + + secret = os.environ.get("LANGFUSE_SECRET_KEY", "") + if not secret: + _enabled = False + logger.info("Langfuse not configured (no LANGFUSE_SECRET_KEY), tracing disabled") + return None + + try: + from langfuse import Langfuse + _client = Langfuse() + _enabled = True + logger.info("Langfuse tracing enabled") + return _client + except Exception as e: + _enabled = False + logger.warning("Langfuse init failed: %s — tracing disabled", e) + return None + + +@dataclass +class SpanContext: + """Wraps a Langfuse span with convenience methods.""" + _span: object | None = None + _start: float = field(default_factory=time.monotonic) + metadata: dict = field(default_factory=dict) + + def set_output(self, output: dict) -> None: + self.metadata.update(output) + + def set_error(self, error: str) -> None: + self.metadata["error"] = error + + def _finish(self, status: str = "ok") -> None: + elapsed = time.monotonic() - self._start + self.metadata["duration_seconds"] = round(elapsed, 3) + self.metadata["status"] = status + + if self._span is not None: + try: + self._span.update( + output=self.metadata, + level="ERROR" if status == "error" else "DEFAULT", + ) + self._span.end() + except Exception as e: + logger.debug("Failed to end Langfuse span: %s", e) + + +@contextmanager +def trace_node(state: dict, node_name: str): + """ + Context manager that creates a Langfuse span for a pipeline node. + + Usage: + with trace_node(state, "extract_frames") as span: + frames = do_work() + span.set_output({"frames": len(frames)}) + """ + job_id = state.get("job_id", "unknown") + profile = state.get("profile_name", "") + client = _get_client() + + span_obj = None + if client is not None: + try: + trace = client.trace( + name=f"detect:{job_id}", + session_id=job_id, + metadata={"profile": profile}, + ) + span_obj = trace.span( + name=node_name, + input={"job_id": job_id, "profile": profile}, + ) + except Exception as e: + logger.debug("Failed to create Langfuse span: %s", e) + + ctx = SpanContext(_span=span_obj) + try: + yield ctx + ctx._finish("ok") + except Exception: + ctx._finish("error") + raise + + +def flush(): + """Flush pending Langfuse events. Call at pipeline end.""" + if _client is not None: + try: + _client.flush() + except Exception as e: + logger.debug("Langfuse flush failed: %s", e) diff --git a/requirements.txt b/requirements.txt index 2872b40..f19800a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,9 @@ google-cloud-run>=0.10.0 # GraphQL strawberry-graphql[fastapi]>=0.311.0 +# Observability +langfuse>=2.0.0 + # Testing pytest>=7.4.0 pytest-django>=4.7.0 diff --git a/tests/detect/manual/test_timeline_cost.py b/tests/detect/manual/test_timeline_cost.py new file mode 100644 index 0000000..6f3bd9f --- /dev/null +++ b/tests/detect/manual/test_timeline_cost.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +""" +Push detection + stats events to test TimelinePanel and CostStatsPanel. + +Simulates a pipeline run with detections spread across video time, escalation +events, and accumulating cost — exercises both new phase 8 panels. + +Usage: + python tests/detect/manual/test_timeline_cost.py [--job JOB_ID] [--port PORT] [--delay SECS] + +Opens: http://mpr.local.ar/detection/?job= +""" + +import argparse +import json +import logging +import time +from datetime import datetime, timezone + +import redis + +logging.basicConfig(level=logging.INFO, format="%(levelname)-7s %(name)s — %(message)s") +logger = logging.getLogger(__name__) + +NODES = ["extract_frames", "filter_scenes", "detect_objects", "run_ocr", + "match_brands", "escalate_vlm", "escalate_cloud", "compile_report"] + +# Detections spread across video time with different sources +DETECTIONS = [ + ("Nike", 0.97, "ocr", 2.0, 0.5), + ("Nike", 0.95, "ocr", 4.5, 1.0), + ("Emirates", 0.92, "ocr", 5.0, 2.0), + ("Adidas", 0.89, "ocr", 8.0, 0.5), + ("Nike", 0.94, "ocr", 12.0, 1.5), + ("Coca-Cola", 0.85, "ocr", 15.0, 0.5), + ("Emirates", 0.88, "ocr", 18.0, 2.0), + ("Adidas", 0.91, "ocr", 22.0, 1.0), + ("Mastercard", 0.78, "local_vlm", 25.0, 0.5), + ("Nike", 0.96, "ocr", 28.0, 1.0), + ("Emirates", 0.90, "ocr", 32.0, 2.0), + ("Heineken", 0.72, "cloud_llm", 35.0, 0.5), + ("Coca-Cola", 0.87, "ocr", 38.0, 0.5), + ("Nike", 0.93, "ocr", 42.0, 1.5), + ("Unknown", 0.65, "cloud_llm", 45.0, 0.5), + ("Adidas", 0.90, "ocr", 48.0, 1.0), + ("Emirates", 0.91, "ocr", 52.0, 2.0), + ("Nike", 0.95, "ocr", 55.0, 1.0), +] + + +def ts(): + return datetime.now(timezone.utc).isoformat() + + +def push(r, key, event): + event["ts"] = event.get("ts", ts()) + r.rpush(key, json.dumps(event)) + return event + + +def push_graph(r, key, active_node, status, delay): + nodes = [] + for n in NODES: + if n == active_node: + nodes.append({"id": n, "status": status}) + elif NODES.index(n) < NODES.index(active_node): + nodes.append({"id": n, "status": "done"}) + else: + nodes.append({"id": n, "status": "pending"}) + push(r, key, {"event": "graph_update", "nodes": nodes}) + time.sleep(delay) + + +def push_stats(r, key, **overrides): + base = { + "event": "stats_update", + "frames_extracted": 0, "frames_after_scene_filter": 0, + "regions_detected": 0, "regions_resolved_by_ocr": 0, + "regions_escalated_to_local_vlm": 0, "regions_escalated_to_cloud_llm": 0, + "cloud_llm_calls": 0, "processing_time_seconds": 0, "estimated_cloud_cost_usd": 0, + } + base.update(overrides) + push(r, key, base) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--job", default="timeline-cost-test") + parser.add_argument("--port", type=int, default=6382) + parser.add_argument("--delay", type=float, default=0.4) + args = parser.parse_args() + + r = redis.Redis(port=args.port, decode_responses=True) + key = f"detect_events:{args.job}" + + r.delete(key) + + logger.info("Pushing %d detections to %s", len(DETECTIONS), key) + logger.info("Open: http://mpr.local.ar/detection/?job=%s", args.job) + input("\nPress Enter to start...") + + delay = args.delay + + # Pipeline stages with progressive stats + push_graph(r, key, "extract_frames", "running", delay) + push_stats(r, key, frames_extracted=120, processing_time_seconds=3.2) + push_graph(r, key, "extract_frames", "done", delay) + + push_graph(r, key, "filter_scenes", "running", delay) + push_stats(r, key, frames_extracted=120, frames_after_scene_filter=45, processing_time_seconds=5.1) + push_graph(r, key, "filter_scenes", "done", delay) + + push_graph(r, key, "detect_objects", "running", delay) + push_stats(r, key, frames_extracted=120, frames_after_scene_filter=45, + regions_detected=38, processing_time_seconds=12.4) + push_graph(r, key, "detect_objects", "done", delay) + + push_graph(r, key, "run_ocr", "running", delay) + push_stats(r, key, frames_extracted=120, frames_after_scene_filter=45, + regions_detected=38, regions_resolved_by_ocr=28, processing_time_seconds=18.7) + push_graph(r, key, "run_ocr", "done", delay) + + # Brand matching — push detections one by one + push_graph(r, key, "match_brands", "running", delay) + + for i, (brand, conf, source, timestamp, duration) in enumerate(DETECTIONS): + if source != "ocr": + continue + push(r, key, {"event": "detection", + "brand": brand, "confidence": conf, "source": source, + "timestamp": timestamp, "duration": duration, + "content_type": "soccer_broadcast", "frame_ref": i * 3}) + logger.info("[%d] %s %.2f %s t=%.1fs", i + 1, brand, conf, source, timestamp) + time.sleep(delay * 0.3) + + push_graph(r, key, "match_brands", "done", delay) + + # VLM escalation + push_graph(r, key, "escalate_vlm", "running", delay) + push(r, key, {"event": "log", "level": "INFO", "stage": "VLMLocal", + "msg": "Processing 3 unresolved crops with moondream2"}) + time.sleep(delay) + + for i, (brand, conf, source, timestamp, duration) in enumerate(DETECTIONS): + if source != "local_vlm": + continue + push(r, key, {"event": "detection", + "brand": brand, "confidence": conf, "source": source, + "timestamp": timestamp, "duration": duration, + "content_type": "soccer_broadcast", "frame_ref": i * 3}) + logger.info("[vlm] %s %.2f t=%.1fs", brand, conf, timestamp) + time.sleep(delay * 0.3) + + push_stats(r, key, frames_extracted=120, frames_after_scene_filter=45, + regions_detected=38, regions_resolved_by_ocr=28, + regions_escalated_to_local_vlm=3, processing_time_seconds=25.1, + estimated_cloud_cost_usd=0) + push_graph(r, key, "escalate_vlm", "done", delay) + + # Cloud escalation + push_graph(r, key, "escalate_cloud", "running", delay) + + for i, (brand, conf, source, timestamp, duration) in enumerate(DETECTIONS): + if source != "cloud_llm": + continue + push(r, key, {"event": "detection", + "brand": brand, "confidence": conf, "source": source, + "timestamp": timestamp, "duration": duration, + "content_type": "soccer_broadcast", "frame_ref": i * 3}) + logger.info("[cloud] %s %.2f t=%.1fs", brand, conf, timestamp) + time.sleep(delay * 0.3) + + push_stats(r, key, frames_extracted=120, frames_after_scene_filter=45, + regions_detected=38, regions_resolved_by_ocr=28, + regions_escalated_to_local_vlm=3, regions_escalated_to_cloud_llm=2, + cloud_llm_calls=2, processing_time_seconds=31.4, + estimated_cloud_cost_usd=0.0042) + push_graph(r, key, "escalate_cloud", "done", delay) + + # Report + push_graph(r, key, "compile_report", "running", delay) + push(r, key, {"event": "log", "level": "INFO", "stage": "Aggregator", + "msg": f"Report: {len(set(d[0] for d in DETECTIONS))} brands, {len(DETECTIONS)} detections"}) + push_graph(r, key, "compile_report", "done", delay) + + push(r, key, {"event": "job_complete", "job_id": args.job, "report": { + "video_source": "soccer_clip.mp4", + "content_type": "soccer_broadcast", + "duration_seconds": 60.0, + }}) + + logger.info("Done. Check Timeline (brand bars over time) and Cost & Stats panels.") + + +if __name__ == "__main__": + main() diff --git a/tests/detect/test_tracing.py b/tests/detect/test_tracing.py new file mode 100644 index 0000000..4f35c97 --- /dev/null +++ b/tests/detect/test_tracing.py @@ -0,0 +1,34 @@ +"""Tests for Langfuse tracing — works without Langfuse configured (no-op mode).""" + +import pytest + +from detect.tracing import trace_node, SpanContext, flush + + +def test_trace_node_noop(): + """Without LANGFUSE_SECRET_KEY, tracing is a no-op but doesn't error.""" + state = {"job_id": "test-job", "profile_name": "soccer_broadcast"} + + with trace_node(state, "extract_frames") as span: + assert isinstance(span, SpanContext) + span.set_output({"frames": 42}) + + assert span.metadata["frames"] == 42 + assert span.metadata["status"] == "ok" + assert "duration_seconds" in span.metadata + + +def test_trace_node_error(): + """Span records error status on exception.""" + state = {"job_id": "test-job"} + + with pytest.raises(ValueError): + with trace_node(state, "bad_node") as span: + raise ValueError("boom") + + assert span.metadata["status"] == "error" + + +def test_flush_noop(): + """Flush works when Langfuse is not configured.""" + flush() diff --git a/ui/detection-app/src/App.vue b/ui/detection-app/src/App.vue index 7fd5b3d..a0713ba 100644 --- a/ui/detection-app/src/App.vue +++ b/ui/detection-app/src/App.vue @@ -7,6 +7,8 @@ import FunnelPanel from './panels/FunnelPanel.vue' import PipelineGraphPanel from './panels/PipelineGraphPanel.vue' import FramePanel from './panels/FramePanel.vue' import BrandTablePanel from './panels/BrandTablePanel.vue' +import TimelinePanel from './panels/TimelinePanel.vue' +import CostStatsPanel from './panels/CostStatsPanel.vue' import type { StatsUpdate } from './types/sse-contract' const jobId = ref(new URLSearchParams(window.location.search).get('job') || 'test-job') @@ -69,6 +71,10 @@ source.connect() + + + + diff --git a/ui/detection-app/src/panels/CostStatsPanel.vue b/ui/detection-app/src/panels/CostStatsPanel.vue new file mode 100644 index 0000000..ecabc46 --- /dev/null +++ b/ui/detection-app/src/panels/CostStatsPanel.vue @@ -0,0 +1,123 @@ + + + + + diff --git a/ui/detection-app/src/panels/TimelinePanel.vue b/ui/detection-app/src/panels/TimelinePanel.vue new file mode 100644 index 0000000..379606f --- /dev/null +++ b/ui/detection-app/src/panels/TimelinePanel.vue @@ -0,0 +1,180 @@ + + + + +