""" Event emission helpers for detection pipeline stages. Single place that knows how to build event payloads. Stages call these instead of constructing dicts or dataclasses directly. Run context (run_id, parent_job_id) is set once at pipeline start via set_run_context() and automatically injected into all events. Log level is set per-run with optional per-stage overrides. DEBUG events are only pushed when the run (or stage) log level allows it. """ from __future__ import annotations import dataclasses from datetime import datetime, timezone from detect.events import push_detect_event from detect.models import PipelineStats # Log level ordering for comparison _LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3} # Module-level run context — set once per pipeline invocation _run_context: dict = {} _run_log_level: str = "INFO" _stage_log_levels: dict[str, str] = {} # stage_name → level override def set_run_context( run_id: str = "", parent_job_id: str = "", run_type: str = "initial", log_level: str = "INFO", ): """Set the run context for all subsequent events in this pipeline invocation.""" global _run_context, _run_log_level _run_context = { "run_id": run_id, "parent_job_id": parent_job_id, "run_type": run_type, } _run_log_level = log_level.upper() _stage_log_levels.clear() def set_stage_log_level(stage: str, level: str): """Override log level for a specific stage.""" _stage_log_levels[stage] = level.upper() def clear_stage_log_level(stage: str): """Remove per-stage log level override.""" _stage_log_levels.pop(stage, None) def clear_run_context(): global _run_context, _run_log_level _run_context = {} _run_log_level = "INFO" _stage_log_levels.clear() def _should_emit(level: str, stage: str) -> bool: """Check if this log level should be emitted given run/stage settings.""" effective = _stage_log_levels.get(stage, _run_log_level) return _LEVEL_ORDER.get(level.upper(), 1) >= _LEVEL_ORDER.get(effective, 1) def _inject_context(payload: dict) -> dict: """Add run context fields to an event payload.""" if _run_context: payload.update(_run_context) return payload def log(job_id: str | None, stage: str, level: str, msg: str) -> None: if not job_id: return if not _should_emit(level, stage): return payload = { "level": level, "stage": stage, "msg": msg, "ts": datetime.now(timezone.utc).isoformat(), } _inject_context(payload) push_detect_event(job_id, "log", payload) def stats(job_id: str | None, **kwargs) -> None: if not job_id: return s = PipelineStats(**kwargs) payload = dataclasses.asdict(s) _inject_context(payload) push_detect_event(job_id, "stats_update", payload) def frame_update( job_id: str | None, frame_ref: int, timestamp: float, jpeg_b64: str, boxes: list[dict], ) -> None: if not job_id: return payload = { "frame_ref": frame_ref, "timestamp": timestamp, "jpeg_b64": jpeg_b64, "boxes": boxes, } _inject_context(payload) push_detect_event(job_id, "frame_update", payload) def graph_update(job_id: str | None, nodes: list[dict]) -> None: if not job_id: return payload = {"nodes": nodes} _inject_context(payload) push_detect_event(job_id, "graph_update", payload) def detection( job_id: str | None, brand: str, confidence: float, source: str, timestamp: float, duration: float = 0.0, content_type: str = "", frame_ref: int | None = None, ) -> None: if not job_id: return payload = { "brand": brand, "confidence": confidence, "source": source, "timestamp": timestamp, "duration": duration, "content_type": content_type, "frame_ref": frame_ref, } _inject_context(payload) push_detect_event(job_id, "detection", payload) def job_complete(job_id: str | None, report: dict) -> None: if not job_id: return payload = {"job_id": job_id, "report": report} _inject_context(payload) push_detect_event(job_id, "job_complete", payload)