""" OCR processing for extracted video frames. Supports multiple OCR engines and text deduplication. """ from typing import List, Tuple, Dict, Optional from pathlib import Path from difflib import SequenceMatcher import re import logging logger = logging.getLogger(__name__) class OCRProcessor: """Process frames with OCR to extract text.""" def __init__(self, engine: str = "tesseract", lang: str = "eng"): """ Initialize OCR processor. Args: engine: OCR engine to use ('tesseract', 'easyocr', 'paddleocr') lang: Language code for OCR """ self.engine = engine.lower() self.lang = lang self._ocr_engine = None self._init_engine() def _init_engine(self): """Initialize the selected OCR engine.""" if self.engine == "tesseract": try: import pytesseract self._ocr_engine = pytesseract except ImportError: raise ImportError("pytesseract not installed. Run: pip install pytesseract") elif self.engine == "easyocr": try: import easyocr self._ocr_engine = easyocr.Reader([self.lang]) except ImportError: raise ImportError("easyocr not installed. Run: pip install easyocr") elif self.engine == "paddleocr": try: from paddleocr import PaddleOCR self._ocr_engine = PaddleOCR(lang=self.lang, use_angle_cls=True, show_log=False) except ImportError: raise ImportError("paddleocr not installed. Run: pip install paddleocr") else: raise ValueError(f"Unknown OCR engine: {self.engine}") def extract_text(self, image_path: str) -> str: """ Extract text from a single image. Args: image_path: Path to image file Returns: Extracted text """ if self.engine == "tesseract": from PIL import Image image = Image.open(image_path) text = self._ocr_engine.image_to_string(image) elif self.engine == "easyocr": result = self._ocr_engine.readtext(image_path, detail=0) text = "\n".join(result) elif self.engine == "paddleocr": result = self._ocr_engine.ocr(image_path, cls=True) if result and result[0]: text = "\n".join([line[1][0] for line in result[0]]) else: text = "" return self._clean_text(text) def _clean_text(self, text: str) -> str: """Clean up OCR output.""" # Remove excessive whitespace text = re.sub(r'\n\s*\n', '\n', text) text = re.sub(r' +', ' ', text) return text.strip() def process_frames( self, frames_info: List[Tuple[str, float]], deduplicate: bool = True, similarity_threshold: float = 0.85 ) -> List[Dict]: """ Process multiple frames and extract text. Args: frames_info: List of (frame_path, timestamp) tuples deduplicate: Whether to remove similar consecutive texts similarity_threshold: Threshold for considering texts as duplicates (0-1) Returns: List of dicts with 'timestamp', 'text', and 'frame_path' """ results = [] prev_text = "" for frame_path, timestamp in frames_info: logger.debug(f"Processing frame at {timestamp:.2f}s...") text = self.extract_text(frame_path) if not text: continue # Deduplicate similar consecutive frames if deduplicate: similarity = self._text_similarity(prev_text, text) if similarity > similarity_threshold: logger.debug(f"Skipping duplicate frame at {timestamp:.2f}s (similarity: {similarity:.2f})") continue results.append({ 'timestamp': timestamp, 'text': text, 'frame_path': frame_path }) prev_text = text logger.info(f"Extracted text from {len(results)} frames (deduplication: {deduplicate})") return results def _text_similarity(self, text1: str, text2: str) -> float: """ Calculate similarity between two texts. Returns: Similarity score between 0 and 1 """ return SequenceMatcher(None, text1, text2).ratio()