126 lines
4.1 KiB
Python
126 lines
4.1 KiB
Python
"""
|
|
WorkerPool — manages N worker threads via ThreadPoolExecutor.
|
|
|
|
Demonstrates: Python concurrency — threading (Interview Topic 2).
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
from .models import ChunkResult
|
|
from .processor import (
|
|
ChecksumProcessor,
|
|
CompositeProcessor,
|
|
FFmpegExtractProcessor,
|
|
Processor,
|
|
SimulatedDecodeProcessor,
|
|
)
|
|
from .queue import ChunkQueue
|
|
from .worker import Worker
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_processor(
    processor_type: str = "checksum",
    output_dir: Optional[str] = None,
) -> Processor:
    """Factory for processor instances.

    Args:
        processor_type: One of ``"checksum"``, ``"ffmpeg"``,
            ``"simulated_decode"``, or ``"composite"``.
        output_dir: Output directory; required only for ``"ffmpeg"``.

    Returns:
        A freshly constructed ``Processor`` of the requested type.

    Raises:
        ValueError: If ``processor_type`` is unknown, or ``"ffmpeg"`` is
            requested without an ``output_dir``.
    """
    # "ffmpeg" is handled first: it is the only variant with a precondition.
    if processor_type == "ffmpeg":
        if not output_dir:
            raise ValueError("output_dir required for ffmpeg processor")
        return FFmpegExtractProcessor(output_dir=output_dir)

    # The remaining variants need no external configuration, so a simple
    # name -> zero-arg factory table replaces the if/elif chain.
    factories: Dict[str, Callable[[], Processor]] = {
        "checksum": ChecksumProcessor,
        "simulated_decode": SimulatedDecodeProcessor,
        "composite": lambda: CompositeProcessor([
            ChecksumProcessor(),
            SimulatedDecodeProcessor(ms_per_second=50.0),
        ]),
    }
    factory = factories.get(processor_type)
    if factory is None:
        raise ValueError(f"Unknown processor type: {processor_type}")
    return factory()
|
|
|
|
|
|
class WorkerPool:
    """
    Manages N worker threads that process chunks concurrently.

    Args:
        num_workers: Number of concurrent worker threads (default: 4)
        chunk_queue: Shared queue to pull chunks from
        processor_type: Type of processor for each worker (default: "checksum")
        max_retries: Max retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time events
        output_dir: Passed through to the processor factory; required when
            processor_type is "ffmpeg"
    """

    def __init__(
        self,
        num_workers: int = 4,
        chunk_queue: Optional[ChunkQueue] = None,
        processor_type: str = "checksum",
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
    ):
        self.num_workers = num_workers
        # Create a private queue if the caller did not supply a shared one.
        self.chunk_queue = chunk_queue or ChunkQueue()
        self.processor_type = processor_type
        self.max_retries = max_retries
        self.event_callback = event_callback
        self.output_dir = output_dir
        self.shutdown_event = threading.Event()
        # _executor doubles as the "started" flag (None until start()).
        self._executor: Optional[ThreadPoolExecutor] = None
        self._futures: List[Future] = []
        self._workers: List[Worker] = []

    def start(self) -> None:
        """Start all worker threads.

        Each worker gets its own processor instance so workers never share
        mutable processor state.

        Raises:
            RuntimeError: If the pool has already been started. A second
                start() would otherwise spawn a duplicate set of workers
                and leak the first executor.
        """
        if self._executor is not None:
            raise RuntimeError("WorkerPool already started")

        self._executor = ThreadPoolExecutor(
            max_workers=self.num_workers,
            thread_name_prefix="chunk-worker",
        )

        for i in range(self.num_workers):
            worker = Worker(
                worker_id=f"worker-{i}",
                chunk_queue=self.chunk_queue,
                processor=create_processor(self.processor_type, output_dir=self.output_dir),
                max_retries=self.max_retries,
                event_callback=self.event_callback,
            )
            self._workers.append(worker)
            future = self._executor.submit(worker.run)
            self._futures.append(future)

        # Lazy %-formatting: the message is only rendered if INFO is enabled.
        logger.info("WorkerPool started with %d workers", self.num_workers)

    def wait(self) -> List[ChunkResult]:
        """Wait for all workers to finish and collect results.

        Blocks until every worker's run() returns; the queue must be
        closed (or drained) for the workers to exit.

        Returns:
            The concatenated ChunkResult lists from all workers.

        Raises:
            Exception: Re-raises the first exception that escaped a
                worker's run(), via Future.result().
        """
        all_results: List[ChunkResult] = []
        for future in self._futures:
            results = future.result()
            all_results.extend(results)
        return all_results

    def shutdown(self) -> None:
        """Signal shutdown and cleanup.

        Sets the shutdown event, closes the queue so blocked workers wake
        up, and waits for all worker threads to exit. Safe to call before
        start() or more than once.
        """
        self.shutdown_event.set()
        self.chunk_queue.close()
        if self._executor:
            self._executor.shutdown(wait=True)

    def get_worker_stats(self) -> Dict[str, Any]:
        """Get per-worker statistics.

        Returns:
            Mapping of worker_id to its processed/errors/retries counters.
            Empty before start().
        """
        return {
            w.worker_id: {
                "processed": w.processed_count,
                "errors": w.error_count,
                "retries": w.retry_count,
            }
            for w in self._workers
        }
|