""" Executor abstraction for job processing. Supports different backends: - LocalExecutor: FFmpeg via Celery (default) - LambdaExecutor: AWS Lambda (future) """ import os from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Optional from core.ffmpeg.transcode import TranscodeConfig, transcode # Configuration from environment MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local") class Executor(ABC): """Abstract base class for job executors.""" @abstractmethod def run( self, job_id: str, source_path: str, output_path: str, preset: Optional[Dict[str, Any]] = None, trim_start: Optional[float] = None, trim_end: Optional[float] = None, duration: Optional[float] = None, progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """ Execute a transcode/trim job. Args: job_id: Unique job identifier source_path: Path to source file output_path: Path for output file preset: Transcode preset dict (optional, None = trim only) trim_start: Trim start time in seconds (optional) trim_end: Trim end time in seconds (optional) duration: Source duration in seconds (for progress calculation) progress_callback: Called with (percent, details_dict) Returns: True if successful """ pass class LocalExecutor(Executor): """Execute jobs locally using FFmpeg.""" def run( self, job_id: str, source_path: str, output_path: str, preset: Optional[Dict[str, Any]] = None, trim_start: Optional[float] = None, trim_end: Optional[float] = None, duration: Optional[float] = None, progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """Execute job using local FFmpeg.""" # Build config from preset or use stream copy for trim-only if preset: config = TranscodeConfig( input_path=source_path, output_path=output_path, video_codec=preset.get("video_codec", "libx264"), video_bitrate=preset.get("video_bitrate"), video_crf=preset.get("video_crf"), video_preset=preset.get("video_preset"), resolution=preset.get("resolution"), framerate=preset.get("framerate"), audio_codec=preset.get("audio_codec", "aac"), audio_bitrate=preset.get("audio_bitrate"), audio_channels=preset.get("audio_channels"), audio_samplerate=preset.get("audio_samplerate"), container=preset.get("container", "mp4"), extra_args=preset.get("extra_args", []), trim_start=trim_start, trim_end=trim_end, ) else: # Trim-only: stream copy config = TranscodeConfig( input_path=source_path, output_path=output_path, video_codec="copy", audio_codec="copy", trim_start=trim_start, trim_end=trim_end, ) # Wrapper to convert float percent to int def wrapped_callback(percent: float, details: Dict[str, Any]) -> None: if progress_callback: progress_callback(int(percent), details) return transcode( config, duration=duration, progress_callback=wrapped_callback if progress_callback else None, ) class LambdaExecutor(Executor): """Execute jobs via AWS Step Functions + Lambda.""" def __init__(self): import boto3 region = os.environ.get("AWS_REGION", "us-east-1") self.sfn = boto3.client("stepfunctions", region_name=region) self.state_machine_arn = os.environ["STEP_FUNCTION_ARN"] self.callback_url = os.environ.get("CALLBACK_URL", "") self.callback_api_key = os.environ.get("CALLBACK_API_KEY", "") def run( self, job_id: str, source_path: str, output_path: str, preset: Optional[Dict[str, Any]] = None, trim_start: Optional[float] = None, trim_end: Optional[float] = None, duration: Optional[float] = None, progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """Start a Step Functions execution for this job.""" import json payload = { "job_id": job_id, "source_key": source_path, "output_key": output_path, "preset": preset, "trim_start": trim_start, "trim_end": trim_end, "duration": duration, "callback_url": self.callback_url, "api_key": self.callback_api_key, } response = self.sfn.start_execution( stateMachineArn=self.state_machine_arn, name=f"mpr-{job_id}", input=json.dumps(payload), ) # Store execution ARN on the job execution_arn = response["executionArn"] try: from mpr.media_assets.models import TranscodeJob TranscodeJob.objects.filter(id=job_id).update(execution_arn=execution_arn) except Exception: pass return True # Executor registry _executors: Dict[str, type] = { "local": LocalExecutor, "lambda": LambdaExecutor, } _executor_instance: Optional[Executor] = None def get_executor() -> Executor: """Get the configured executor instance.""" global _executor_instance if _executor_instance is None: executor_type = MPR_EXECUTOR.lower() if executor_type not in _executors: raise ValueError(f"Unknown executor type: {executor_type}") _executor_instance = _executors[executor_type]() return _executor_instance