Files
mitus/meetus/frame_extractor.py
2025-10-28 05:52:31 -03:00

145 lines
5.1 KiB
Python

"""
Extract frames from video files for OCR processing.
Supports both regular interval sampling and scene change detection.
"""
import cv2
import os
from pathlib import Path
from typing import List, Tuple, Optional
import json
import logging
import re
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class FrameExtractor:
    """Extract still frames from a video file and save them as JPEG images.

    Two strategies are offered: fixed-interval sampling through OpenCV
    (``extract_by_interval``) and scene-change detection through FFmpeg
    (``extract_scene_changes``). Both return a list of
    ``(frame_path, timestamp)`` tuples, the timestamp being in seconds.
    """

    # ``pts_time:<seconds>`` field printed by the showinfo filter. An optional
    # sign and exponent are accepted so such values are not truncated.
    _PTS_TIME_RE = re.compile(r'pts_time:(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)')
    # Numeric counter that sits between "<video_name>_" and ".jpg".
    _FRAME_NUMBER_RE = re.compile(r'-?[0-9]+')

    def __init__(self, video_path: str, output_dir: str = "frames"):
        """
        Initialize frame extractor.

        The output directory (including missing parents) is created
        immediately if it does not exist yet.

        Args:
            video_path: Path to video file
            output_dir: Directory to save extracted frames
        """
        self.video_path = video_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _frames_per_sample(fps: float, interval_seconds: float) -> int:
        """Return how many source frames lie between two saved frames (>= 1)."""
        step = abs(int(fps * interval_seconds))
        # A step of 0 (interval of 0 or shorter than one frame) would make the
        # modulo test in the read loop divide by zero; sample every frame.
        return step if step > 0 else 1

    @staticmethod
    def _parse_pts_times(showinfo_output: str) -> List[float]:
        """Return every ``pts_time`` value found in showinfo output, in order."""
        return [
            float(value)
            for value in FrameExtractor._PTS_TIME_RE.findall(showinfo_output)
        ]

    @staticmethod
    def _select_frame_files(candidates: List[Path], video_name: str) -> List[Path]:
        """Keep ``<video_name>_<integer>.jpg`` paths, ordered by that integer."""
        prefix = f"{video_name}_"
        suffix = ".jpg"
        numbered = []
        for path in candidates:
            name = path.name
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            counter = name[len(prefix):-len(suffix)]
            # Requiring a pure integer rejects frames of other videos whose
            # name merely starts with the same prefix.
            if FrameExtractor._FRAME_NUMBER_RE.fullmatch(counter):
                numbered.append((int(counter), path))
        # Numeric sort: a lexicographic sort would place "_100000" before
        # "_99999" once the counter outgrows the zero padding.
        numbered.sort(key=lambda item: item[0])
        return [path for _, path in numbered]

    def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]:
        """
        Extract frames at regular intervals.

        Args:
            interval_seconds: Seconds between frame extractions. Values that
                map to less than one frame cause every frame to be saved.

        Returns:
            List of (frame_path, timestamp) tuples. Empty when the video
            cannot be opened or reports no usable frame rate. Frames whose
            image file could not be written are left out.
        """
        frames_info: List[Tuple[str, float]] = []
        cap = cv2.VideoCapture(self.video_path)
        try:
            if not cap.isOpened():
                logger.error("Could not open video file: %s", self.video_path)
                return frames_info

            fps = cap.get(cv2.CAP_PROP_FPS)
            # "not fps > 0" also rejects NaN, which "fps <= 0" would let through.
            if not fps > 0:
                logger.error(
                    "Video reports no usable frame rate (fps=%s): %s",
                    fps, self.video_path,
                )
                return frames_info

            frame_interval = self._frames_per_sample(fps, interval_seconds)
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % frame_interval == 0:
                    timestamp = frame_count / fps
                    frame_filename = f"frame_{len(frames_info):05d}_{timestamp:.2f}s.jpg"
                    frame_path = self.output_dir / frame_filename

                    # Use high quality for text readability (95 = high quality JPEG)
                    written = cv2.imwrite(
                        str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95]
                    )
                    if written:
                        frames_info.append((str(frame_path), timestamp))
                    else:
                        logger.warning("Failed to write frame: %s", frame_path)

                frame_count += 1
        finally:
            # Release the capture even if reading or writing raised.
            cap.release()

        logger.info(
            "Extracted %d frames at %ss intervals",
            len(frames_info), interval_seconds,
        )
        return frames_info

    def extract_scene_changes(self, threshold: float = 15.0) -> List[Tuple[str, float]]:
        """
        Extract frames only on scene changes using FFmpeg.
        More efficient than interval-based extraction.

        If FFmpeg fails, or anything else goes wrong while collecting the
        frames, the method logs the problem and falls back to
        ``extract_by_interval()`` with its default interval.

        Args:
            threshold: Scene change detection threshold (0-100, lower = more sensitive)
                Default: 15.0 (good for clean UIs like Zed)
                Higher values (20-30) for busy UIs like VS Code
                Lower values (5-10) for very subtle changes

        Returns:
            List of (frame_path, timestamp) tuples

        Raises:
            ImportError: If the ffmpeg-python package is not installed.
        """
        try:
            import ffmpeg
        except ImportError as exc:
            raise ImportError(
                "ffmpeg-python not installed. Run: pip install ffmpeg-python"
            ) from exc

        video_name = Path(self.video_path).stem
        # '%' introduces a directive in the output pattern; double it so a
        # literal '%' in the video name is written as-is.
        pattern_name = video_name.replace('%', '%%')
        output_pattern = self.output_dir / f"{pattern_name}_%05d.jpg"

        try:
            # Use FFmpeg's scene detection filter with high quality output
            stream = ffmpeg.input(self.video_path)
            stream = ffmpeg.filter(stream, 'select', f'gt(scene,{threshold/100})')
            stream = ffmpeg.filter(stream, 'showinfo')
            stream = ffmpeg.output(
                stream,
                str(output_pattern),
                vsync='vfr',
                frame_pts=1,
                **{'q:v': '2'}  # High quality JPEG
            )

            # Run with stderr capture to get showinfo output
            _, stderr = ffmpeg.run(stream, capture_stderr=True, overwrite_output=True)
            # Tolerate stray non-UTF-8 bytes: only the numeric pts_time fields
            # are needed, so a lossy decode is harmless.
            stderr_text = stderr.decode('utf-8', errors='replace')

            # Timestamps come from the showinfo filter, which logs to stderr.
            timestamps = self._parse_pts_times(stderr_text)

            # NOTE(review): frames left in output_dir by an earlier run on the
            # same video are picked up too and can shift the pairing below.
            frame_files = self._select_frame_files(
                list(self.output_dir.iterdir()), video_name
            )

            frames_info = []
            for idx, img in enumerate(frame_files):
                # Use extracted timestamp or fallback to index-based estimate
                timestamp = timestamps[idx] if idx < len(timestamps) else idx * 5.0
                frames_info.append((str(img), timestamp))

            logger.info("Extracted %d frames at scene changes", len(frames_info))
            return frames_info

        except ffmpeg.Error as e:
            detail = e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)
            logger.error("FFmpeg error: %s", detail)
            # Fallback to interval extraction
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()
        except Exception as e:
            # Deliberate best-effort boundary: any failure degrades to
            # interval sampling instead of aborting the caller.
            logger.error("Unexpected error during scene extraction: %s", e)
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()

    def get_video_duration(self) -> float:
        """Get video duration in seconds.

        Returns:
            Frame count divided by the reported frame rate, or 0.0 when the
            frame rate is not positive.
        """
        cap = cv2.VideoCapture(self.video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            return frame_count / fps if fps > 0 else 0.0
        finally:
            cap.release()