Initial commit

This commit is contained in:
Mariano Gabriel
2025-10-19 22:17:38 -03:00
commit 93e0c06d38
10 changed files with 969 additions and 0 deletions

0
meetus/__init__.py Normal file
View File

119
meetus/frame_extractor.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Extract frames from video files for OCR processing.
Supports both regular interval sampling and scene change detection.
"""
import cv2
import os
from pathlib import Path
from typing import List, Tuple, Optional
import subprocess
import json
import logging
logger = logging.getLogger(__name__)
class FrameExtractor:
    """Extract frames from a video file for downstream OCR processing.

    Frames are written as JPEGs into ``output_dir`` and referenced by
    (path, timestamp) pairs so later stages keep time alignment.
    """

    def __init__(self, video_path: str, output_dir: str = "frames"):
        """
        Initialize frame extractor.

        Args:
            video_path: Path to video file
            output_dir: Directory to save extracted frames (created if missing)
        """
        self.video_path = video_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def extract_by_interval(self, interval_seconds: int = 5) -> List[Tuple[str, float]]:
        """
        Extract frames at regular intervals.

        Args:
            interval_seconds: Seconds between frame extractions

        Returns:
            List of (frame_path, timestamp) tuples; empty if the video
            cannot be opened or reports no usable frame rate.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            # Previously a failed open silently produced [] via the loop
            # condition; make the failure visible in the log.
            logger.error(f"Could not open video: {self.video_path}")
            cap.release()
            return []
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without a frame rate the interval math below would take a
            # modulo by zero and divide by zero for timestamps.
            logger.error(f"Video reports invalid FPS ({fps}): {self.video_path}")
            cap.release()
            return []
        # max(1, ...) guards interval_seconds == 0 / sub-frame intervals.
        frame_interval = max(1, int(fps * interval_seconds))
        frames_info = []
        frame_count = 0
        saved_count = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    timestamp = frame_count / fps
                    frame_filename = f"frame_{saved_count:05d}_{timestamp:.2f}s.jpg"
                    frame_path = self.output_dir / frame_filename
                    cv2.imwrite(str(frame_path), frame)
                    frames_info.append((str(frame_path), timestamp))
                    saved_count += 1
                frame_count += 1
        finally:
            # Always release the capture handle, even if imwrite/read raises.
            cap.release()
        logger.info(f"Extracted {saved_count} frames at {interval_seconds}s intervals")
        return frames_info

    def extract_scene_changes(self, threshold: float = 30.0) -> List[Tuple[str, float]]:
        """
        Extract frames only on scene changes using FFmpeg.
        More efficient than interval-based extraction.

        Args:
            threshold: Scene change detection threshold (0-100, lower = more sensitive)

        Returns:
            List of (frame_path, timestamp) tuples
        """
        video_name = Path(self.video_path).stem
        output_pattern = self.output_dir / f"{video_name}_%05d.jpg"
        # FFmpeg's scene score is in [0, 1]; the public threshold is 0-100.
        # -loglevel is placed before the output file: ffmpeg treats trailing
        # arguments after the last output as options for a missing output.
        cmd = [
            'ffmpeg',
            '-loglevel', 'info',
            '-i', self.video_path,
            '-vf', f'select=gt(scene\\,{threshold/100}),showinfo',
            '-vsync', 'vfr',
            '-frame_pts', '1',
            str(output_pattern),
        ]
        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            frames_info = []
            for img in sorted(self.output_dir.glob(f"{video_name}_*.jpg")):
                # TODO: parse showinfo's pts_time output for real timestamps.
                frames_info.append((str(img), 0.0))
            logger.info(f"Extracted {len(frames_info)} frames at scene changes")
            return frames_info
        except subprocess.CalledProcessError as e:
            logger.error(f"FFmpeg error: {e.stderr}")
            # Fallback to interval extraction
            logger.warning("Falling back to interval extraction...")
            return self.extract_by_interval()
        except FileNotFoundError:
            # ffmpeg binary not installed at all; use the same fallback.
            logger.warning("ffmpeg not found; falling back to interval extraction...")
            return self.extract_by_interval()

    def get_video_duration(self) -> float:
        """Get video duration in seconds (0 if it cannot be determined)."""
        cap = cv2.VideoCapture(self.video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps if fps > 0 else 0
        cap.release()
        return duration

143
meetus/ocr_processor.py Normal file
View File

@@ -0,0 +1,143 @@
"""
OCR processing for extracted video frames.
Supports multiple OCR engines and text deduplication.
"""
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from difflib import SequenceMatcher
import re
import logging
logger = logging.getLogger(__name__)
class OCRProcessor:
    """Process frames with OCR to extract text."""

    def __init__(self, engine: str = "tesseract", lang: str = "eng"):
        """
        Initialize OCR processor.

        Args:
            engine: OCR engine to use ('tesseract', 'easyocr', 'paddleocr')
            lang: Language code for OCR

        Raises:
            ValueError: If ``engine`` is not a supported name.
            ImportError: If the selected engine's package is not installed.
        """
        self.engine = engine.lower()
        self.lang = lang
        self._ocr_engine = None
        self._init_engine()

    def _init_engine(self):
        """Import and initialize the selected OCR engine."""
        if self.engine == "tesseract":
            try:
                import pytesseract
                self._ocr_engine = pytesseract
            except ImportError as e:
                # Chain the cause so the underlying import failure is visible.
                raise ImportError("pytesseract not installed. Run: pip install pytesseract") from e
        elif self.engine == "easyocr":
            try:
                import easyocr
                self._ocr_engine = easyocr.Reader([self.lang])
            except ImportError as e:
                raise ImportError("easyocr not installed. Run: pip install easyocr") from e
        elif self.engine == "paddleocr":
            try:
                from paddleocr import PaddleOCR
                self._ocr_engine = PaddleOCR(lang=self.lang, use_angle_cls=True, show_log=False)
            except ImportError as e:
                raise ImportError("paddleocr not installed. Run: pip install paddleocr") from e
        else:
            raise ValueError(f"Unknown OCR engine: {self.engine}")

    def extract_text(self, image_path: str) -> str:
        """
        Extract text from a single image.

        Args:
            image_path: Path to image file

        Returns:
            Extracted text, whitespace-normalized (may be empty)

        Raises:
            ValueError: If the configured engine is unknown (defensive;
                the constructor already validates it).
        """
        if self.engine == "tesseract":
            from PIL import Image
            image = Image.open(image_path)
            text = self._ocr_engine.image_to_string(image)
        elif self.engine == "easyocr":
            result = self._ocr_engine.readtext(image_path, detail=0)
            text = "\n".join(result)
        elif self.engine == "paddleocr":
            result = self._ocr_engine.ocr(image_path, cls=True)
            if result and result[0]:
                text = "\n".join([line[1][0] for line in result[0]])
            else:
                text = ""
        else:
            # Without this branch `text` would be unbound below.
            raise ValueError(f"Unknown OCR engine: {self.engine}")
        return self._clean_text(text)

    def _clean_text(self, text: str) -> str:
        """Collapse blank lines and repeated spaces in OCR output."""
        # Remove excessive whitespace
        text = re.sub(r'\n\s*\n', '\n', text)
        text = re.sub(r' +', ' ', text)
        return text.strip()

    def process_frames(
        self,
        frames_info: List[Tuple[str, float]],
        deduplicate: bool = True,
        similarity_threshold: float = 0.85
    ) -> List[Dict]:
        """
        Process multiple frames and extract text.

        Args:
            frames_info: List of (frame_path, timestamp) tuples
            deduplicate: Whether to remove similar consecutive texts
            similarity_threshold: Threshold for considering texts as duplicates (0-1)

        Returns:
            List of dicts with 'timestamp', 'text', and 'frame_path'
        """
        results = []
        prev_text = ""
        for frame_path, timestamp in frames_info:
            logger.debug(f"Processing frame at {timestamp:.2f}s...")
            text = self.extract_text(frame_path)
            if not text:
                continue
            # Deduplicate similar consecutive frames
            if deduplicate:
                similarity = self._text_similarity(prev_text, text)
                if similarity > similarity_threshold:
                    logger.debug(f"Skipping duplicate frame at {timestamp:.2f}s (similarity: {similarity:.2f})")
                    continue
            results.append({
                'timestamp': timestamp,
                'text': text,
                'frame_path': frame_path
            })
            # Compare against the last *kept* frame only, so a gradual
            # transition does not suppress the final settled frame.
            prev_text = text
        logger.info(f"Extracted text from {len(results)} frames (deduplication: {deduplicate})")
        return results

    def _text_similarity(self, text1: str, text2: str) -> float:
        """
        Calculate similarity between two texts.

        Returns:
            Similarity score between 0 and 1
        """
        return SequenceMatcher(None, text1, text2).ratio()

173
meetus/transcript_merger.py Normal file
View File

@@ -0,0 +1,173 @@
"""
Merge Whisper transcripts with OCR screen content.
Creates a unified, timestamped transcript for Claude summarization.
"""
from typing import List, Dict, Optional
import json
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class TranscriptMerger:
    """Merge audio transcripts with screen OCR text."""

    def __init__(self):
        """Initialize transcript merger (stateless)."""
        pass

    def load_whisper_transcript(self, transcript_path: str) -> List[Dict]:
        """
        Load Whisper transcript from file.
        Supports both JSON format (with timestamps) and plain text.

        Args:
            transcript_path: Path to transcript file

        Returns:
            List of dicts with 'timestamp', 'text', and 'type' == 'audio'

        Raises:
            ValueError: If a JSON file has an unrecognized structure.
        """
        path = Path(transcript_path)
        if path.suffix == '.json':
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Handle different Whisper output formats
            if isinstance(data, dict) and 'segments' in data:
                # Standard Whisper JSON format
                return [
                    {
                        'timestamp': seg.get('start', 0),
                        'text': seg['text'].strip(),
                        'type': 'audio'
                    }
                    for seg in data['segments']
                ]
            elif isinstance(data, list):
                # List of segments
                return [
                    {
                        'timestamp': seg.get('start', seg.get('timestamp', 0)),
                        'text': seg['text'].strip(),
                        'type': 'audio'
                    }
                    for seg in data
                ]
            else:
                # Previously this path fell through and returned None,
                # which crashed later in merge_transcripts.
                raise ValueError(f"Unrecognized transcript JSON structure: {transcript_path}")
        else:
            # Plain text file - no timestamps
            with open(path, 'r', encoding='utf-8') as f:
                text = f.read().strip()
            return [{
                'timestamp': 0,
                'text': text,
                'type': 'audio'
            }]

    def merge_transcripts(
        self,
        audio_segments: List[Dict],
        screen_segments: List[Dict]
    ) -> List[Dict]:
        """
        Merge audio and screen transcripts by timestamp.

        Note: the input segment dicts are mutated in place (a 'type' key
        is set on each) and shared with the returned list.

        Args:
            audio_segments: List of audio transcript segments
            screen_segments: List of screen OCR segments

        Returns:
            Merged list sorted by timestamp (stable: audio before screen
            at equal timestamps, per input order)
        """
        # Mark segment types
        for seg in audio_segments:
            seg['type'] = 'audio'
        for seg in screen_segments:
            seg['type'] = 'screen'
        # Combine and sort by timestamp
        all_segments = audio_segments + screen_segments
        all_segments.sort(key=lambda x: x['timestamp'])
        return all_segments

    def format_for_claude(
        self,
        merged_segments: List[Dict],
        format_style: str = "detailed"
    ) -> str:
        """
        Format merged transcript for Claude processing.

        Args:
            merged_segments: Merged transcript segments
            format_style: 'detailed' or 'compact' (anything else falls
                back to compact)

        Returns:
            Formatted transcript string
        """
        if format_style == "detailed":
            return self._format_detailed(merged_segments)
        else:
            return self._format_compact(merged_segments)

    def _format_detailed(self, segments: List[Dict]) -> str:
        """Format with clear visual separation between audio and screen content."""
        lines = []
        lines.append("=" * 80)
        lines.append("ENHANCED MEETING TRANSCRIPT")
        lines.append("Audio transcript + Screen content")
        lines.append("=" * 80)
        lines.append("")
        for seg in segments:
            timestamp = self._format_timestamp(seg['timestamp'])
            if seg['type'] == 'audio':
                lines.append(f"[{timestamp}] SPEAKER:")
                lines.append(f" {seg['text']}")
                lines.append("")
            else:  # screen
                lines.append(f"[{timestamp}] SCREEN CONTENT:")
                # Indent screen text for visibility
                screen_text = seg['text'].replace('\n', '\n | ')
                lines.append(f" | {screen_text}")
                lines.append("")
        return "\n".join(lines)

    def _format_compact(self, segments: List[Dict]) -> str:
        """Compact format for shorter transcripts."""
        lines = []
        for seg in segments:
            timestamp = self._format_timestamp(seg['timestamp'])
            prefix = "SPEAKER" if seg['type'] == 'audio' else "SCREEN"
            text = seg['text'].replace('\n', ' ')[:200]  # Truncate long screen text
            lines.append(f"[{timestamp}] {prefix}: {text}")
        return "\n".join(lines)

    def _format_timestamp(self, seconds: float) -> str:
        """Format timestamp as MM:SS (minutes may exceed 59 for long videos)."""
        minutes = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{minutes:02d}:{secs:02d}"

    def save_transcript(self, formatted_text: str, output_path: str):
        """
        Save formatted transcript to file.

        Args:
            formatted_text: Formatted transcript
            output_path: Output file path
        """
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(formatted_text)
        logger.info(f"Saved enhanced transcript to: {output_path}")

0
meetus/utils/__init__.py Normal file
View File