diff --git a/core/jobs/handlers/detect.py b/core/jobs/handlers/detect.py new file mode 100644 index 0000000..67ce4e0 --- /dev/null +++ b/core/jobs/handlers/detect.py @@ -0,0 +1,130 @@ +""" +DetectHandler — runs the detection pipeline as a Celery job. + +Supports three modes via payload: + - Initial run: {"video_path": "...", "profile_name": "..."} + - Replay: {"replay_from": "run_ocr", "source_job_id": "...", "config_overrides": {...}} + - Retry: {"retry_from": "escalate_vlm", "source_job_id": "...", "config_overrides": {...}} +""" + +import logging +import os +import uuid +from typing import Any, Callable, Dict, Optional + +from .base import Handler + +logger = logging.getLogger(__name__) + + +class DetectHandler(Handler): + + def process( + self, + job_id: str, + payload: Dict[str, Any], + progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, + ) -> Dict[str, Any]: + + replay_from = payload.get("replay_from") + source_job_id = payload.get("source_job_id") + + if replay_from and source_job_id: + return self._run_replay(job_id, source_job_id, replay_from, payload, progress_callback) + + return self._run_initial(job_id, payload, progress_callback) + + def _run_initial( + self, + job_id: str, + payload: Dict[str, Any], + progress_callback: Optional[Callable], + ) -> Dict[str, Any]: + from detect import emit + from detect.graph import get_pipeline + from detect.state import DetectState + + video_path = payload["video_path"] + profile_name = payload.get("profile_name", "soccer_broadcast") + source_asset_id = payload.get("source_asset_id", "") + checkpoint_enabled = payload.get("checkpoint", os.environ.get("MPR_CHECKPOINT") == "1") + + emit.set_run_context( + run_id=job_id, + parent_job_id=payload.get("parent_job_id", job_id), + run_type="initial", + ) + + logger.info("DetectHandler: initial run job=%s video=%s profile=%s checkpoint=%s", + job_id, video_path, profile_name, checkpoint_enabled) + + if progress_callback: + progress_callback(0, {"stage": "starting"}) + + pipeline = get_pipeline(checkpoint=checkpoint_enabled) + + initial_state = DetectState( + video_path=video_path, + job_id=job_id, + profile_name=profile_name, + source_asset_id=source_asset_id, + ) + + try: + result = pipeline.invoke(initial_state) + finally: + emit.clear_run_context() + + detections = result.get("detections", []) + report = result.get("report") + brands_found = len(report.brands) if report else 0 + + if progress_callback: + progress_callback(100, {"stage": "completed"}) + + return { + "status": "completed", + "job_id": job_id, + "detections": len(detections), + "brands_found": brands_found, + } + + def _run_replay( + self, + job_id: str, + source_job_id: str, + start_stage: str, + payload: Dict[str, Any], + progress_callback: Optional[Callable], + ) -> Dict[str, Any]: + from detect.checkpoint import replay_from + + config_overrides = payload.get("config_overrides", {}) + + logger.info("DetectHandler: replay job=%s from=%s source=%s overrides=%s", + job_id, start_stage, source_job_id, config_overrides) + + if progress_callback: + progress_callback(0, {"stage": f"replaying from {start_stage}"}) + + result = replay_from( + job_id=source_job_id, + start_stage=start_stage, + config_overrides=config_overrides, + ) + + detections = result.get("detections", []) + report = result.get("report") + brands_found = len(report.brands) if report else 0 + + if progress_callback: + progress_callback(100, {"stage": "completed"}) + + return { + "status": "completed", + "job_id": job_id, + "source_job_id": source_job_id, + "replay_from": start_stage, + "detections": len(detections), + "brands_found": brands_found, + } diff --git a/core/jobs/registry.py b/core/jobs/registry.py index 6b9b4f7..9956e31 100644 --- a/core/jobs/registry.py +++ b/core/jobs/registry.py @@ -25,9 +25,11 @@ def _register_defaults() -> None: """Register built-in handlers.""" from .handlers.chunk import ChunkHandler from .handlers.transcode import TranscodeHandler + from .handlers.detect import DetectHandler register_handler("transcode", TranscodeHandler) register_handler("chunk", ChunkHandler) + register_handler("detect", DetectHandler) _register_defaults() diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index e3d562f..13402d3 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -150,7 +150,7 @@ services: build: context: .. dockerfile: ctrl/Dockerfile.worker - command: celery -A admin.mpr worker -l info -Q celery,transcode -c 2 + command: celery -A admin.mpr worker -l info -Q celery,transcode,detect,detect_retry -c 2 environment: <<: *common-env MPR_EXECUTOR: local diff --git a/ctrl/nginx.conf b/ctrl/nginx.conf index 5358c75..db32b21 100644 --- a/ctrl/nginx.conf +++ b/ctrl/nginx.conf @@ -98,6 +98,13 @@ http { proxy_set_header Host $host; } + # Detection API (replay, retry, checkpoints) + location /api/detect/ { + proxy_pass http://fastapi/detect/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + # SSE streams — disable buffering for realtime delivery location /api/detect/stream/ { proxy_pass http://fastapi/detect/stream/;