150 lines
4.7 KiB
Python
150 lines
4.7 KiB
Python
"""
|
|
Executor abstraction for job processing.
|
|
|
|
Supports different backends:
|
|
- LocalExecutor: FFmpeg via Celery (default)
|
|
- LambdaExecutor: AWS Lambda (future)
|
|
"""
|
|
|
|
import os
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
from core.ffmpeg.transcode import TranscodeConfig, transcode
|
|
|
|
# Configuration from environment
# Name of the executor backend to use. Read once, when this module is
# imported; falls back to "local" if the MPR_EXECUTOR variable is unset.
MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local")
|
|
|
|
|
|
class Executor(ABC):
    """Common interface every job-execution backend has to implement."""

    @abstractmethod
    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """
        Run a single transcode and/or trim job.

        Concrete executors must override this method; the base
        implementation does nothing and returns None.

        Args:
            job_id: Unique identifier of the job.
            source_path: Location of the input file.
            output_path: Location where the result is written.
            preset: Optional dict of transcode settings; None means the
                job only trims.
            trim_start: Optional trim start, in seconds.
            trim_end: Optional trim end, in seconds.
            duration: Length of the source in seconds, used to compute
                progress.
            progress_callback: Optional callable invoked as
                (percent, details_dict).

        Returns:
            True when the job succeeded.
        """
|
|
|
|
|
|
class LocalExecutor(Executor):
    """Executor backend that processes jobs on this machine with FFmpeg."""

    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """
        Build a TranscodeConfig for the job and hand it to transcode().

        Args:
            job_id: Unique job identifier (not used by this backend).
            source_path: Passed to the config as input_path.
            output_path: Passed to the config as output_path.
            preset: Transcode settings. When missing or empty, both the
                video and audio codecs are set to "copy" (trim only).
            trim_start: Forwarded to the config unchanged.
            trim_end: Forwarded to the config unchanged.
            duration: Forwarded to transcode() for progress calculation.
            progress_callback: If given, receives the percentage
                converted with int() together with the details dict.

        Returns:
            The value returned by transcode().
        """
        settings: Dict[str, Any] = {
            "input_path": source_path,
            "output_path": output_path,
        }

        if not preset:
            # Trim-only job: copy both streams instead of re-encoding
            settings.update(video_codec="copy", audio_codec="copy")
        else:
            # Re-encode using the preset; only the codecs, the container
            # and extra_args fall back to explicit defaults here
            settings.update(
                video_codec=preset.get("video_codec", "libx264"),
                video_bitrate=preset.get("video_bitrate"),
                video_crf=preset.get("video_crf"),
                video_preset=preset.get("video_preset"),
                resolution=preset.get("resolution"),
                framerate=preset.get("framerate"),
                audio_codec=preset.get("audio_codec", "aac"),
                audio_bitrate=preset.get("audio_bitrate"),
                audio_channels=preset.get("audio_channels"),
                audio_samplerate=preset.get("audio_samplerate"),
                container=preset.get("container", "mp4"),
                extra_args=preset.get("extra_args", []),
            )

        settings.update(trim_start=trim_start, trim_end=trim_end)
        config = TranscodeConfig(**settings)

        def report_progress(percent: float, details: Dict[str, Any]) -> None:
            # The caller's callback takes an int percentage
            progress_callback(int(percent), details)

        # Only install the adapter when the caller actually wants updates
        on_progress = report_progress if progress_callback else None

        return transcode(config, duration=duration, progress_callback=on_progress)
|
|
|
|
|
|
class LambdaExecutor(Executor):
    """Placeholder backend for running jobs on AWS Lambda; not implemented yet."""

    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """
        Reserved entry point for Lambda-based execution.

        Takes the same arguments as the base run() signature but performs
        no work.

        Raises:
            NotImplementedError: Always.
        """
        message = "LambdaExecutor not yet implemented"
        raise NotImplementedError(message)
|
|
|
|
|
|
# Executor registry
# Maps a backend name (matched against the lower-cased MPR_EXECUTOR value
# in get_executor) to the executor class that implements it.
_executors: Dict[str, type] = {
    "local": LocalExecutor,
    "lambda": LambdaExecutor,
}

# Cached executor instance; stays None until get_executor() creates one.
_executor_instance: Optional[Executor] = None
|
|
|
|
|
|
def get_executor() -> Executor:
    """
    Return the shared executor, creating it on the first call.

    The backend is chosen by the module-level MPR_EXECUTOR value, which is
    lower-cased and looked up in the _executors registry. The new instance
    is stored in _executor_instance, so later calls return the same object.

    Returns:
        The cached Executor instance.

    Raises:
        ValueError: If MPR_EXECUTOR does not name a registered executor.
    """
    global _executor_instance

    # Fast path: an executor was already built by an earlier call
    if _executor_instance is not None:
        return _executor_instance

    backend_name = MPR_EXECUTOR.lower()
    backend_cls = _executors.get(backend_name)
    if backend_cls is None:
        raise ValueError(f"Unknown executor type: {backend_name}")

    _executor_instance = backend_cls()
    return _executor_instance
|