""" ChunkHandler — job handler that wraps the chunker Pipeline. Downloads source from S3/MinIO, runs FFmpeg chunking pipeline, writes mp4 segments + manifest to media/out/chunks/{job_id}/. Pushes real-time events to Redis for SSE consumption. """ import logging import os from typing import Any, Callable, Dict, Optional from core.events import push_event as push_chunk_event from core.chunker import Pipeline from core.storage import BUCKET_IN, download_to_temp from .base import Handler logger = logging.getLogger(__name__) MEDIA_OUT_DIR = os.environ.get("MEDIA_OUT_DIR", "/app/media/out") class ChunkHandler(Handler): """ Handles chunk processing jobs by delegating to the chunker Pipeline. Expected payload keys: source_key: str — S3 key of the source file in BUCKET_IN chunk_duration: float — seconds per chunk (default: 10.0) num_workers: int — concurrent workers (default: 4) max_retries: int — retries per chunk (default: 3) processor_type: str — "ffmpeg", "checksum", "simulated_decode", "composite" queue_size: int — max queue depth (default: 10) """ def process( self, job_id: str, payload: Dict[str, Any], progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> Dict[str, Any]: source_key = payload["source_key"] processor_type = payload.get("processor_type", "ffmpeg") logger.info(f"ChunkHandler starting job {job_id}: {source_key}") # Download source from S3/MinIO push_chunk_event(job_id, "pipeline_start", {"status": "downloading", "source_key": source_key}) tmp_source = download_to_temp(BUCKET_IN, source_key) # Output directory: media/out/chunks/{job_id}/ output_dir = os.path.join(MEDIA_OUT_DIR, "chunks", job_id) if processor_type == "ffmpeg": os.makedirs(output_dir, exist_ok=True) try: def event_bridge(event_type: str, data: Dict[str, Any]) -> None: """Bridge pipeline events to Redis + optional progress callback.""" push_chunk_event(job_id, event_type, data) if progress_callback and event_type == "pipeline_complete": progress_callback(100, data) elif progress_callback and event_type == "chunk_done": total = data.get("total_chunks", 1) if total > 0: pct = min(int((data.get("sequence", 0) + 1) / total * 100), 99) progress_callback(pct, data) pipeline = Pipeline( source=tmp_source, chunk_duration=payload.get("chunk_duration", 10.0), num_workers=payload.get("num_workers", 4), max_retries=payload.get("max_retries", 3), processor_type=processor_type, queue_size=payload.get("queue_size", 10), event_callback=event_bridge, output_dir=output_dir if processor_type == "ffmpeg" else None, start_time=payload.get("start_time"), end_time=payload.get("end_time"), ) result = pipeline.run() # Files are already in media/out/chunks/{job_id}/ output_prefix = f"chunks/{job_id}" output_files = [ f"{output_prefix}/{os.path.basename(f)}" for f in result.chunk_files ] push_chunk_event(job_id, "pipeline_complete", { "status": "completed", "total_chunks": result.total_chunks, "processed": result.processed, "failed": result.failed, "elapsed": result.elapsed_time, "throughput_mbps": result.throughput_mbps, }) return { "status": "completed" if result.failed == 0 else "completed_with_errors", "total_chunks": result.total_chunks, "processed": result.processed, "failed": result.failed, "retries": result.retries, "elapsed_time": result.elapsed_time, "throughput_mbps": result.throughput_mbps, "worker_stats": result.worker_stats, "errors": result.errors, "chunks_in_order": result.chunks_in_order, "output_prefix": output_prefix, "output_files": output_files, } except Exception as e: push_chunk_event(job_id, "pipeline_error", {"status": "failed", "error": str(e)}) raise finally: # Cleanup temp source file only (output dir is persistent) try: os.unlink(tmp_source) except OSError: pass