"""
WorkerPool — manages N worker threads via ThreadPoolExecutor.

Demonstrates: Python concurrency — threading (Interview Topic 2).
"""

import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional

from .models import ChunkResult
from .processor import (
    ChecksumProcessor,
    CompositeProcessor,
    FFmpegExtractProcessor,
    Processor,
    SimulatedDecodeProcessor,
)
from .queue import ChunkQueue
from .worker import Worker

logger = logging.getLogger(__name__)


def create_processor(
    processor_type: str = "checksum",
    output_dir: Optional[str] = None,
) -> Processor:
    """Build a processor instance for the given type name.

    Args:
        processor_type: One of "ffmpeg", "checksum", "simulated_decode"
            or "composite".
        output_dir: Output directory handed to the ffmpeg processor; only
            consulted when ``processor_type`` is "ffmpeg".

    Returns:
        A freshly constructed processor.

    Raises:
        ValueError: If ``processor_type`` is unknown, or if "ffmpeg" is
            requested without a non-empty ``output_dir``.
    """
    if processor_type == "ffmpeg":
        if not output_dir:
            raise ValueError("output_dir required for ffmpeg processor")
        return FFmpegExtractProcessor(output_dir=output_dir)
    if processor_type == "checksum":
        return ChecksumProcessor()
    if processor_type == "simulated_decode":
        return SimulatedDecodeProcessor()
    if processor_type == "composite":
        return CompositeProcessor([
            ChecksumProcessor(),
            SimulatedDecodeProcessor(ms_per_second=50.0),
        ])
    raise ValueError(f"Unknown processor type: {processor_type}")


class WorkerPool:
    """
    Manages N worker threads that process chunks concurrently.

    Args:
        num_workers: Number of concurrent worker threads (default: 4)
        chunk_queue: Shared queue to pull chunks from; a new ChunkQueue is
            created only when this is None
        processor_type: Type of processor for each worker (default: "checksum")
        max_retries: Max retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time events
        output_dir: Output directory forwarded to create_processor
            (required when processor_type is "ffmpeg")
    """

    def __init__(
        self,
        num_workers: int = 4,
        chunk_queue: Optional[ChunkQueue] = None,
        processor_type: str = "checksum",
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
    ):
        self.num_workers = num_workers
        # Compare against None explicitly: `chunk_queue or ChunkQueue()` would
        # discard a caller-supplied queue whenever that queue is falsy (for
        # example, an empty queue whose class defines __len__), leaving the
        # workers reading from a queue the caller never fills.
        self.chunk_queue = chunk_queue if chunk_queue is not None else ChunkQueue()
        self.processor_type = processor_type
        self.max_retries = max_retries
        self.event_callback = event_callback
        self.output_dir = output_dir

        # NOTE(review): this event is set in shutdown() but is not passed to
        # the Worker instances created in start() — confirm whether workers
        # are expected to observe it.
        self.shutdown_event = threading.Event()
        self._executor: Optional[ThreadPoolExecutor] = None
        self._futures: List[Future] = []
        self._workers: List[Worker] = []

    def start(self) -> None:
        """Create the executor and submit one ``Worker.run`` per worker.

        Each worker gets its own processor from ``create_processor`` and
        shares ``self.chunk_queue``. Workers and their futures are appended
        to the pool's internal lists.

        Raises:
            ValueError: Propagated from ``create_processor`` for an invalid
                processor configuration.
        """
        self._executor = ThreadPoolExecutor(
            max_workers=self.num_workers,
            thread_name_prefix="chunk-worker",
        )

        for i in range(self.num_workers):
            worker = Worker(
                worker_id=f"worker-{i}",
                chunk_queue=self.chunk_queue,
                processor=create_processor(
                    self.processor_type,
                    output_dir=self.output_dir,
                ),
                max_retries=self.max_retries,
                event_callback=self.event_callback,
            )
            self._workers.append(worker)
            self._futures.append(self._executor.submit(worker.run))

        logger.info("WorkerPool started with %s workers", self.num_workers)

    def wait(self) -> List[ChunkResult]:
        """Block until every submitted worker finishes and merge the results.

        Returns:
            The concatenation of each worker's returned results, in the
            order the workers were submitted.

        Raises:
            Exception: Whatever a worker's ``run`` raised, re-raised by
                ``Future.result()``.
        """
        all_results: List[ChunkResult] = []
        for future in self._futures:
            all_results.extend(future.result())
        return all_results

    def shutdown(self) -> None:
        """Set the shutdown event, close the queue and stop the executor.

        Blocks until the executor's threads have finished. Safe to call
        before ``start()``: the executor step is skipped when none exists.
        """
        self.shutdown_event.set()
        self.chunk_queue.close()
        if self._executor is not None:
            self._executor.shutdown(wait=True)

    def get_worker_stats(self) -> Dict[str, Any]:
        """Return per-worker counters keyed by worker id.

        Returns:
            A dict mapping each ``worker_id`` to a dict with the keys
            "processed", "errors" and "retries".
        """
        return {
            w.worker_id: {
                "processed": w.processed_count,
                "errors": w.error_count,
                "retries": w.retry_count,
            }
            for w in self._workers
        }