356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""
|
|
Hybrid frame analysis: OpenCV text detection + OCR for accurate extraction.
|
|
Better than pure vision models which tend to hallucinate text content.
|
|
"""
|
|
from typing import List, Tuple, Dict, Optional
|
|
from pathlib import Path
|
|
import logging
|
|
import cv2
|
|
import numpy as np
|
|
from difflib import SequenceMatcher
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HybridProcessor:
    """OpenCV-based text-region detection combined with OCR extraction."""

    def __init__(self, ocr_engine: str = "tesseract", min_confidence: float = 0.5,
                 use_llm_cleanup: bool = False, llm_model: Optional[str] = None):
        """
        Set up the hybrid processor.

        Args:
            ocr_engine: Which OCR backend to run ('tesseract', 'easyocr', 'paddleocr').
            min_confidence: Detection confidence floor in [0, 1].
            use_llm_cleanup: Whether to post-process OCR text with a local LLM.
            llm_model: Ollama model name for cleanup (defaults to llama3.2:3b for speed).
        """
        # Imported lazily so the OCR backend is only loaded when the
        # processor is actually constructed.
        from .ocr_processor import OCRProcessor

        self.min_confidence = min_confidence
        self.llm_model = llm_model or "llama3.2:3b"
        self.use_llm_cleanup = use_llm_cleanup
        self._llm_client = None
        self.ocr = OCRProcessor(engine=ocr_engine)

        if use_llm_cleanup:
            self._init_llm()
|
|
|
|
def _init_llm(self):
|
|
"""Initialize Ollama client for LLM cleanup."""
|
|
try:
|
|
import ollama
|
|
self._llm_client = ollama
|
|
logger.info(f"LLM cleanup enabled using {self.llm_model}")
|
|
except ImportError:
|
|
logger.warning("ollama package not installed. LLM cleanup disabled.")
|
|
self.use_llm_cleanup = False
|
|
|
|
def _cleanup_with_llm(self, raw_text: str) -> str:
|
|
"""
|
|
Use LLM to clean up OCR output and preserve code formatting.
|
|
|
|
Args:
|
|
raw_text: Raw OCR output
|
|
|
|
Returns:
|
|
Cleaned up text with proper formatting
|
|
"""
|
|
if not self.use_llm_cleanup or not self._llm_client:
|
|
return raw_text
|
|
|
|
prompt = """You are cleaning up OCR output from a code editor screenshot.
|
|
|
|
Your task:
|
|
1. Fix any obvious OCR errors (l→1, O→0, etc.)
|
|
2. Preserve or restore code indentation and structure
|
|
3. Keep the exact text content - don't add explanations or comments
|
|
4. If it's code, maintain proper spacing and formatting
|
|
5. Return ONLY the cleaned text, nothing else
|
|
|
|
OCR Text:
|
|
"""
|
|
|
|
try:
|
|
response = self._llm_client.generate(
|
|
model=self.llm_model,
|
|
prompt=prompt + raw_text,
|
|
options={"temperature": 0.1} # Low temperature for accuracy
|
|
)
|
|
cleaned = response['response'].strip()
|
|
logger.debug(f"LLM cleanup: {len(raw_text)} → {len(cleaned)} chars")
|
|
return cleaned
|
|
except Exception as e:
|
|
logger.warning(f"LLM cleanup failed: {e}, using raw OCR output")
|
|
return raw_text
|
|
|
|
def detect_text_regions(self, image_path: str, min_area: int = 100) -> List[Tuple[int, int, int, int]]:
|
|
"""
|
|
Detect text regions in image using OpenCV.
|
|
|
|
Args:
|
|
image_path: Path to image file
|
|
min_area: Minimum area for text region (pixels)
|
|
|
|
Returns:
|
|
List of bounding boxes (x, y, w, h)
|
|
"""
|
|
# Read image
|
|
img = cv2.imread(image_path)
|
|
if img is None:
|
|
logger.warning(f"Could not read image: {image_path}")
|
|
return []
|
|
|
|
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
|
|
|
# Method 1: Morphological operations to find text regions
|
|
# Works well for solid text blocks
|
|
regions = self._detect_by_morphology(gray, min_area)
|
|
|
|
if not regions:
|
|
logger.debug(f"No text regions detected in {Path(image_path).name}")
|
|
|
|
return regions
|
|
|
|
def _detect_by_morphology(self, gray: np.ndarray, min_area: int) -> List[Tuple[int, int, int, int]]:
|
|
"""
|
|
Detect text regions using morphological operations.
|
|
Fast and works well for solid text blocks (code editors, terminals).
|
|
|
|
Args:
|
|
gray: Grayscale image
|
|
min_area: Minimum area for region
|
|
|
|
Returns:
|
|
List of bounding boxes (x, y, w, h)
|
|
"""
|
|
# Apply adaptive threshold to handle varying lighting
|
|
binary = cv2.adaptiveThreshold(
|
|
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
|
cv2.THRESH_BINARY_INV, 11, 2
|
|
)
|
|
|
|
# Morphological operations to connect text regions
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 3)) # Horizontal kernel for text lines
|
|
dilated = cv2.dilate(binary, kernel, iterations=2)
|
|
|
|
# Find contours
|
|
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
|
|
# Filter and extract bounding boxes
|
|
regions = []
|
|
for contour in contours:
|
|
x, y, w, h = cv2.boundingRect(contour)
|
|
area = w * h
|
|
|
|
# Filter by area and aspect ratio
|
|
if area > min_area and w > 20 and h > 10: # Reasonable text dimensions
|
|
regions.append((x, y, w, h))
|
|
|
|
# Merge overlapping regions
|
|
regions = self._merge_overlapping_regions(regions)
|
|
|
|
logger.debug(f"Detected {len(regions)} text regions using morphology")
|
|
return regions
|
|
|
|
def _merge_overlapping_regions(
|
|
self, regions: List[Tuple[int, int, int, int]],
|
|
overlap_threshold: float = 0.3
|
|
) -> List[Tuple[int, int, int, int]]:
|
|
"""
|
|
Merge overlapping bounding boxes.
|
|
|
|
Args:
|
|
regions: List of (x, y, w, h) tuples
|
|
overlap_threshold: Minimum overlap ratio to merge
|
|
|
|
Returns:
|
|
Merged regions
|
|
"""
|
|
if not regions:
|
|
return []
|
|
|
|
# Sort by y-coordinate (top to bottom)
|
|
regions = sorted(regions, key=lambda r: r[1])
|
|
|
|
merged = []
|
|
current = list(regions[0])
|
|
|
|
for region in regions[1:]:
|
|
x, y, w, h = region
|
|
cx, cy, cw, ch = current
|
|
|
|
# Check for overlap
|
|
x_overlap = max(0, min(cx + cw, x + w) - max(cx, x))
|
|
y_overlap = max(0, min(cy + ch, y + h) - max(cy, y))
|
|
overlap_area = x_overlap * y_overlap
|
|
|
|
current_area = cw * ch
|
|
region_area = w * h
|
|
min_area = min(current_area, region_area)
|
|
|
|
if overlap_area / min_area > overlap_threshold:
|
|
# Merge regions
|
|
new_x = min(cx, x)
|
|
new_y = min(cy, y)
|
|
new_x2 = max(cx + cw, x + w)
|
|
new_y2 = max(cy + ch, y + h)
|
|
current = [new_x, new_y, new_x2 - new_x, new_y2 - new_y]
|
|
else:
|
|
merged.append(tuple(current))
|
|
current = list(region)
|
|
|
|
merged.append(tuple(current))
|
|
return merged
|
|
|
|
def extract_text_from_region(self, image_path: str, region: Tuple[int, int, int, int]) -> str:
|
|
"""
|
|
Extract text from a specific region using OCR.
|
|
|
|
Args:
|
|
image_path: Path to image file
|
|
region: Bounding box (x, y, w, h)
|
|
|
|
Returns:
|
|
Extracted text
|
|
"""
|
|
from PIL import Image
|
|
|
|
# Load image and crop region
|
|
img = Image.open(image_path)
|
|
x, y, w, h = region
|
|
cropped = img.crop((x, y, x + w, y + h))
|
|
|
|
# Save to temp file for OCR (or use in-memory)
|
|
import tempfile
|
|
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
|
|
cropped.save(tmp.name)
|
|
text = self.ocr.extract_text(tmp.name)
|
|
|
|
# Clean up temp file
|
|
Path(tmp.name).unlink()
|
|
|
|
return text
|
|
|
|
def analyze_frame(self, image_path: str) -> str:
|
|
"""
|
|
Analyze a frame: detect text regions and OCR them.
|
|
|
|
Args:
|
|
image_path: Path to image file
|
|
|
|
Returns:
|
|
Combined text from all detected regions
|
|
"""
|
|
# Detect text regions
|
|
regions = self.detect_text_regions(image_path)
|
|
|
|
if not regions:
|
|
# Fallback to full-frame OCR if no regions detected
|
|
logger.debug(f"No regions detected, using full-frame OCR for {Path(image_path).name}")
|
|
raw_text = self.ocr.extract_text(image_path)
|
|
return self._cleanup_with_llm(raw_text) if self.use_llm_cleanup else raw_text
|
|
|
|
# Sort regions by reading order (top-to-bottom, left-to-right)
|
|
regions = self._sort_regions_by_reading_order(regions)
|
|
|
|
# Extract text from each region
|
|
texts = []
|
|
for idx, region in enumerate(regions):
|
|
x, y, w, h = region
|
|
text = self.extract_text_from_region(image_path, region)
|
|
if text.strip():
|
|
# Add visual separator with region info
|
|
section_header = f"[Region {idx+1} at y={y}]"
|
|
texts.append(f"{section_header}\n{text.strip()}")
|
|
logger.debug(f"Region {idx+1}/{len(regions)} (y={y}): Extracted {len(text)} chars")
|
|
|
|
combined = ("\n\n" + "="*60 + "\n\n").join(texts)
|
|
logger.debug(f"Total extracted from {len(regions)} regions: {len(combined)} chars")
|
|
|
|
# Apply LLM cleanup if enabled
|
|
if self.use_llm_cleanup:
|
|
combined = self._cleanup_with_llm(combined)
|
|
|
|
return combined
|
|
|
|
def _sort_regions_by_reading_order(self, regions: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]:
|
|
"""
|
|
Sort regions in reading order (top-to-bottom, left-to-right).
|
|
|
|
Args:
|
|
regions: List of (x, y, w, h) tuples
|
|
|
|
Returns:
|
|
Sorted regions
|
|
"""
|
|
# Sort primarily by y (top to bottom), secondarily by x (left to right)
|
|
# Group regions that are on roughly the same line (within 20px)
|
|
sorted_regions = sorted(regions, key=lambda r: (r[1] // 20, r[0]))
|
|
return sorted_regions
|
|
|
|
def process_frames(
|
|
self,
|
|
frames_info: List[Tuple[str, float]],
|
|
deduplicate: bool = True,
|
|
similarity_threshold: float = 0.85
|
|
) -> List[Dict]:
|
|
"""
|
|
Process multiple frames with hybrid analysis.
|
|
|
|
Args:
|
|
frames_info: List of (frame_path, timestamp) tuples
|
|
deduplicate: Whether to remove similar consecutive analyses
|
|
similarity_threshold: Threshold for considering analyses as duplicates (0-1)
|
|
|
|
Returns:
|
|
List of dicts with 'timestamp', 'text', and 'frame_path'
|
|
"""
|
|
results = []
|
|
prev_text = ""
|
|
|
|
total = len(frames_info)
|
|
logger.info(f"Starting hybrid analysis of {total} frames...")
|
|
|
|
for idx, (frame_path, timestamp) in enumerate(frames_info, 1):
|
|
logger.info(f"Analyzing frame {idx}/{total} at {timestamp:.2f}s...")
|
|
|
|
text = self.analyze_frame(frame_path)
|
|
|
|
if not text:
|
|
logger.warning(f"No content extracted from frame at {timestamp:.2f}s")
|
|
continue
|
|
|
|
# Debug: Show what was extracted
|
|
logger.debug(f"Frame {idx} ({timestamp:.2f}s): Extracted {len(text)} chars")
|
|
logger.debug(f"Content preview: {text[:150]}{'...' if len(text) > 150 else ''}")
|
|
|
|
# Deduplicate similar consecutive frames
|
|
if deduplicate and prev_text:
|
|
similarity = self._text_similarity(prev_text, text)
|
|
logger.debug(f"Similarity to previous frame: {similarity:.2f} (threshold: {similarity_threshold})")
|
|
if similarity > similarity_threshold:
|
|
logger.debug(f"⊘ Skipping duplicate frame at {timestamp:.2f}s (similarity: {similarity:.2f})")
|
|
continue
|
|
|
|
results.append({
|
|
'timestamp': timestamp,
|
|
'text': text,
|
|
'frame_path': frame_path
|
|
})
|
|
|
|
prev_text = text
|
|
|
|
logger.info(f"Extracted content from {len(results)} frames (deduplication: {deduplicate})")
|
|
return results
|
|
|
|
def _text_similarity(self, text1: str, text2: str) -> float:
|
|
"""
|
|
Calculate similarity between two texts.
|
|
|
|
Returns:
|
|
Similarity score between 0 and 1
|
|
"""
|
|
return SequenceMatcher(None, text1, text2).ratio()
|