""" Extract frames from video files for OCR processing. Supports both regular interval sampling and scene change detection. """ import cv2 import os from pathlib import Path from typing import List, Tuple, Optional import subprocess import json import logging logger = logging.getLogger(__name__) class FrameExtractor: """Extract frames from video files.""" def __init__(self, video_path: str, output_dir: str = "frames"): """ Initialize frame extractor. Args: video_path: Path to video file output_dir: Directory to save extracted frames """ self.video_path = video_path self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]: """ Extract frames at regular intervals. Args: interval_seconds: Seconds between frame extractions Returns: List of (frame_path, timestamp) tuples """ cap = cv2.VideoCapture(self.video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_interval = int(fps * interval_seconds) frames_info = [] frame_count = 0 saved_count = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break if frame_count % frame_interval == 0: timestamp = frame_count / fps frame_filename = f"frame_{saved_count:05d}_{timestamp:.2f}s.jpg" frame_path = self.output_dir / frame_filename cv2.imwrite(str(frame_path), frame) frames_info.append((str(frame_path), timestamp)) saved_count += 1 frame_count += 1 cap.release() logger.info(f"Extracted {saved_count} frames at {interval_seconds}s intervals") return frames_info def extract_scene_changes(self, threshold: float = 30.0) -> List[Tuple[str, float]]: """ Extract frames only on scene changes using FFmpeg. More efficient than interval-based extraction. Args: threshold: Scene change detection threshold (0-100, lower = more sensitive) Returns: List of (frame_path, timestamp) tuples """ video_name = Path(self.video_path).stem output_pattern = self.output_dir / f"{video_name}_%05d.jpg" # Use FFmpeg's scene detection filter cmd = [ 'ffmpeg', '-i', self.video_path, '-vf', f'select=gt(scene\\,{threshold/100}),showinfo', '-vsync', 'vfr', '-frame_pts', '1', str(output_pattern), '-loglevel', 'info' ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Parse FFmpeg output to get frame timestamps from showinfo filter import re frames_info = [] # Extract timestamps from stderr (showinfo outputs there) timestamp_pattern = r'pts_time:([\d.]+)' timestamps = re.findall(timestamp_pattern, result.stderr) # Match frames to timestamps frame_files = sorted(self.output_dir.glob(f"{video_name}_*.jpg")) for idx, img in enumerate(frame_files): # Use extracted timestamp or fallback to index-based estimate timestamp = float(timestamps[idx]) if idx < len(timestamps) else idx * 5.0 frames_info.append((str(img), timestamp)) logger.info(f"Extracted {len(frames_info)} frames at scene changes") return frames_info except subprocess.CalledProcessError as e: logger.error(f"FFmpeg error: {e.stderr}") # Fallback to interval extraction logger.warning("Falling back to interval extraction...") return self.extract_by_interval() def get_video_duration(self) -> float: """Get video duration in seconds.""" cap = cv2.VideoCapture(self.video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = frame_count / fps if fps > 0 else 0 cap.release() return duration