"""
Hybrid frame analysis: OpenCV text detection + OCR for accurate extraction.

Better than pure vision models which tend to hallucinate text content.
"""
from typing import List, Tuple, Dict, Optional
from pathlib import Path
import logging
import tempfile

import cv2
import numpy as np
from difflib import SequenceMatcher

logger = logging.getLogger(__name__)


class HybridProcessor:
    """Combine OpenCV text detection with OCR for accurate text extraction."""

    def __init__(self, ocr_engine: str = "tesseract", min_confidence: float = 0.5,
                 use_llm_cleanup: bool = False, llm_model: Optional[str] = None):
        """
        Initialize hybrid processor.

        Args:
            ocr_engine: OCR engine to use ('tesseract', 'easyocr', 'paddleocr')
            min_confidence: Minimum confidence for text detection (0-1)
            use_llm_cleanup: Use LLM to clean up OCR output and preserve formatting
            llm_model: Ollama model for cleanup (default: llama3.2:3b for speed)
        """
        # Imported lazily to avoid a hard dependency at module import time.
        from .ocr_processor import OCRProcessor

        self.ocr = OCRProcessor(engine=ocr_engine)
        self.min_confidence = min_confidence
        self.use_llm_cleanup = use_llm_cleanup
        self.llm_model = llm_model or "llama3.2:3b"
        self._llm_client = None

        if use_llm_cleanup:
            self._init_llm()

    def _init_llm(self):
        """Initialize Ollama client for LLM cleanup.

        Falls back gracefully (disables cleanup) when the `ollama` package
        is not installed, rather than failing the whole pipeline.
        """
        try:
            import ollama
            self._llm_client = ollama
            logger.info(f"LLM cleanup enabled using {self.llm_model}")
        except ImportError:
            logger.warning("ollama package not installed. LLM cleanup disabled.")
            self.use_llm_cleanup = False

    def _cleanup_with_llm(self, raw_text: str) -> str:
        """
        Use LLM to clean up OCR output and preserve code formatting.

        Args:
            raw_text: Raw OCR output

        Returns:
            Cleaned up text with proper formatting, or the raw text unchanged
            when cleanup is disabled or the LLM call fails.
        """
        if not self.use_llm_cleanup or not self._llm_client:
            return raw_text

        prompt = """You are cleaning up OCR output from a code editor screenshot.

Your task:
1. Fix any obvious OCR errors (l→1, O→0, etc.)
2. Preserve or restore code indentation and structure
3. Keep the exact text content - don't add explanations or comments
4. If it's code, maintain proper spacing and formatting
5. Return ONLY the cleaned text, nothing else

OCR Text:
"""

        try:
            response = self._llm_client.generate(
                model=self.llm_model,
                prompt=prompt + raw_text,
                options={"temperature": 0.1}  # Low temperature for accuracy
            )
            cleaned = response['response'].strip()
            logger.debug(f"LLM cleanup: {len(raw_text)} → {len(cleaned)} chars")
            return cleaned
        except Exception as e:
            # Best-effort: any LLM failure degrades to the raw OCR output.
            logger.warning(f"LLM cleanup failed: {e}, using raw OCR output")
            return raw_text

    def detect_text_regions(self, image_path: str,
                            min_area: int = 100) -> List[Tuple[int, int, int, int]]:
        """
        Detect text regions in image using OpenCV.

        Args:
            image_path: Path to image file
            min_area: Minimum area for text region (pixels)

        Returns:
            List of bounding boxes (x, y, w, h); empty list when the image
            cannot be read or no regions are found.
        """
        img = cv2.imread(image_path)
        if img is None:
            logger.warning(f"Could not read image: {image_path}")
            return []

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Method 1: Morphological operations to find text regions.
        # Works well for solid text blocks.
        regions = self._detect_by_morphology(gray, min_area)

        if not regions:
            logger.debug(f"No text regions detected in {Path(image_path).name}")

        return regions

    def _detect_by_morphology(self, gray: np.ndarray,
                              min_area: int) -> List[Tuple[int, int, int, int]]:
        """
        Detect text regions using morphological operations.

        Fast and works well for solid text blocks (code editors, terminals).

        Args:
            gray: Grayscale image
            min_area: Minimum area for region

        Returns:
            List of bounding boxes (x, y, w, h)
        """
        # Adaptive threshold handles varying lighting across the frame.
        binary = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, 11, 2
        )

        # Dilate with a wide, short kernel so characters on the same text
        # line connect into a single blob.
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 3))
        dilated = cv2.dilate(binary, kernel, iterations=2)

        contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)

        # Filter contours by area and minimum text-like dimensions.
        regions = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            area = w * h
            if area > min_area and w > 20 and h > 10:  # Reasonable text dimensions
                regions.append((x, y, w, h))

        regions = self._merge_overlapping_regions(regions)

        logger.debug(f"Detected {len(regions)} text regions using morphology")
        return regions

    def _merge_overlapping_regions(
        self,
        regions: List[Tuple[int, int, int, int]],
        overlap_threshold: float = 0.3
    ) -> List[Tuple[int, int, int, int]]:
        """
        Merge overlapping bounding boxes.

        Boxes are scanned top-to-bottom; consecutive boxes whose overlap
        (relative to the smaller box) exceeds the threshold are unioned.

        Args:
            regions: List of (x, y, w, h) tuples
            overlap_threshold: Minimum overlap ratio to merge

        Returns:
            Merged regions
        """
        if not regions:
            return []

        # Sort by y-coordinate (top to bottom).
        regions = sorted(regions, key=lambda r: r[1])

        merged = []
        current = list(regions[0])

        for region in regions[1:]:
            x, y, w, h = region
            cx, cy, cw, ch = current

            # Intersection of the two boxes.
            x_overlap = max(0, min(cx + cw, x + w) - max(cx, x))
            y_overlap = max(0, min(cy + ch, y + h) - max(cy, y))
            overlap_area = x_overlap * y_overlap

            current_area = cw * ch
            region_area = w * h
            min_area = min(current_area, region_area)

            # BUG FIX: guard against degenerate zero-area boxes, which
            # previously raised ZeroDivisionError.
            if min_area > 0 and overlap_area / min_area > overlap_threshold:
                # Merge: take the union of the two boxes.
                new_x = min(cx, x)
                new_y = min(cy, y)
                new_x2 = max(cx + cw, x + w)
                new_y2 = max(cy + ch, y + h)
                current = [new_x, new_y, new_x2 - new_x, new_y2 - new_y]
            else:
                merged.append(tuple(current))
                current = list(region)

        merged.append(tuple(current))
        return merged

    def extract_text_from_region(self, image_path: str,
                                 region: Tuple[int, int, int, int]) -> str:
        """
        Extract text from a specific region using OCR.

        Args:
            image_path: Path to image file
            region: Bounding box (x, y, w, h)

        Returns:
            Extracted text
        """
        from PIL import Image

        # Load image and crop region.
        img = Image.open(image_path)
        x, y, w, h = region
        cropped = img.crop((x, y, x + w, y + h))

        # The OCR engine takes a file path, so write the crop to a temp file.
        # delete=False + closing before OCR lets the engine re-open the path
        # (required on Windows).
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
            tmp_path = tmp.name
            cropped.save(tmp_path)

        try:
            text = self.ocr.extract_text(tmp_path)
        finally:
            # BUG FIX: previously the temp file leaked when OCR raised.
            Path(tmp_path).unlink()

        return text

    def analyze_frame(self, image_path: str) -> str:
        """
        Analyze a frame: detect text regions and OCR them.

        Args:
            image_path: Path to image file

        Returns:
            Combined text from all detected regions
        """
        regions = self.detect_text_regions(image_path)

        if not regions:
            # Fallback to full-frame OCR if no regions detected.
            logger.debug(f"No regions detected, using full-frame OCR for {Path(image_path).name}")
            raw_text = self.ocr.extract_text(image_path)
            return self._cleanup_with_llm(raw_text) if self.use_llm_cleanup else raw_text

        # Sort regions by reading order (top-to-bottom, left-to-right).
        regions = self._sort_regions_by_reading_order(regions)

        # Extract text from each region.
        texts = []
        for idx, region in enumerate(regions):
            x, y, w, h = region
            text = self.extract_text_from_region(image_path, region)
            if text.strip():
                # Add visual separator with region info.
                section_header = f"[Region {idx+1} at y={y}]"
                texts.append(f"{section_header}\n{text.strip()}")
                logger.debug(f"Region {idx+1}/{len(regions)} (y={y}): Extracted {len(text)} chars")

        combined = ("\n\n" + "="*60 + "\n\n").join(texts)
        logger.debug(f"Total extracted from {len(regions)} regions: {len(combined)} chars")

        # Apply LLM cleanup if enabled.
        if self.use_llm_cleanup:
            combined = self._cleanup_with_llm(combined)

        return combined

    def _sort_regions_by_reading_order(
        self,
        regions: List[Tuple[int, int, int, int]]
    ) -> List[Tuple[int, int, int, int]]:
        """
        Sort regions in reading order (top-to-bottom, left-to-right).

        Args:
            regions: List of (x, y, w, h) tuples

        Returns:
            Sorted regions
        """
        # Sort primarily by y (top to bottom), secondarily by x (left to
        # right). Regions within the same 20px horizontal band are treated
        # as being on the same line.
        return sorted(regions, key=lambda r: (r[1] // 20, r[0]))

    def process_frames(
        self,
        frames_info: List[Tuple[str, float]],
        deduplicate: bool = True,
        similarity_threshold: float = 0.85
    ) -> List[Dict]:
        """
        Process multiple frames with hybrid analysis.

        Args:
            frames_info: List of (frame_path, timestamp) tuples
            deduplicate: Whether to remove similar consecutive analyses
            similarity_threshold: Threshold for considering analyses as duplicates (0-1)

        Returns:
            List of dicts with 'timestamp', 'text', and 'frame_path'
        """
        results = []
        prev_text = ""
        total = len(frames_info)

        logger.info(f"Starting hybrid analysis of {total} frames...")

        for idx, (frame_path, timestamp) in enumerate(frames_info, 1):
            logger.info(f"Analyzing frame {idx}/{total} at {timestamp:.2f}s...")

            text = self.analyze_frame(frame_path)

            if not text:
                logger.warning(f"No content extracted from frame at {timestamp:.2f}s")
                continue

            # Debug: show what was extracted.
            logger.debug(f"Frame {idx} ({timestamp:.2f}s): Extracted {len(text)} chars")
            logger.debug(f"Content preview: {text[:150]}{'...' if len(text) > 150 else ''}")

            # Deduplicate similar consecutive frames.
            if deduplicate and prev_text:
                similarity = self._text_similarity(prev_text, text)
                logger.debug(f"Similarity to previous frame: {similarity:.2f} (threshold: {similarity_threshold})")
                if similarity > similarity_threshold:
                    logger.debug(f"⊘ Skipping duplicate frame at {timestamp:.2f}s (similarity: {similarity:.2f})")
                    continue

            results.append({
                'timestamp': timestamp,
                'text': text,
                'frame_path': frame_path
            })
            prev_text = text

        logger.info(f"Extracted content from {len(results)} frames (deduplication: {deduplicate})")
        return results

    def _text_similarity(self, text1: str, text2: str) -> float:
        """
        Calculate similarity between two texts.

        Returns:
            Similarity score between 0 and 1
        """
        return SequenceMatcher(None, text1, text2).ratio()