""" FFmpeg transcode module - Transcode media files using ffmpeg-python. """ import sys from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Dict, List, Optional import ffmpeg @dataclass class TranscodeConfig: """Configuration for a transcode operation.""" input_path: str output_path: str # Video video_codec: str = "libx264" video_bitrate: Optional[str] = None video_crf: Optional[int] = None video_preset: Optional[str] = None resolution: Optional[str] = None framerate: Optional[float] = None # Audio audio_codec: str = "aac" audio_bitrate: Optional[str] = None audio_channels: Optional[int] = None audio_samplerate: Optional[int] = None # Trimming trim_start: Optional[float] = None trim_end: Optional[float] = None # Container container: str = "mp4" # Extra args (key-value pairs) extra_args: List[str] = field(default_factory=list) @property def is_copy(self) -> bool: """Check if this is a stream copy (no transcoding).""" return self.video_codec == "copy" and self.audio_codec == "copy" def build_stream(config: TranscodeConfig): """ Build an ffmpeg-python stream from config. Returns the stream object ready to run. """ # Input options input_kwargs = {} if config.trim_start is not None: input_kwargs["ss"] = config.trim_start stream = ffmpeg.input(config.input_path, **input_kwargs) # Output options output_kwargs = { "vcodec": config.video_codec, "acodec": config.audio_codec, } # Trimming duration if config.trim_end is not None: if config.trim_start is not None: output_kwargs["t"] = config.trim_end - config.trim_start else: output_kwargs["t"] = config.trim_end # Video options (skip if copy) if config.video_codec != "copy": if config.video_crf is not None: output_kwargs["crf"] = config.video_crf elif config.video_bitrate: output_kwargs["video_bitrate"] = config.video_bitrate if config.video_preset: output_kwargs["preset"] = config.video_preset if config.resolution: output_kwargs["s"] = config.resolution if config.framerate: output_kwargs["r"] = config.framerate # Audio options (skip if copy) if config.audio_codec != "copy": if config.audio_bitrate: output_kwargs["audio_bitrate"] = config.audio_bitrate if config.audio_channels: output_kwargs["ac"] = config.audio_channels if config.audio_samplerate: output_kwargs["ar"] = config.audio_samplerate # Parse extra args into kwargs extra_kwargs = parse_extra_args(config.extra_args) output_kwargs.update(extra_kwargs) stream = ffmpeg.output(stream, config.output_path, **output_kwargs) stream = ffmpeg.overwrite_output(stream) return stream def parse_extra_args(extra_args: List[str]) -> Dict[str, Any]: """ Parse extra args list into kwargs dict. ["-vtag", "xvid", "-pix_fmt", "yuv420p"] -> {"vtag": "xvid", "pix_fmt": "yuv420p"} """ kwargs = {} i = 0 while i < len(extra_args): key = extra_args[i].lstrip("-") if i + 1 < len(extra_args) and not extra_args[i + 1].startswith("-"): kwargs[key] = extra_args[i + 1] i += 2 else: # Flag without value kwargs[key] = None i += 1 return kwargs def transcode( config: TranscodeConfig, duration: Optional[float] = None, progress_callback: Optional[Callable[[float, Dict[str, Any]], None]] = None, ) -> bool: """ Transcode a media file. Args: config: Transcode configuration duration: Total duration in seconds (for progress calculation, optional) progress_callback: Called with (percent, details_dict) - requires duration Returns: True if successful Raises: ffmpeg.Error: If transcoding fails """ # Ensure output directory exists Path(config.output_path).parent.mkdir(parents=True, exist_ok=True) stream = build_stream(config) if progress_callback and duration: # Run with progress tracking using run_async return _run_with_progress(stream, config, duration, progress_callback) else: # Run synchronously ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) return True def _run_with_progress( stream, config: TranscodeConfig, duration: float, progress_callback: Callable[[float, Dict[str, Any]], None], ) -> bool: """Run FFmpeg with progress tracking using run_async and stderr parsing.""" import re # Calculate effective duration effective_duration = duration if config.trim_start and config.trim_end: effective_duration = config.trim_end - config.trim_start elif config.trim_end: effective_duration = config.trim_end elif config.trim_start: effective_duration = duration - config.trim_start # Run async to get process handle process = ffmpeg.run_async(stream, pipe_stdout=True, pipe_stderr=True) # Parse stderr for progress (time=HH:MM:SS.ms pattern) time_pattern = re.compile(r"time=(\d+):(\d+):(\d+)\.(\d+)") while True: line = process.stderr.readline() if not line: break line = line.decode("utf-8", errors="ignore") match = time_pattern.search(line) if match: hours = int(match.group(1)) minutes = int(match.group(2)) seconds = int(match.group(3)) ms = int(match.group(4)) current_time = hours * 3600 + minutes * 60 + seconds + ms / 100 percent = min(100.0, (current_time / effective_duration) * 100) progress_callback( percent, { "time": current_time, "percent": percent, }, ) # Wait for completion process.wait() if process.returncode != 0: raise ffmpeg.Error( "ffmpeg", stdout=process.stdout.read(), stderr=process.stderr.read() ) # Final callback progress_callback( 100.0, {"time": effective_duration, "percent": 100.0, "done": True} ) return True