""" OCR processing for extracted video frames. Supports multiple OCR engines and text deduplication. """ from typing import List, Tuple, Dict, Optional from pathlib import Path from difflib import SequenceMatcher import re import logging logger = logging.getLogger(__name__) class OCRProcessor: """Process frames with OCR to extract text.""" def __init__(self, engine: str = "tesseract", lang: str = "eng"): """ Initialize OCR processor. Args: engine: OCR engine to use ('tesseract', 'easyocr', 'paddleocr') lang: Language code for OCR """ self.engine = engine.lower() self.lang = lang self._ocr_engine = None self._init_engine() def _init_engine(self): """Initialize the selected OCR engine.""" if self.engine == "tesseract": try: import pytesseract self._ocr_engine = pytesseract except ImportError: raise ImportError("pytesseract not installed. Run: pip install pytesseract") elif self.engine == "easyocr": try: import easyocr self._ocr_engine = easyocr.Reader([self.lang]) except ImportError: raise ImportError("easyocr not installed. Run: pip install easyocr") elif self.engine == "paddleocr": try: from paddleocr import PaddleOCR self._ocr_engine = PaddleOCR(lang=self.lang, use_angle_cls=True, show_log=False) except ImportError: raise ImportError("paddleocr not installed. Run: pip install paddleocr") else: raise ValueError(f"Unknown OCR engine: {self.engine}") def extract_text(self, image_path: str, preserve_layout: bool = True) -> str: """ Extract text from a single image. Args: image_path: Path to image file preserve_layout: Try to preserve whitespace and layout Returns: Extracted text """ if self.engine == "tesseract": from PIL import Image import pytesseract image = Image.open(image_path) # Use PSM 6 (uniform block of text) to preserve layout better config = '--psm 6' if preserve_layout else '' text = pytesseract.image_to_string(image, config=config) elif self.engine == "easyocr": result = self._ocr_engine.readtext(image_path, detail=0) text = "\n".join(result) elif self.engine == "paddleocr": result = self._ocr_engine.ocr(image_path, cls=True) if result and result[0]: text = "\n".join([line[1][0] for line in result[0]]) else: text = "" return self._clean_text(text) def _clean_text(self, text: str, preserve_indentation: bool = True) -> str: """ Clean up OCR output. Args: text: Raw OCR text preserve_indentation: Keep leading whitespace on lines Returns: Cleaned text """ if preserve_indentation: # Remove excessive blank lines but preserve indentation lines = text.split('\n') cleaned_lines = [] for line in lines: # Keep line if it has content or is single empty line if line.strip() or (cleaned_lines and cleaned_lines[-1].strip()): cleaned_lines.append(line) return '\n'.join(cleaned_lines).strip() else: # Original aggressive cleaning text = re.sub(r'\n\s*\n', '\n', text) text = re.sub(r' +', ' ', text) return text.strip() def process_frames( self, frames_info: List[Tuple[str, float]], deduplicate: bool = True, similarity_threshold: float = 0.85 ) -> List[Dict]: """ Process multiple frames and extract text. Args: frames_info: List of (frame_path, timestamp) tuples deduplicate: Whether to remove similar consecutive texts similarity_threshold: Threshold for considering texts as duplicates (0-1) Returns: List of dicts with 'timestamp', 'text', and 'frame_path' """ results = [] prev_text = "" for idx, (frame_path, timestamp) in enumerate(frames_info, 1): logger.debug(f"Processing frame {idx}/{len(frames_info)} at {timestamp:.2f}s...") text = self.extract_text(frame_path) if not text: logger.debug(f"No text extracted from frame at {timestamp:.2f}s") continue # Debug: Show what was extracted logger.debug(f"Frame {idx} ({timestamp:.2f}s): Extracted {len(text)} chars") logger.debug(f"Content preview: {text[:150]}{'...' if len(text) > 150 else ''}") # Deduplicate similar consecutive frames if deduplicate and prev_text: similarity = self._text_similarity(prev_text, text) logger.debug(f"Similarity to previous frame: {similarity:.2f} (threshold: {similarity_threshold})") if similarity > similarity_threshold: logger.debug(f"⊘ Skipping duplicate frame at {timestamp:.2f}s (similarity: {similarity:.2f})") continue results.append({ 'timestamp': timestamp, 'text': text, 'frame_path': frame_path }) prev_text = text logger.info(f"Extracted text from {len(results)} frames (deduplication: {deduplicate})") return results def _text_similarity(self, text1: str, text2: str) -> float: """ Calculate similarity between two texts. Returns: Similarity score between 0 and 1 """ return SequenceMatcher(None, text1, text2).ratio()