Files
mitus/meetus/ocr_processor.py
Mariano Gabriel 118ef04223 embed images
2025-10-28 08:02:45 -03:00

174 lines
5.9 KiB
Python

"""
OCR processing for extracted video frames.
Supports multiple OCR engines and text deduplication.
"""
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from difflib import SequenceMatcher
import re
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class OCRProcessor:
    """Process frames with OCR to extract text.

    Wraps one of three OCR backends (Tesseract via ``pytesseract``, EasyOCR,
    or PaddleOCR) behind a single interface. The backend package is imported
    lazily at construction time, so only the selected engine has to be
    installed.
    """

    # The engines disagree on how English is spelled: Tesseract uses the
    # three-letter code "eng", EasyOCR and PaddleOCR use "en". Translating the
    # common spellings lets the default ``lang="eng"`` work with every engine.
    # Codes not listed here are passed through unchanged.
    _LANG_ALIASES: Dict[str, Dict[str, str]] = {
        "tesseract": {"en": "eng"},
        "easyocr": {"eng": "en"},
        "paddleocr": {"eng": "en"},
    }

    def __init__(self, engine: str = "tesseract", lang: str = "eng"):
        """
        Initialize OCR processor.

        Args:
            engine: OCR engine to use ('tesseract', 'easyocr', 'paddleocr').
                Matched case-insensitively.
            lang: Language code for OCR

        Raises:
            ImportError: If the package backing the selected engine is missing.
            ValueError: If ``engine`` is not one of the supported names.
        """
        self.engine = engine.lower()
        self.lang = lang
        self._ocr_engine = None
        self._init_engine()

    def _engine_lang(self) -> str:
        """Return ``self.lang`` translated to the selected engine's spelling."""
        aliases = self._LANG_ALIASES.get(self.engine, {})
        return aliases.get(self.lang, self.lang)

    def _init_engine(self):
        """
        Initialize the selected OCR engine.

        Each ``try`` covers only the import, so an error raised while the
        engine itself is being constructed is not misreported as a missing
        package.

        Raises:
            ImportError: If the package backing the selected engine is missing.
            ValueError: If ``self.engine`` is not a supported engine name.
        """
        if self.engine == "tesseract":
            try:
                import pytesseract
            except ImportError as err:
                raise ImportError(
                    "pytesseract not installed. Run: pip install pytesseract"
                ) from err
            # pytesseract is used through module-level functions, so the
            # module itself is kept as the engine handle.
            self._ocr_engine = pytesseract
        elif self.engine == "easyocr":
            try:
                import easyocr
            except ImportError as err:
                raise ImportError(
                    "easyocr not installed. Run: pip install easyocr"
                ) from err
            self._ocr_engine = easyocr.Reader([self._engine_lang()])
        elif self.engine == "paddleocr":
            try:
                from paddleocr import PaddleOCR
            except ImportError as err:
                raise ImportError(
                    "paddleocr not installed. Run: pip install paddleocr"
                ) from err
            self._ocr_engine = PaddleOCR(
                lang=self._engine_lang(), use_angle_cls=True, show_log=False
            )
        else:
            raise ValueError(f"Unknown OCR engine: {self.engine}")

    def extract_text(self, image_path: str, preserve_layout: bool = True) -> str:
        """
        Extract text from a single image.

        Args:
            image_path: Path to image file
            preserve_layout: Try to preserve whitespace and layout. Only the
                Tesseract engine reads this flag.

        Returns:
            Extracted text, cleaned by ``_clean_text``

        Raises:
            ValueError: If ``self.engine`` is not a supported engine name.
        """
        if self.engine == "tesseract":
            from PIL import Image

            # Use PSM 6 (uniform block of text) to preserve layout better
            config = '--psm 6' if preserve_layout else ''
            # The context manager closes the underlying file handle once OCR
            # has finished with the image.
            with Image.open(image_path) as image:
                text = self._ocr_engine.image_to_string(
                    image, lang=self._engine_lang(), config=config
                )
        elif self.engine == "easyocr":
            # detail=0 makes readtext return plain strings.
            fragments = self._ocr_engine.readtext(image_path, detail=0)
            text = "\n".join(fragments)
        elif self.engine == "paddleocr":
            result = self._ocr_engine.ocr(image_path, cls=True)
            if result and result[0]:
                # The recognised string is read from entry[1][0] of each
                # detected line.
                text = "\n".join(entry[1][0] for entry in result[0])
            else:
                text = ""
        else:
            # Only reachable if self.engine was changed after construction;
            # fail with the same error _init_engine would have raised.
            raise ValueError(f"Unknown OCR engine: {self.engine}")
        return self._clean_text(text)

    def _clean_text(self, text: str, preserve_indentation: bool = True) -> str:
        """
        Clean up OCR output.

        Args:
            text: Raw OCR text
            preserve_indentation: Keep leading whitespace on lines

        Returns:
            Cleaned text
        """
        if not preserve_indentation:
            # Aggressive cleaning: drop blank lines and squeeze runs of spaces.
            text = re.sub(r'\n\s*\n', '\n', text)
            text = re.sub(r' +', ' ', text)
            return text.strip()

        # Collapse runs of blank lines to a single one, leaving each kept
        # line's own whitespace untouched.
        cleaned_lines: List[str] = []
        for line in text.split('\n'):
            # A blank line is kept only when the previously kept line has
            # content, so leading blanks and repeated blanks are dropped.
            if line.strip() or (cleaned_lines and cleaned_lines[-1].strip()):
                cleaned_lines.append(line)
        return '\n'.join(cleaned_lines).strip()

    def process_frames(
        self,
        frames_info: List[Tuple[str, float]],
        deduplicate: bool = True,
        similarity_threshold: float = 0.85
    ) -> List[Dict]:
        """
        Process multiple frames and extract text.

        Frames that yield no text are skipped. When deduplication is on, a
        frame is also skipped if its text is more similar than the threshold
        to the text of the most recently kept frame.

        Args:
            frames_info: List of (frame_path, timestamp) tuples
            deduplicate: Whether to remove similar consecutive texts
            similarity_threshold: Threshold for considering texts as duplicates (0-1)

        Returns:
            List of dicts with 'timestamp', 'text', and 'frame_path'
        """
        results: List[Dict] = []
        prev_text = ""
        total = len(frames_info)

        for idx, (frame_path, timestamp) in enumerate(frames_info, 1):
            logger.debug("Processing frame %d/%d at %.2fs...", idx, total, timestamp)
            text = self.extract_text(frame_path)

            if not text:
                logger.debug("No text extracted from frame at %.2fs", timestamp)
                continue

            # Debug: Show what was extracted
            logger.debug(
                "Frame %d (%.2fs): Extracted %d chars", idx, timestamp, len(text)
            )
            logger.debug(
                "Content preview: %s%s",
                text[:150],
                '...' if len(text) > 150 else '',
            )

            # Deduplicate similar consecutive frames
            if deduplicate and prev_text:
                similarity = self._text_similarity(prev_text, text)
                logger.debug(
                    "Similarity to previous frame: %.2f (threshold: %s)",
                    similarity,
                    similarity_threshold,
                )
                if similarity > similarity_threshold:
                    logger.debug(
                        "⊘ Skipping duplicate frame at %.2fs (similarity: %.2f)",
                        timestamp,
                        similarity,
                    )
                    continue

            results.append({
                'timestamp': timestamp,
                'text': text,
                'frame_path': frame_path
            })
            # Only kept frames update the comparison baseline.
            prev_text = text

        logger.info(
            "Extracted text from %d frames (deduplication: %s)",
            len(results),
            deduplicate,
        )
        return results

    def _text_similarity(self, text1: str, text2: str) -> float:
        """
        Calculate similarity between two texts.

        Returns:
            Similarity score between 0 and 1
        """
        return SequenceMatcher(None, text1, text2).ratio()