Files
mitus/meetus/frame_extractor.py
Mariano Gabriel cdf7ad1199 update prompts
2025-10-20 17:36:31 -03:00

130 lines
4.2 KiB
Python

"""
Extract frames from video files for OCR processing.
Supports both regular interval sampling and scene change detection.
"""
import cv2
import os
from pathlib import Path
from typing import List, Tuple, Optional
import subprocess
import json
import logging
logger = logging.getLogger(__name__)
class FrameExtractor:
"""Extract frames from video files."""
def __init__(self, video_path: str, output_dir: str = "frames"):
"""
Initialize frame extractor.
Args:
video_path: Path to video file
output_dir: Directory to save extracted frames
"""
self.video_path = video_path
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]:
"""
Extract frames at regular intervals.
Args:
interval_seconds: Seconds between frame extractions
Returns:
List of (frame_path, timestamp) tuples
"""
cap = cv2.VideoCapture(self.video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = int(fps * interval_seconds)
frames_info = []
frame_count = 0
saved_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
timestamp = frame_count / fps
frame_filename = f"frame_{saved_count:05d}_{timestamp:.2f}s.jpg"
frame_path = self.output_dir / frame_filename
cv2.imwrite(str(frame_path), frame)
frames_info.append((str(frame_path), timestamp))
saved_count += 1
frame_count += 1
cap.release()
logger.info(f"Extracted {saved_count} frames at {interval_seconds}s intervals")
return frames_info
def extract_scene_changes(self, threshold: float = 30.0) -> List[Tuple[str, float]]:
"""
Extract frames only on scene changes using FFmpeg.
More efficient than interval-based extraction.
Args:
threshold: Scene change detection threshold (0-100, lower = more sensitive)
Returns:
List of (frame_path, timestamp) tuples
"""
video_name = Path(self.video_path).stem
output_pattern = self.output_dir / f"{video_name}_%05d.jpg"
# Use FFmpeg's scene detection filter
cmd = [
'ffmpeg',
'-i', self.video_path,
'-vf', f'select=gt(scene\\,{threshold/100}),showinfo',
'-vsync', 'vfr',
'-frame_pts', '1',
str(output_pattern),
'-loglevel', 'info'
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse FFmpeg output to get frame timestamps from showinfo filter
import re
frames_info = []
# Extract timestamps from stderr (showinfo outputs there)
timestamp_pattern = r'pts_time:([\d.]+)'
timestamps = re.findall(timestamp_pattern, result.stderr)
# Match frames to timestamps
frame_files = sorted(self.output_dir.glob(f"{video_name}_*.jpg"))
for idx, img in enumerate(frame_files):
# Use extracted timestamp or fallback to index-based estimate
timestamp = float(timestamps[idx]) if idx < len(timestamps) else idx * 5.0
frames_info.append((str(img), timestamp))
logger.info(f"Extracted {len(frames_info)} frames at scene changes")
return frames_info
except subprocess.CalledProcessError as e:
logger.error(f"FFmpeg error: {e.stderr}")
# Fallback to interval extraction
logger.warning("Falling back to interval extraction...")
return self.extract_by_interval()
def get_video_duration(self) -> float:
"""Get video duration in seconds."""
cap = cv2.VideoCapture(self.video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps if fps > 0 else 0
cap.release()
return duration