"""
Extract frames from video files for OCR processing.

Supports both regular interval sampling and scene change detection.
"""
import cv2
import os
from pathlib import Path
from typing import List, Tuple, Optional
import json
import logging
import re

logger = logging.getLogger(__name__)

class FrameExtractor:
    """Extract frames from video files for downstream OCR processing.

    Frames can be sampled at fixed time intervals (``extract_by_interval``)
    or only on scene changes via FFmpeg (``extract_scene_changes``).
    Saved frames are downscaled to at most 1600px wide and written as JPEG
    at the configured quality.
    """

    def __init__(self, video_path: str, output_dir: str = "frames", quality: int = 75):
        """
        Initialize frame extractor.

        Args:
            video_path: Path to video file
            output_dir: Directory to save extracted frames (created if missing)
            quality: JPEG quality for saved frames (0-100)
        """
        self.video_path = video_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.quality = quality

    def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]:
        """
        Extract frames at regular intervals.

        Args:
            interval_seconds: Seconds between frame extractions

        Returns:
            List of (frame_path, timestamp) tuples. Empty if the video
            cannot be opened (best-effort, mirrors the scene-change path).
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            # Keep best-effort semantics (callers fall back to this method),
            # but make the failure visible instead of silently returning [].
            logger.error(f"Could not open video: {self.video_path}")
            cap.release()
            return []

        frames_info: List[Tuple[str, float]] = []
        saved_count = 0
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                # Some containers report no FPS; fall back to a sane default so
                # neither the modulo nor the timestamp below divides by zero.
                logger.warning(f"Video reports invalid FPS ({fps}); assuming 30")
                fps = 30.0
            # max(1, ...) guards against sub-frame intervals rounding to 0.
            frame_interval = max(1, int(fps * interval_seconds))

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % frame_interval == 0:
                    timestamp = frame_count / fps
                    frame_filename = f"frame_{saved_count:05d}_{timestamp:.2f}s.jpg"
                    frame_path = self.output_dir / frame_filename

                    # Downscale to 1600px width for smaller file size (but still readable)
                    height, width = frame.shape[:2]
                    if width > 1600:
                        new_height = int(height * (1600 / width))
                        frame = cv2.resize(
                            frame, (1600, new_height),
                            interpolation=cv2.INTER_LANCZOS4,
                        )

                    # Save with configured quality (matches embed quality)
                    cv2.imwrite(str(frame_path), frame,
                                [cv2.IMWRITE_JPEG_QUALITY, self.quality])
                    frames_info.append((str(frame_path), timestamp))
                    saved_count += 1

                frame_count += 1
        finally:
            # Release the capture even if decode/resize/imwrite raises.
            cap.release()

        logger.info(f"Extracted {saved_count} frames at {interval_seconds}s intervals")
        return frames_info

    def extract_scene_changes(self, threshold: float = 15.0) -> List[Tuple[str, float]]:
        """
        Extract frames only on scene changes using FFmpeg.
        More efficient than interval-based extraction.

        Args:
            threshold: Scene change detection threshold (0-100, lower = more sensitive)
                Default: 15.0 (good for clean UIs like Zed)
                Higher values (20-30) for busy UIs like VS Code
                Lower values (5-10) for very subtle changes

        Returns:
            List of (frame_path, timestamp) tuples. Falls back to
            ``extract_by_interval()`` on any FFmpeg failure.

        Raises:
            ImportError: If ffmpeg-python is not installed.
        """
        try:
            import ffmpeg
        except ImportError:
            raise ImportError("ffmpeg-python not installed. Run: pip install ffmpeg-python")

        video_name = Path(self.video_path).stem
        output_pattern = self.output_dir / f"{video_name}_%05d.jpg"

        try:
            # Use FFmpeg's scene detection filter with downscaling.
            # FFmpeg's scene score is 0-1, hence threshold/100.
            stream = ffmpeg.input(self.video_path)
            stream = ffmpeg.filter(stream, 'select', f'gt(scene,{threshold/100})')
            stream = ffmpeg.filter(stream, 'showinfo')
            # Scale to 1600px width (maintains aspect ratio, still readable).
            # min(1600, iw): only downscale, never upscale.
            stream = ffmpeg.filter(stream, 'scale', w='min(1600,iw)', h=-1)

            # Convert JPEG quality (0-100) to FFmpeg qscale (2-31, lower=better)
            # Rough mapping: qscale ≈ (100 - quality) / 10, clamped to 2-31
            qscale = max(2, min(31, int((100 - self.quality) / 10 + 2)))

            stream = ffmpeg.output(
                stream,
                str(output_pattern),
                vsync='vfr',
                frame_pts=1,
                **{'q:v': str(qscale)}  # Matches configured quality
            )

            # Run with stderr capture to get showinfo output
            _, stderr = ffmpeg.run(stream, capture_stderr=True, overwrite_output=True)
            stderr = stderr.decode('utf-8')

            # Parse FFmpeg output to get frame timestamps from showinfo filter
            frames_info = []

            # Extract timestamps from stderr (showinfo outputs there)
            timestamp_pattern = r'pts_time:([\d.]+)'
            timestamps = re.findall(timestamp_pattern, stderr)

            # Match frames to timestamps
            frame_files = sorted(self.output_dir.glob(f"{video_name}_*.jpg"))

            for idx, img in enumerate(frame_files):
                # Use extracted timestamp or fallback to index-based estimate
                timestamp = float(timestamps[idx]) if idx < len(timestamps) else idx * 5.0
                frames_info.append((str(img), timestamp))

            logger.info(f"Extracted {len(frames_info)} frames at scene changes")
            return frames_info

        except ffmpeg.Error as e:
            logger.error(f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}")
            # Fallback to interval extraction
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()
        except Exception as e:
            logger.error(f"Unexpected error during scene extraction: {e}")
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()

    def get_video_duration(self) -> float:
        """Return video duration in seconds (0 if FPS is unavailable)."""
        cap = cv2.VideoCapture(self.video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Guard against containers that report FPS = 0.
            return frame_count / fps if fps > 0 else 0
        finally:
            cap.release()