add heavy logging

2026-03-26 10:59:56 -03:00
parent a85722f96a
commit beb0416280
27 changed files with 502 additions and 64 deletions

View File

@@ -6,6 +6,9 @@ Stages call these instead of constructing dicts or dataclasses directly.
Run context (run_id, parent_job_id) is set once at pipeline start via
set_run_context() and automatically injected into all events.
Log level is set per-run with optional per-stage overrides.
DEBUG events are only pushed when the run (or stage) log level allows it.
"""
from __future__ import annotations
@@ -16,23 +19,53 @@ from datetime import datetime, timezone
from detect.events import push_detect_event
from detect.models import PipelineStats
# Log level ordering for comparison
_LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}
# Module-level run context — set once per pipeline invocation
_run_context: dict = {}
_run_log_level: str = "INFO"
_stage_log_levels: dict[str, str] = {} # stage_name → level override
def set_run_context(run_id: str = "", parent_job_id: str = "", run_type: str = "initial"):
def set_run_context(
run_id: str = "",
parent_job_id: str = "",
run_type: str = "initial",
log_level: str = "INFO",
):
"""Set the run context for all subsequent events in this pipeline invocation."""
global _run_context
global _run_context, _run_log_level
_run_context = {
"run_id": run_id,
"parent_job_id": parent_job_id,
"run_type": run_type,
}
_run_log_level = log_level.upper()
_stage_log_levels.clear()
def set_stage_log_level(stage: str, level: str):
"""Override log level for a specific stage."""
_stage_log_levels[stage] = level.upper()
def clear_stage_log_level(stage: str):
"""Remove per-stage log level override."""
_stage_log_levels.pop(stage, None)
def clear_run_context():
global _run_context
global _run_context, _run_log_level
_run_context = {}
_run_log_level = "INFO"
_stage_log_levels.clear()
def _should_emit(level: str, stage: str) -> bool:
"""Check if this log level should be emitted given run/stage settings."""
effective = _stage_log_levels.get(stage, _run_log_level)
return _LEVEL_ORDER.get(level.upper(), 1) >= _LEVEL_ORDER.get(effective, 1)
def _inject_context(payload: dict) -> dict:
@@ -45,6 +78,8 @@ def _inject_context(payload: dict) -> dict:
def log(job_id: str | None, stage: str, level: str, msg: str) -> None:
if not job_id:
return
if not _should_emit(level, stage):
return
payload = {
"level": level,
"stage": stage,

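Note — a minimal usage sketch of the new log-level plumbing (not part of this diff; run/job ids and stage names are placeholders taken from elsewhere in the pipeline):

from detect import emit

# Run-level threshold: under INFO, DEBUG events are dropped by _should_emit()
emit.set_run_context(run_id="run-123", parent_job_id="job-1", log_level="INFO")
emit.log("job-1", "SceneFilter", "DEBUG", "filtered out")
emit.log("job-1", "SceneFilter", "INFO", "pushed as an event")

# Per-stage override: only SceneFilter emits DEBUG
emit.set_stage_log_level("SceneFilter", "DEBUG")
emit.log("job-1", "SceneFilter", "DEBUG", "now pushed")
emit.log("job-1", "OCRStage", "DEBUG", "still filtered")

emit.clear_run_context()  # resets the level to INFO and clears overrides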
View File

@@ -236,7 +236,8 @@ def node_escalate_vlm(state: DetectState) -> dict:
existing = state.get("detections", [])
_emit_transition(state, "escalate_vlm", "done")
vlm_skipped = os.environ.get("SKIP_VLM", "").strip() == "1"
_emit_transition(state, "escalate_vlm", "skipped" if vlm_skipped else "done")
return {
"detections": existing + vlm_matched,
"unresolved_candidates": still_unresolved,
@@ -268,7 +269,8 @@ def node_escalate_cloud(state: DetectState) -> dict:
existing = state.get("detections", [])
_emit_transition(state, "escalate_cloud", "done")
cloud_skipped = os.environ.get("SKIP_CLOUD", "").strip() == "1"
_emit_transition(state, "escalate_cloud", "skipped" if cloud_skipped else "done")
return {"detections": existing + cloud_matched, "stats": stats}
@@ -302,11 +304,33 @@ _CHECKPOINT_ENABLED = os.environ.get("MPR_CHECKPOINT", "").strip() == "1"
_frames_manifest: dict[str, dict[int, str]] = {} # job_id → manifest (cached per job)
class PipelineCancelled(Exception):
"""Raised when a pipeline run is cancelled."""
pass
# Cancellation hook — set by the run endpoint, checked before each node
_cancel_check: dict[str, callable] = {}
def set_cancel_check(job_id: str, fn):
_cancel_check[job_id] = fn
def clear_cancel_check(job_id: str):
_cancel_check.pop(job_id, None)
def _checkpointing_node(node_name: str, node_fn):
"""Wrap a node function to auto-checkpoint after completion."""
stage_index = NODES.index(node_name)
def wrapper(state: DetectState) -> dict:
job_id = state.get("job_id", "")
check = _cancel_check.get(job_id)
if check and check():
raise PipelineCancelled(f"Cancelled before {node_name}")
result = node_fn(state)
job_id = state.get("job_id", "")

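Note — a sketch of how a caller might wire the new cancellation hook (only set_cancel_check, clear_cancel_check and PipelineCancelled come from this diff; the graph object and its invoke() entry point are assumptions):

cancelled: set[str] = set()

def run_with_cancellation(graph, state, job_id: str):
    # Register a check that the node wrapper calls before each node
    set_cancel_check(job_id, lambda: job_id in cancelled)
    try:
        return graph.invoke(state)  # assumed entry point for the compiled graph
    except PipelineCancelled as exc:
        emit.log(job_id, "Pipeline", "WARN", str(exc))
        return None
    finally:
        clear_cancel_check(job_id)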
View File

@@ -34,10 +34,16 @@ def _encode_image(image: np.ndarray) -> str:
class InferenceClient:
"""HTTP client for the GPU inference server."""
def __init__(self, base_url: str | None = None, timeout: float = 60.0):
def __init__(self, base_url: str | None = None, timeout: float = 60.0,
job_id: str = "", log_level: str = "INFO"):
self.base_url = (base_url or DEFAULT_URL).rstrip("/")
self.timeout = timeout
self.job_id = job_id
self.log_level = log_level
self.session = requests.Session()
if job_id:
self.session.headers["X-Job-Id"] = job_id
self.session.headers["X-Log-Level"] = log_level
def health(self) -> ServerStatus:
"""Check server health and loaded models."""

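Note — with the new constructor arguments every request on the client's session is tagged, so the inference server can correlate its own logs with the pipeline job; a small sketch (URL and ids are placeholders):

client = InferenceClient(base_url="http://gpu-host:8000", job_id="job-1", log_level="DEBUG")
# Each request now carries the headers:
#   X-Job-Id: job-1
#   X-Log-Level: DEBUG
status = client.health()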
View File

@@ -8,6 +8,7 @@ Emits log + stats_update SSE events as it works.
from __future__ import annotations
import tempfile
import time
from pathlib import Path
import ffmpeg
@@ -53,6 +54,8 @@ def extract_frames(
emit.log(job_id, "FrameExtractor", "INFO",
f"Starting extraction: {Path(video_path).name} "
f"({duration:.1f}s, {probe.width}x{probe.height}, fps={config.fps})")
emit.log(job_id, "FrameExtractor", "DEBUG",
f"Probe: codec={probe.video_codec}, bitrate={probe.video_bitrate}, max_frames={config.max_frames}")
with tempfile.TemporaryDirectory() as tmpdir:
pattern = str(Path(tmpdir) / "frame_%06d.jpg")
@@ -65,14 +68,24 @@ def extract_frames(
.overwrite_output()
)
t0 = time.monotonic()
try:
stream.run(capture_stdout=True, capture_stderr=True, quiet=True)
except ffmpeg.Error as e:
stderr = e.stderr.decode() if e.stderr else "unknown error"
emit.log(job_id, "FrameExtractor", "ERROR", f"FFmpeg failed: {stderr[:200]}")
raise RuntimeError(f"FFmpeg failed: {stderr}") from e
ffmpeg_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "FrameExtractor", "DEBUG", f"FFmpeg decode: {ffmpeg_ms:.0f}ms")
t0 = time.monotonic()
frames = _load_frames(Path(tmpdir), config.fps)
load_ms = (time.monotonic() - t0) * 1000
if frames:
h, w = frames[0].image.shape[:2]
mem_mb = sum(f.image.nbytes for f in frames) / (1024 * 1024)
emit.log(job_id, "FrameExtractor", "DEBUG",
f"Loaded {len(frames)} frames ({w}x{h}) in {load_ms:.0f}ms, {mem_mb:.1f}MB in memory")
emit.log(job_id, "FrameExtractor", "INFO", f"Extracted {len(frames)} frames")
emit.stats(job_id, frames_extracted=len(frames))

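Note — the t0 = time.monotonic() / *_ms pattern is now repeated across several stages; a small helper like this sketch (not part of the diff) could centralise it:

import time
from contextlib import contextmanager

@contextmanager
def timed_ms():
    """Yield a dict whose 'ms' key holds the elapsed milliseconds on exit."""
    t = {"ms": 0.0}
    t0 = time.monotonic()
    try:
        yield t
    finally:
        t["ms"] = (time.monotonic() - t0) * 1000

# with timed_ms() as t:
#     frames = _load_frames(Path(tmpdir), config.fps)
# emit.log(job_id, "FrameExtractor", "DEBUG", f"Loaded frames in {t['ms']:.0f}ms")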
View File

@@ -13,6 +13,7 @@ Model instances are cached at module level so they survive across pipeline runs.
from __future__ import annotations
import logging
import time
from typing import TYPE_CHECKING
import numpy as np
@@ -91,7 +92,8 @@ def run_ocr(
# Build these once per pipeline run, not per crop
if inference_url:
from detect.inference import InferenceClient
client = InferenceClient(base_url=inference_url)
from detect.emit import _run_log_level
client = InferenceClient(base_url=inference_url, job_id=job_id or "", log_level=_run_log_level)
else:
model = _get_local_model(config.languages[0])
@@ -108,12 +110,19 @@ def run_ocr(
if crop.size == 0:
continue
t0 = time.monotonic()
if inference_url:
raw_results = client.ocr(image=crop, languages=config.languages)
texts = [{"text": r.text, "confidence": r.confidence} for r in raw_results]
else:
raw = model.ocr(crop)
texts = _parse_ocr_raw(raw, config.min_confidence)
ocr_ms = (time.monotonic() - t0) * 1000
h, w = crop.shape[:2]
text_preview = ", ".join(t["text"][:30] for t in texts) if texts else "(none)"
emit.log(job_id, "OCRStage", "DEBUG",
f"Frame {seq} box {box.x},{box.y} ({w}x{h}): {ocr_ms:.0f}ms → {text_preview}")
for t in texts:
candidates.append(TextCandidate(

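Note — this stage (and others below) imports the private _run_log_level directly; a public accessor in detect.emit, sketched here (not part of the diff), would avoid depending on a module-internal name:

# Hypothetical addition to detect/emit.py
def get_run_log_level() -> str:
    """Return the current run-level log threshold (defaults to INFO)."""
    return _run_log_level

# Callers could then write emit.get_run_log_level() instead of
# `from detect.emit import _run_log_level`.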
View File

@@ -9,6 +9,8 @@ CV stages without losing unique visual content.
from __future__ import annotations
import time
import imagehash
from PIL import Image
@@ -63,8 +65,16 @@ def scene_filter(
emit.log(job_id, "SceneFilter", "INFO",
f"Filtering {len(frames)} frames (hamming_threshold={config.hamming_threshold})")
t0 = time.monotonic()
hashes = _compute_hashes(frames)
hash_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "SceneFilter", "DEBUG",
f"Computed {len(hashes)} perceptual hashes in {hash_ms:.0f}ms ({hash_ms/max(len(hashes),1):.1f}ms/frame)")
t0 = time.monotonic()
kept = _dedup(frames, hashes, config.hamming_threshold)
dedup_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "SceneFilter", "DEBUG", f"Dedup pass: {dedup_ms:.0f}ms")
dropped = len(frames) - len(kept)
pct = (dropped / len(frames) * 100) if frames else 0

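Note — _compute_hashes and _dedup are not shown in this hunk; the dedup pass is a keep-if-far-enough sweep over perceptual hashes, roughly like this sketch (internals assumed):

import imagehash
from PIL import Image

def dedup_sketch(frames, hashes, hamming_threshold):
    """Keep a frame only if its hash is more than hamming_threshold from every kept hash."""
    kept, kept_hashes = [], []
    for frame, h in zip(frames, hashes):
        if all(h - k > hamming_threshold for k in kept_hashes):
            kept.append(frame)
            kept_hashes.append(h)
    return kept

# hashes would come from something like:
#   [imagehash.phash(Image.fromarray(f.image)) for f in frames]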
View File

@@ -13,6 +13,8 @@ from __future__ import annotations
import base64
import io
import logging
import os
import time
import numpy as np
from PIL import Image
@@ -108,6 +110,11 @@ def escalate_cloud(
if not candidates:
return []
if os.environ.get("SKIP_CLOUD", "").strip() == "1":
emit.log(job_id, "CloudLLM", "INFO",
f"SKIP_CLOUD=1, skipping {len(candidates)} crops")
return []
if not has_api_key():
emit.log(job_id, "CloudLLM", "WARNING",
f"No API key set for cloud provider, skipping {len(candidates)} crops")
@@ -120,7 +127,7 @@ def escalate_cloud(
matched: list[BrandDetection] = []
total_cost = 0.0
for candidate in candidates:
for i, candidate in enumerate(candidates):
crop = _crop_image(candidate)
if crop.size == 0:
continue
@@ -133,11 +140,15 @@ def escalate_cloud(
prompt = vlm_prompt_fn(crop_context)
image_b64 = _encode_crop(crop)
t0 = time.monotonic()
try:
result = _call_cloud_api(image_b64, prompt)
except Exception as e:
logger.warning("Cloud LLM failed for '%s': %s", candidate.text, e)
call_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "CloudLLM", "DEBUG",
f"[{i+1}/{len(candidates)}] FAILED '{candidate.text[:30]}': {e} ({call_ms:.0f}ms)")
continue
call_ms = (time.monotonic() - t0) * 1000
stats.cloud_llm_calls += 1
model_info = provider.models.get(provider.model)
@@ -148,6 +159,11 @@ def escalate_cloud(
brand = result["brand"]
confidence = result["confidence"]
emit.log(job_id, "CloudLLM", "DEBUG",
f"[{i+1}/{len(candidates)}] '{candidate.text[:30]}'"
f"{'' + brand if brand else ''} "
f"(conf={confidence:.2f}, {result['tokens']}tok, ${call_cost:.4f}, {call_ms:.0f}ms)")
if brand and confidence >= min_confidence:
detection = BrandDetection(
brand=brand,

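Note — the SKIP_* flags and the per-stage override combine well when debugging the escalation tiers; a short usage sketch (values are examples):

import os
from detect import emit

emit.set_run_context(run_id="run-123", log_level="INFO")
emit.set_stage_log_level("CloudLLM", "DEBUG")  # per-call token/cost lines for this tier only
# To bypass the paid tier entirely during local testing:
#   os.environ["SKIP_CLOUD"] = "1"   # the stage logs the skip and returns []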
View File

@@ -9,6 +9,8 @@ objects for crops the VLM can identify.
from __future__ import annotations
import logging
import os
import time
import numpy as np
@@ -61,6 +63,11 @@ def escalate_vlm(
if not candidates:
return [], []
if os.environ.get("SKIP_VLM", "").strip() == "1":
emit.log(job_id, "VLMLocal", "INFO",
f"SKIP_VLM=1, skipping {len(candidates)} crops")
return [], candidates
emit.log(job_id, "VLMLocal", "INFO",
f"Processing {len(candidates)} unresolved crops with moondream2")
@@ -69,9 +76,10 @@ def escalate_vlm(
if inference_url:
from detect.inference import InferenceClient
client = InferenceClient(base_url=inference_url)
from detect.emit import _run_log_level
client = InferenceClient(base_url=inference_url, job_id=job_id or "", log_level=_run_log_level)
for candidate in candidates:
for i, candidate in enumerate(candidates):
crop = _crop_image(candidate)
if crop.size == 0:
still_unresolved.append(candidate)
@@ -84,6 +92,7 @@ def escalate_vlm(
)
prompt = vlm_prompt_fn(crop_context)
t0 = time.monotonic()
try:
if inference_url:
result = client.vlm(image=crop, prompt=prompt)
@@ -93,9 +102,16 @@ def escalate_vlm(
else:
brand, confidence, reasoning = _vlm_local(crop, prompt)
except Exception as e:
logger.warning("VLM failed for candidate '%s': %s", candidate.text, e)
vlm_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "VLMLocal", "DEBUG",
f"[{i+1}/{len(candidates)}] FAILED '{candidate.text[:30]}': {e} ({vlm_ms:.0f}ms)")
still_unresolved.append(candidate)
continue
vlm_ms = (time.monotonic() - t0) * 1000
emit.log(job_id, "VLMLocal", "DEBUG",
f"[{i+1}/{len(candidates)}] '{candidate.text[:30]}'"
f"{'' + brand if brand else '✗ unresolved'} "
f"(conf={confidence:.2f}, {vlm_ms:.0f}ms)")
if brand and confidence >= min_confidence:
detection = BrandDetection(

View File

@@ -14,6 +14,7 @@ from __future__ import annotations
import base64
import io
import logging
import time
from PIL import Image
@@ -32,10 +33,11 @@ def _frame_to_b64(frame: Frame) -> str:
return base64.b64encode(buf.getvalue()).decode()
def _detect_remote(frame: Frame, config: DetectionConfig, inference_url: str) -> list[BoundingBox]:
def _detect_remote(frame: Frame, config: DetectionConfig, inference_url: str,
job_id: str = "", log_level: str = "INFO") -> list[BoundingBox]:
"""Call the inference server over HTTP."""
from detect.inference import InferenceClient
client = InferenceClient(base_url=inference_url)
client = InferenceClient(base_url=inference_url, job_id=job_id, log_level=log_level)
results = client.detect(
image=frame.image,
model=config.model_name,
@@ -99,15 +101,24 @@ def detect_objects(
all_boxes: dict[int, list[BoundingBox]] = {}
total_regions = 0
for frame in frames:
for i, frame in enumerate(frames):
t0 = time.monotonic()
if inference_url:
boxes = _detect_remote(frame, config, inference_url)
from detect.emit import _run_log_level
boxes = _detect_remote(frame, config, inference_url,
job_id=job_id or "", log_level=_run_log_level)
else:
boxes = _detect_local(frame, config)
det_ms = (time.monotonic() - t0) * 1000
all_boxes[frame.sequence] = boxes
total_regions += len(boxes)
emit.log(job_id, "YOLODetector", "DEBUG",
f"Frame {frame.sequence}: {len(boxes)} regions in {det_ms:.0f}ms"
f" [{', '.join(b.label for b in boxes)}]" if boxes else
f"Frame {frame.sequence}: 0 regions in {det_ms:.0f}ms")
if boxes and job_id:
box_dicts = [{"x": b.x, "y": b.y, "w": b.w, "h": b.h,
"confidence": b.confidence, "label": b.label}