"""
Pipeline — orchestrates the entire chunker pipeline.

Wires: Chunker → ChunkQueue → WorkerPool → ResultCollector → PipelineResult

Demonstrates:
- Function parameters and defaults (Interview Topic 1) — configurable pipeline
- Concurrency (Interview Topic 2) — producer thread + worker pool
- OOP design (Interview Topic 4) — composition of pipeline components
- Exception handling (Interview Topic 7) — graceful error propagation
"""

import json
import logging
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

from .chunker import Chunker
from .collector import ResultCollector
from .exceptions import PipelineError
from .models import PipelineResult
from .pool import WorkerPool
from .queue import ChunkQueue

logger = logging.getLogger(__name__)


class Pipeline:
    """
    Orchestrates the chunk processing pipeline.

    The pipeline runs in three stages:
    1. Producer thread: Chunker probes file → pushes time-based chunks to ChunkQueue
    2. Worker pool: N workers pull from queue → extract mp4 segments → emit results
    3. Collector: ResultCollector reassembles results in sequence order

    Args:
        source: Path to the source media file
        chunk_duration: Duration of each chunk in seconds (default: 10.0)
        num_workers: Number of concurrent worker threads (default: 4)
        max_retries: Max retry attempts per chunk (default: 3)
        processor_type: Processor to use — "ffmpeg", "checksum",
            "simulated_decode", "composite" (default: "checksum")
        queue_size: Max chunks buffered in queue (default: 10)
        event_callback: Optional callback for real-time events, called as
            ``event_callback(event_type, data)``. It is invoked from the
            thread calling ``run``, from the producer and progress-monitor
            threads, and is also handed to WorkerPool, so it should be
            thread-safe.
        output_dir: Directory for output chunk files (required for "ffmpeg"
            processor). When set, ``run`` also writes ``manifest.json`` there.
        start_time: Optional window start, forwarded to Chunker
            (presumably seconds into the source — confirm in Chunker).
        end_time: Optional window end, forwarded to Chunker
            (presumably seconds into the source — confirm in Chunker).
    """

    def __init__(
        self,
        source: str,
        chunk_duration: float = 10.0,
        num_workers: int = 4,
        max_retries: int = 3,
        processor_type: str = "checksum",
        queue_size: int = 10,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ):
        self.source = source
        self.chunk_duration = chunk_duration
        self.num_workers = num_workers
        self.max_retries = max_retries
        self.processor_type = processor_type
        self.queue_size = queue_size
        self.event_callback = event_callback
        self.output_dir = output_dir
        self.start_time = start_time
        self.end_time = end_time

    def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """Emit an event if callback is registered.

        Exceptions raised by the callback propagate to the caller.
        """
        if self.event_callback:
            self.event_callback(event_type, data)

    def _produce_chunks(
        self,
        chunker: Chunker,
        chunk_queue: ChunkQueue,
        bounds: Optional[Dict[int, Tuple[float, float]]] = None,
    ) -> None:
        """Producer thread: probe file and enqueue time-based chunks.

        Args:
            chunker: Source of chunks, iterated via ``chunker.chunks()``.
            chunk_queue: Destination queue. It is closed in all cases
                (``finally``), presumably so workers blocked on it can drain
                and exit.
            bounds: Optional dict filled with
                ``sequence -> (start_time, end_time)`` for every chunk that
                was successfully enqueued. ``run`` uses it for the manifest.

        Errors are logged and reported as a ``producer_error`` event rather
        than raised, because this runs on a background thread.
        """
        try:
            for chunk in chunker.chunks():
                chunk_queue.put(chunk, timeout=30.0)
                if bounds is not None:
                    bounds[chunk.sequence] = (chunk.start_time, chunk.end_time)
                self._emit("chunk_queued", {
                    "sequence": chunk.sequence,
                    "start_time": chunk.start_time,
                    "end_time": chunk.end_time,
                    "duration": chunk.duration,
                    "queue_size": chunk_queue.qsize(),
                })
        except Exception as e:
            # logger.exception keeps the traceback, which the old f-string
            # logger.error call discarded.
            logger.exception("Producer error: %s", e)
            self._emit("producer_error", {"error": str(e)})
        finally:
            chunk_queue.close()

    def _monitor_progress(
        self, start_time: float, file_size: int, stop_event: threading.Event
    ) -> None:
        """Monitor thread: emit pipeline_progress every 500ms until stopped.

        Note: ``throughput_mbps`` is the *whole* file size divided by the time
        elapsed so far. It is not based on bytes actually processed, so it
        overstates throughput early in the run.
        """
        mb = file_size / (1024 * 1024)  # loop-invariant
        while not stop_event.is_set():
            elapsed = time.monotonic() - start_time
            self._emit("pipeline_progress", {
                "elapsed": round(elapsed, 2),
                "throughput_mbps": round(mb / elapsed, 2) if elapsed > 0 else 0,
            })
            # wait() instead of sleep() so set() wakes the loop immediately.
            stop_event.wait(0.5)

    def _write_manifest(
        self,
        result: PipelineResult,
        source_duration: float,
        bounds: Optional[Dict[int, Tuple[float, float]]] = None,
    ) -> None:
        """Write manifest.json to output_dir with segment metadata.

        Does nothing when ``output_dir`` is unset.

        Args:
            result: Aggregate pipeline result; supplies counts and timings.
            source_duration: Duration reported by the Chunker. It is only used
                for the fallback segment end-time computation.
            bounds: Optional ``sequence -> (start, end)`` map of the actual
                chunk boundaries. These are used when present, so windowed runs
                (``start_time``/``end_time``) get correct times. Otherwise times
                fall back to ``i * chunk_duration`` capped at ``source_duration``.
        """
        if not self.output_dir:
            return

        segments = []
        for i in range(result.total_chunks):
            known = bounds.get(i) if bounds else None
            if known is not None:
                seg_start, seg_end = known
            else:
                seg_start = i * self.chunk_duration
                seg_end = min((i + 1) * self.chunk_duration, source_duration)
            segments.append({
                "sequence": i,
                # NOTE(review): assumes workers name outputs chunk_NNNN.mp4
                # and lists every sequence, including failed ones.
                "file": f"chunk_{i:04d}.mp4",
                "start": seg_start,
                "end": seg_end,
            })

        manifest = {
            "source": self.source,
            "source_duration": source_duration,
            "chunk_duration": self.chunk_duration,
            "total_chunks": result.total_chunks,
            "processed": result.processed,
            "failed": result.failed,
            "elapsed_time": result.elapsed_time,
            "throughput_mbps": result.throughput_mbps,
            "segments": segments,
        }
        manifest_path = Path(self.output_dir) / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
        logger.info("Manifest written to %s", manifest_path)

    def _run_workers(
        self,
        chunker: Chunker,
        chunk_queue: ChunkQueue,
        pool: WorkerPool,
        start_time: float,
        bounds: Dict[int, Tuple[float, float]],
    ) -> Any:
        """Start the monitor and producer threads, then wait for the pool.

        Expects ``pool`` to be started already. Returns whatever
        ``pool.wait()`` returns. The progress monitor is always stopped, even
        on failure. Otherwise it would keep emitting ``pipeline_progress``
        events until the process exits.
        """
        monitor_stop = threading.Event()
        monitor = threading.Thread(
            target=self._monitor_progress,
            args=(start_time, chunker.file_size, monitor_stop),
            name="progress-monitor",
            daemon=True,
        )
        monitor.start()
        try:
            producer = threading.Thread(
                target=self._produce_chunks,
                args=(chunker, chunk_queue, bounds),
                name="chunk-producer",
                daemon=True,
            )
            producer.start()

            # Stage 4: Wait for all workers to finish
            all_results = pool.wait()
            producer.join(timeout=5.0)
            return all_results
        finally:
            monitor_stop.set()
            monitor.join(timeout=2.0)

    def run(self) -> PipelineResult:
        """
        Execute the full pipeline.

        Returns:
            PipelineResult with aggregate stats

        Raises:
            PipelineError: If the pipeline fails catastrophically. A
                PipelineError raised internally is re-raised as is. Any other
                exception emits a ``pipeline_error`` event and is wrapped in
                PipelineError. The worker pool is shut down and the progress
                monitor stopped on both the success and failure paths.
        """
        start_time = time.monotonic()
        self._emit("pipeline_start", {
            "source": self.source,
            "chunk_duration": self.chunk_duration,
            "num_workers": self.num_workers,
            "processor_type": self.processor_type,
        })

        try:
            # Stage 1: Set up chunker (probes file for duration)
            chunker = Chunker(
                self.source,
                self.chunk_duration,
                start_time=self.start_time,
                end_time=self.end_time,
            )
            total_chunks = chunker.expected_chunks

            if total_chunks == 0:
                self._emit("pipeline_complete", {"total_chunks": 0})
                return PipelineResult(chunks_in_order=True)

            self._emit("pipeline_info", {
                "file_size": chunker.file_size,
                "source_duration": chunker.source_duration,
                "total_chunks": total_chunks,
            })

            # Stage 2: Set up queue and worker pool
            chunk_queue = ChunkQueue(maxsize=self.queue_size)
            pool = WorkerPool(
                num_workers=self.num_workers,
                chunk_queue=chunk_queue,
                processor_type=self.processor_type,
                max_retries=self.max_retries,
                event_callback=self.event_callback,
                output_dir=self.output_dir,
            )

            # Actual chunk boundaries, filled in by the producer thread.
            chunk_bounds: Dict[int, Tuple[float, float]] = {}

            # Stage 3: Start workers, monitor, then produce chunks
            pool.start()
            try:
                all_results = self._run_workers(
                    chunker, chunk_queue, pool, start_time, chunk_bounds
                )

                # Stage 5: Collect results in order
                collector = ResultCollector(total_chunks)
                for r in all_results:
                    collector.add(r)
                    self._emit("chunk_collected", {
                        "sequence": r.sequence,
                        "success": r.success,
                        "buffered": collector.buffered_count,
                        "emitted": collector.emitted_count,
                    })

                # Build result
                elapsed = time.monotonic() - start_time
                file_size_mb = chunker.file_size / (1024 * 1024)
                throughput = file_size_mb / elapsed if elapsed > 0 else 0.0

                failed_results = [r for r in all_results if not r.success]
                total_retries = sum(r.retries for r in all_results)
                chunk_files = [
                    r.output_file for r in all_results
                    if r.success and r.output_file
                ]

                result = PipelineResult(
                    total_chunks=total_chunks,
                    processed=len(all_results),
                    failed=len(failed_results),
                    retries=total_retries,
                    elapsed_time=elapsed,
                    throughput_mbps=throughput,
                    worker_stats=pool.get_worker_stats(),
                    errors=[r.error for r in failed_results if r.error],
                    chunks_in_order=collector.is_complete,
                    output_dir=self.output_dir,
                    chunk_files=chunk_files,
                )

                # Write manifest if output_dir is set
                self._write_manifest(
                    result, chunker.source_duration, chunk_bounds
                )
            finally:
                # Previously skipped whenever anything after pool.start() raised.
                pool.shutdown()

            self._emit("pipeline_complete", {
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "elapsed": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
            })
            return result

        except PipelineError:
            raise
        except Exception as e:
            self._emit("pipeline_error", {"error": str(e)})
            raise PipelineError(f"Pipeline failed: {e}") from e