Files
mitus/meetus/frame_extractor.py
Mariano Gabriel 118ef04223 embed images
2025-10-28 08:02:45 -03:00

163 lines
6.1 KiB
Python

"""
Extract frames from video files for OCR processing.
Supports both regular interval sampling and scene change detection.
"""
import cv2
import os
from pathlib import Path
from typing import List, Tuple, Optional
import json
import logging
import re
# Module-level logger named after this module (via __name__).
logger = logging.getLogger(__name__)
class FrameExtractor:
    """Extract frames from a video file as (optionally downscaled) JPEGs.

    Two strategies are available: fixed-interval sampling with OpenCV
    (``extract_by_interval``) and scene-change detection with FFmpeg
    (``extract_scene_changes``).
    """

    # Frames wider than this many pixels are downscaled before being saved.
    _MAX_WIDTH = 1600

    def __init__(self, video_path: str, output_dir: str = "frames", quality: int = 75):
        """
        Initialize frame extractor.

        The output directory (including missing parents) is created here.

        Args:
            video_path: Path to video file
            output_dir: Directory to save extracted frames
            quality: JPEG quality for saved frames (0-100)
        """
        self.video_path = video_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.quality = quality

    @staticmethod
    def _quality_to_qscale(quality: int) -> int:
        """Map a JPEG quality (0-100) to an FFmpeg qscale (2-31, lower=better)."""
        # Rough mapping: qscale ≈ (100 - quality) / 10 + 2, clamped to 2-31.
        return max(2, min(31, int((100 - quality) / 10 + 2)))

    @staticmethod
    def _frame_sort_key(frame_path: Path) -> Tuple[float, str]:
        """Return a sort key ordering frame files by their numeric suffix.

        Files whose suffix is not an integer sort after all numbered files,
        ordered by name.
        """
        suffix = frame_path.stem.rpartition("_")[2]
        try:
            return (int(suffix), frame_path.name)
        except ValueError:
            return (float("inf"), frame_path.name)

    def _downscale(self, frame):
        """Return ``frame`` resized to at most ``_MAX_WIDTH`` pixels wide."""
        height, width = frame.shape[:2]
        if width <= self._MAX_WIDTH:
            return frame
        ratio = self._MAX_WIDTH / width
        new_size = (self._MAX_WIDTH, int(height * ratio))
        return cv2.resize(frame, new_size, interpolation=cv2.INTER_LANCZOS4)

    def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]:
        """
        Extract frames at regular intervals.

        Args:
            interval_seconds: Seconds between frame extractions (must be > 0)

        Returns:
            List of (frame_path, timestamp) tuples. Empty if the video cannot
            be opened or reports no usable frame rate.

        Raises:
            ValueError: If ``interval_seconds`` is not positive.
        """
        if interval_seconds <= 0:
            raise ValueError(
                f"interval_seconds must be positive, got {interval_seconds!r}"
            )

        frames_info: List[Tuple[str, float]] = []
        cap = cv2.VideoCapture(self.video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # `not fps > 0` also rejects NaN, which `fps <= 0` would let through.
            if not cap.isOpened() or not fps > 0:
                logger.error(
                    "Cannot open video or read its frame rate: %s", self.video_path
                )
                return frames_info

            # Clamp to 1 so a very short interval samples every frame instead
            # of causing a modulo-by-zero below.
            frame_interval = max(1, int(fps * interval_seconds))

            frame_count = 0
            # grab() advances without decoding; only sampled frames pay for
            # the decode in retrieve().
            while cap.grab():
                if frame_count % frame_interval == 0:
                    ok, frame = cap.retrieve()
                    if not ok:
                        break

                    timestamp = frame_count / fps
                    frame_filename = f"frame_{len(frames_info):05d}_{timestamp:.2f}s.jpg"
                    frame_path = self.output_dir / frame_filename

                    # Downscale for smaller file size (but still readable)
                    frame = self._downscale(frame)

                    # Save with configured quality (matches embed quality);
                    # only report frames that were actually written.
                    written = cv2.imwrite(
                        str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
                    )
                    if written:
                        frames_info.append((str(frame_path), timestamp))
                    else:
                        logger.warning("Failed to write frame: %s", frame_path)
                frame_count += 1
        finally:
            cap.release()

        logger.info(
            "Extracted %d frames at %ss intervals", len(frames_info), interval_seconds
        )
        return frames_info

    def extract_scene_changes(self, threshold: float = 15.0) -> List[Tuple[str, float]]:
        """
        Extract frames only on scene changes using FFmpeg.
        More efficient than interval-based extraction.

        On any FFmpeg or unexpected error this falls back to
        ``extract_by_interval()`` with its default interval.

        Args:
            threshold: Scene change detection threshold (0-100, lower = more sensitive)
                Default: 15.0 (good for clean UIs like Zed)
                Higher values (20-30) for busy UIs like VS Code
                Lower values (5-10) for very subtle changes

        Returns:
            List of (frame_path, timestamp) tuples

        Raises:
            ImportError: If the ``ffmpeg-python`` package is not installed.
        """
        try:
            import ffmpeg
        except ImportError as err:
            raise ImportError(
                "ffmpeg-python not installed. Run: pip install ffmpeg-python"
            ) from err

        video_name = Path(self.video_path).stem
        output_pattern = self.output_dir / f"{video_name}_%05d.jpg"

        try:
            # Use FFmpeg's scene detection filter with downscaling
            stream = ffmpeg.input(self.video_path)
            stream = ffmpeg.filter(stream, 'select', f'gt(scene,{threshold/100})')
            stream = ffmpeg.filter(stream, 'showinfo')
            # Scale down to the maximum width (keeps aspect ratio); frames
            # that are already narrower keep their original width.
            stream = ffmpeg.filter(
                stream, 'scale', w=f'min({self._MAX_WIDTH},iw)', h=-1
            )

            qscale = self._quality_to_qscale(self.quality)
            stream = ffmpeg.output(
                stream,
                str(output_pattern),
                vsync='vfr',
                frame_pts=1,
                **{'q:v': str(qscale)}  # Matches configured quality
            )

            # Run with stderr capture to get showinfo output
            _, stderr = ffmpeg.run(stream, capture_stderr=True, overwrite_output=True)
            # Tolerate undecodable bytes: a decode failure must not discard a
            # successful extraction.
            stderr_text = stderr.decode('utf-8', errors='replace')

            # Extract timestamps from stderr (showinfo outputs there)
            timestamp_pattern = r'pts_time:([\d.]+)'
            timestamps = re.findall(timestamp_pattern, stderr_text)

            # With frame_pts=1 the number in each filename is the frame's PTS,
            # which can outgrow the 5-digit padding, so sort numerically
            # rather than by name to keep files aligned with timestamps.
            # NOTE: frames left in output_dir by an earlier run for the same
            # video name also match this glob.
            frame_files = sorted(
                self.output_dir.glob(f"{video_name}_*.jpg"),
                key=self._frame_sort_key,
            )

            frames_info = []
            for idx, img in enumerate(frame_files):
                # Use extracted timestamp or fallback to index-based estimate
                timestamp = float(timestamps[idx]) if idx < len(timestamps) else idx * 5.0
                frames_info.append((str(img), timestamp))

            logger.info("Extracted %d frames at scene changes", len(frames_info))
            return frames_info

        except ffmpeg.Error as e:
            # errors='replace' keeps a decode failure from escaping this handler.
            detail = e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)
            logger.error("FFmpeg error: %s", detail)
            # Fallback to interval extraction
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()
        except Exception as e:
            # Deliberate best-effort: any other failure also falls back.
            logger.error("Unexpected error during scene extraction: %s", e)
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()

    def get_video_duration(self) -> float:
        """Get video duration in seconds (0.0 if the frame rate is not positive)."""
        cap = cv2.VideoCapture(self.video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()
        return frame_count / fps if fps > 0 else 0.0