"""
Orchestrate the video processing workflow.

Coordinates frame extraction, analysis, and transcript merging.
"""
import json
import logging
import os
import shutil
import site
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional

from .output_manager import OutputManager
from .cache_manager import CacheManager
from .frame_extractor import FrameExtractor
from .ocr_processor import OCRProcessor
from .vision_processor import VisionProcessor
from .transcript_merger import TranscriptMerger

logger = logging.getLogger(__name__)


class WorkflowConfig:
    """Configuration for the processing workflow.

    All options are read from keyword arguments. ``video`` is the only
    required key; every other option falls back to the default shown in
    ``__init__``.
    """

    def __init__(self, **kwargs):
        """Initialize configuration from keyword arguments.

        Raises:
            KeyError: If ``video`` is not supplied.
            ValueError: If ``use_vision`` and ``use_hybrid`` are both set, or
                if ``hybrid_llm_cleanup`` is set without ``use_hybrid``.
        """
        # Video and paths
        self.video_path = Path(kwargs['video'])
        self.transcript_path = kwargs.get('transcript')
        self.output_dir = kwargs.get('output_dir', 'output')
        self.custom_output = kwargs.get('output')

        # Whisper options
        self.run_whisper = kwargs.get('run_whisper', False)
        self.whisper_model = kwargs.get('whisper_model', 'medium')
        self.diarize = kwargs.get('diarize', False)

        # Frame extraction
        self.scene_detection = kwargs.get('scene_detection', False)
        self.scene_threshold = kwargs.get('scene_threshold', 15.0)
        self.interval = kwargs.get('interval', 5)

        # Analysis options
        self.use_vision = kwargs.get('use_vision', False)
        self.use_hybrid = kwargs.get('use_hybrid', False)
        self.hybrid_llm_cleanup = kwargs.get('hybrid_llm_cleanup', False)
        self.hybrid_llm_model = kwargs.get('hybrid_llm_model', 'llama3.2:3b')
        self.vision_model = kwargs.get('vision_model', 'llava:13b')
        self.vision_context = kwargs.get('vision_context', 'meeting')
        self.ocr_engine = kwargs.get('ocr_engine', 'tesseract')

        # Validation: can't use both vision and hybrid
        if self.use_vision and self.use_hybrid:
            raise ValueError("Cannot use both --use-vision and --use-hybrid. Choose one.")

        # Validation: LLM cleanup requires hybrid mode
        if self.hybrid_llm_cleanup and not self.use_hybrid:
            raise ValueError("--hybrid-llm-cleanup requires --use-hybrid")

        # Processing options
        self.no_deduplicate = kwargs.get('no_deduplicate', False)
        self.no_cache = kwargs.get('no_cache', False)
        self.skip_cache_frames = kwargs.get('skip_cache_frames', False)
        self.skip_cache_whisper = kwargs.get('skip_cache_whisper', False)
        self.skip_cache_analysis = kwargs.get('skip_cache_analysis', False)
        self.extract_only = kwargs.get('extract_only', False)
        self.format = kwargs.get('format', 'detailed')
        self.embed_images = kwargs.get('embed_images', False)
        self.embed_quality = kwargs.get('embed_quality', 80)

    def to_dict(self) -> Dict[str, Any]:
        """Convert config to dictionary for manifest.

        Options that do not apply to the selected extraction or analysis
        method are reported as ``None``.
        """
        if self.use_vision:
            analysis_method = "vision"
        elif self.use_hybrid:
            analysis_method = "hybrid"
        else:
            analysis_method = "ocr"

        return {
            "whisper": {
                "enabled": self.run_whisper,
                "model": self.whisper_model,
            },
            "frame_extraction": {
                "method": "scene_detection" if self.scene_detection else "interval",
                "interval_seconds": self.interval if not self.scene_detection else None,
                "scene_threshold": self.scene_threshold if self.scene_detection else None,
            },
            "analysis": {
                "method": analysis_method,
                "vision_model": self.vision_model if self.use_vision else None,
                "vision_context": self.vision_context if self.use_vision else None,
                "ocr_engine": self.ocr_engine if (not self.use_vision) else None,
                "deduplication": not self.no_deduplicate,
            },
            "output_format": self.format,
        }


class ProcessingWorkflow:
    """Orchestrate the complete video processing workflow."""

    def __init__(self, config: WorkflowConfig):
        """
        Initialize workflow.

        Args:
            config: Workflow configuration
        """
        self.config = config
        self.output_mgr = OutputManager(
            config.video_path,
            config.output_dir,
            use_cache=not config.no_cache
        )
        self.cache_mgr = CacheManager(
            self.output_mgr.output_dir,
            self.output_mgr.frames_dir,
            config.video_path.stem,
            use_cache=not config.no_cache,
            skip_cache_frames=config.skip_cache_frames,
            skip_cache_whisper=config.skip_cache_whisper,
            skip_cache_analysis=config.skip_cache_analysis
        )

    def run(self) -> Dict[str, Any]:
        """
        Run the complete processing workflow.

        Returns:
            Dictionary with output paths and status

        Raises:
            RuntimeError: If no frames were extracted, or if transcription
                was requested but its tool or output is missing.
            subprocess.CalledProcessError: If the transcription command fails.
        """
        logger.info("=" * 80)
        logger.info("MEETING PROCESSOR")
        logger.info("=" * 80)
        logger.info(f"Video: {self.config.video_path.name}")

        # Determine analysis method
        if self.config.use_vision:
            analysis_method = f"Vision Model ({self.config.vision_model})"
        elif self.config.use_hybrid:
            analysis_method = f"Hybrid (OpenCV + {self.config.ocr_engine})"
        else:
            analysis_method = f"OCR ({self.config.ocr_engine})"
        logger.info(f"Analysis: {analysis_method}")
        if self.config.use_vision:
            logger.info(f"Context: {self.config.vision_context}")

        if self.config.scene_detection:
            extraction_desc = 'Scene detection'
        else:
            extraction_desc = f'Every {self.config.interval}s'
        logger.info(f"Frame extraction: {extraction_desc}")
        logger.info(f"Caching: {'Disabled' if self.config.no_cache else 'Enabled'}")
        logger.info("=" * 80)

        # Step 0: Whisper transcription
        transcript_path = self._run_whisper()

        # Step 1: Extract frames
        frames_info = self._extract_frames()
        if not frames_info:
            logger.error("No frames extracted")
            raise RuntimeError("Frame extraction failed")

        # Step 2: Analyze frames
        screen_segments = self._analyze_frames(frames_info)

        if self.config.extract_only:
            # Extract-only mode stops before merging; no manifest is saved here.
            logger.info("Done! (extract-only mode)")
            return self._build_result(transcript_path, screen_segments)

        # Step 3: Merge with transcript
        enhanced_transcript = self._merge_transcripts(transcript_path, screen_segments)

        # Save manifest
        self.output_mgr.save_manifest(self.config.to_dict())

        # Build final result
        return self._build_result(transcript_path, screen_segments, enhanced_transcript)

    def _analysis_type(self) -> str:
        """Return the analysis key ('vision', 'hybrid' or 'ocr') selected by the config."""
        if self.config.use_vision:
            return 'vision'
        if self.config.use_hybrid:
            return 'hybrid'
        return 'ocr'

    def _run_whisper(self) -> Optional[str]:
        """Run Whisper transcription if requested, or use cached/provided transcript.

        Returns:
            Path to the transcript as a string, or whatever
            ``config.transcript_path`` holds (possibly ``None``) when no
            cached transcript exists and no transcription was requested.

        Raises:
            RuntimeError: If the required tool is not on PATH, or the tool
                finished without producing the expected JSON file.
            subprocess.CalledProcessError: If the transcription command
                exits with a non-zero status.
        """
        # First, check cache (regardless of run_whisper flag)
        cached = self.cache_mgr.get_whisper_cache()
        if cached:
            return str(cached)

        # If no cache and not running whisper/diarize, use provided transcript path (if any)
        if not self.config.run_whisper and not self.config.diarize:
            return self.config.transcript_path

        logger.info("=" * 80)
        logger.info("STEP 0: Running Whisper Transcription")
        logger.info("=" * 80)

        use_diarize = getattr(self.config, 'diarize', False)
        transcribe_cmd = self._resolve_transcribe_command(use_diarize)

        # Unload Ollama model to free GPU memory for Whisper (if using vision)
        if self.config.use_vision:
            self._unload_vision_model()

        if use_diarize:
            logger.info(f"Running WhisperX transcription with diarization (model: {self.config.whisper_model})...")
        else:
            logger.info(f"Running Whisper transcription (model: {self.config.whisper_model})...")
        logger.info("This may take a few minutes depending on video length...")

        # Build command
        cmd = [
            transcribe_cmd,
            str(self.config.video_path),
            "--model", self.config.whisper_model,
            "--output_format", "json",
            "--output_dir", str(self.output_mgr.output_dir),
        ]
        if use_diarize:
            cmd.append("--diarize")

        env = self._build_whisper_env(use_diarize)
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True, env=env)
        except subprocess.CalledProcessError as e:
            logger.error(f"Whisper failed: {e.stderr}")
            raise

        transcript_path = self.output_mgr.get_path(f"{self.config.video_path.stem}.json")
        if not transcript_path.exists():
            logger.error("Whisper completed but transcript file not found")
            raise RuntimeError("Whisper output missing")

        logger.info(f"✓ Whisper transcription completed: {transcript_path.name}")
        self._log_transcript_preview(transcript_path)
        logger.info("")
        return str(transcript_path)

    def _resolve_transcribe_command(self, use_diarize: bool) -> str:
        """Return the transcription executable name, verifying it is on PATH."""
        if use_diarize:
            if not shutil.which("whisperx"):
                logger.error("WhisperX is not installed. Install it with: pip install whisperx")
                raise RuntimeError("WhisperX not installed (required for --diarize)")
            return "whisperx"

        if not shutil.which("whisper"):
            logger.error("Whisper is not installed. Install it with: pip install openai-whisper")
            raise RuntimeError("Whisper not installed")
        return "whisper"

    def _unload_vision_model(self) -> None:
        """Best-effort ``ollama stop`` of the vision model; never raises."""
        logger.info("Freeing GPU memory for Whisper...")
        try:
            result = subprocess.run(
                ["ollama", "stop", self.config.vision_model],
                capture_output=True,
                check=False
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # e.g. the ollama executable is not installed
            logger.warning(f"Could not unload Ollama model: {e}")
            return

        # check=False means a failure is only visible through the return code
        if result.returncode == 0:
            logger.info("✓ Ollama model unloaded")
        else:
            detail = result.stderr.decode(errors='replace').strip()
            logger.warning(f"Could not unload Ollama model: {detail or f'exit code {result.returncode}'}")

    def _build_whisper_env(self, use_diarize: bool) -> Dict[str, str]:
        """Return the subprocess environment, adding the cuDNN lib dir for whisperx."""
        env = os.environ.copy()
        if not use_diarize:
            return env

        # getsitepackages may be absent from the site module in some
        # virtualenv setups, so look it up defensively.
        get_site_dirs = getattr(site, "getsitepackages", None)
        site_dirs = get_site_dirs() if get_site_dirs else []
        for site_dir in site_dirs:
            cudnn_path = Path(site_dir) / "nvidia" / "cudnn" / "lib"
            if cudnn_path.exists():
                existing = env.get("LD_LIBRARY_PATH", "")
                # Join only non-empty parts: an empty LD_LIBRARY_PATH entry
                # (trailing ':') makes the loader search the current directory.
                env["LD_LIBRARY_PATH"] = ":".join(
                    part for part in (str(cudnn_path), existing) if part
                )
                break
        return env

    def _log_transcript_preview(self, transcript_path: Path) -> None:
        """Log a debug preview of the Whisper JSON output; failures are only logged."""
        try:
            with open(transcript_path, 'r', encoding='utf-8') as f:
                whisper_data = json.load(f)

            if 'segments' in whisper_data:
                segments = whisper_data['segments']
                logger.debug(f"Whisper produced {len(segments)} segments")
                if segments:
                    logger.debug(f"First segment: {segments[0]}")
                    logger.debug(f"Last segment: {segments[-1]}")

            if 'text' in whisper_data:
                text = whisper_data['text']
                text_preview = text[:200] + "..." if len(text) > 200 else text
                logger.debug(f"Transcript preview: {text_preview}")
        except Exception as e:
            # Deliberately broad: the preview is diagnostic only and must
            # never fail a transcription that already succeeded.
            logger.debug(f"Could not parse whisper output for debug: {e}")

    def _extract_frames(self):
        """Extract frames from video.

        Returns:
            The cached frame info when the cache manager provides one,
            otherwise the result returned by the frame extractor.
        """
        logger.info("Step 1: Extracting frames from video...")

        # Check cache
        cached_frames = self.cache_mgr.get_frames_cache()
        if cached_frames:
            return cached_frames

        # Clean up old frames if regenerating
        if self.config.skip_cache_frames and self.output_mgr.frames_dir.exists():
            old_frames = list(self.output_mgr.frames_dir.glob("*.jpg"))
            if old_frames:
                logger.info(f"Cleaning up {len(old_frames)} old frames...")
                for old_frame in old_frames:
                    old_frame.unlink()
                logger.info("✓ Cleanup complete")

        # Extract frames (use embed quality so saved files match embedded images)
        if self.config.scene_detection:
            logger.info(f"Extracting frames with scene detection (threshold={self.config.scene_threshold})...")
        else:
            logger.info(f"Extracting frames every {self.config.interval}s...")

        extractor = FrameExtractor(
            str(self.config.video_path),
            str(self.output_mgr.frames_dir),
            quality=self.config.embed_quality
        )

        if self.config.scene_detection:
            frames_info = extractor.extract_scene_changes(threshold=self.config.scene_threshold)
        else:
            frames_info = extractor.extract_by_interval(self.config.interval)

        logger.info(f"✓ Extracted {len(frames_info)} frames")
        return frames_info

    def _analyze_frames(self, frames_info):
        """Analyze frames with vision, hybrid, or OCR.

        When ``embed_images`` is set, no analysis runs and each frame becomes
        a segment with empty text. ``frames_info`` is unpacked as
        ``(frame_path, timestamp)`` pairs in that case.
        """
        # Skip analysis if just embedding images
        if self.config.embed_images:
            logger.info("Step 2: Skipping analysis (images will be embedded)")
            # Create minimal segments with just frame paths and timestamps
            screen_segments = [
                {
                    'timestamp': timestamp,
                    'text': '',  # No text extraction needed
                    'frame_path': frame_path
                }
                for frame_path, timestamp in frames_info
            ]
            logger.info(f"✓ Prepared {len(screen_segments)} frames for embedding")
            return screen_segments

        analysis_type = self._analysis_type()

        # Check cache
        cached_analysis = self.cache_mgr.get_analysis_cache(analysis_type)
        if cached_analysis:
            return cached_analysis

        if analysis_type == 'vision':
            return self._run_vision_analysis(frames_info)
        if analysis_type == 'hybrid':
            return self._run_hybrid_analysis(frames_info)
        return self._run_ocr_analysis(frames_info)

    @staticmethod
    def _log_segment_preview(label: str, screen_segments) -> None:
        """Log debug details of the first (and last) analysis segment."""
        if not screen_segments:
            return

        first = screen_segments[0]
        logger.debug(f"First {label} result: timestamp={first.get('timestamp')}, text_length={len(first.get('text', ''))}")
        logger.debug(f"First {label} text preview: {first.get('text', '')[:200]}...")
        if len(screen_segments) > 1:
            last = screen_segments[-1]
            logger.debug(f"Last {label} result: timestamp={last.get('timestamp')}, text_length={len(last.get('text', ''))}")

    def _run_vision_analysis(self, frames_info):
        """Run vision analysis on frames.

        Audio segments from the provided or cached transcript, when the file
        exists, are passed to the vision processor as extra context.
        """
        logger.info("Step 2: Running vision analysis on extracted frames...")
        logger.info(f"Loading vision model {self.config.vision_model} to GPU...")

        # Load audio segments for context if transcript exists
        audio_segments = []
        transcript_path = self.config.transcript_path or self._get_cached_transcript()
        if transcript_path:
            transcript_file = Path(transcript_path)
            if transcript_file.exists():
                logger.info("Loading audio transcript for context...")
                merger = TranscriptMerger()
                audio_segments = merger.load_whisper_transcript(str(transcript_file))
                logger.info(f"✓ Loaded {len(audio_segments)} audio segments for context")

        try:
            vision = VisionProcessor(model=self.config.vision_model)
            screen_segments = vision.process_frames(
                frames_info,
                context=self.config.vision_context,
                deduplicate=not self.config.no_deduplicate,
                audio_segments=audio_segments
            )
            logger.info(f"✓ Analyzed {len(screen_segments)} frames with vision model")

            # Debug: Show sample analysis results
            self._log_segment_preview("analysis", screen_segments)

            # Cache results
            self.cache_mgr.save_analysis('vision', screen_segments)
            return screen_segments
        except ImportError as e:
            logger.error(f"{e}")
            raise

    def _get_cached_transcript(self) -> Optional[str]:
        """Get cached Whisper transcript if available."""
        cached = self.cache_mgr.get_whisper_cache()
        return str(cached) if cached else None

    def _run_hybrid_analysis(self, frames_info):
        """Run hybrid analysis on frames (OpenCV + OCR)."""
        if self.config.hybrid_llm_cleanup:
            logger.info("Step 2: Running hybrid analysis (OpenCV + OCR + LLM cleanup)...")
        else:
            logger.info("Step 2: Running hybrid analysis (OpenCV text detection + OCR)...")

        try:
            # Imported here, inside the try, so a failed import is logged below
            from .hybrid_processor import HybridProcessor

            hybrid = HybridProcessor(
                ocr_engine=self.config.ocr_engine,
                use_llm_cleanup=self.config.hybrid_llm_cleanup,
                llm_model=self.config.hybrid_llm_model
            )
            screen_segments = hybrid.process_frames(
                frames_info,
                deduplicate=not self.config.no_deduplicate
            )
            logger.info(f"✓ Processed {len(screen_segments)} frames with hybrid analysis")

            # Debug: Show sample hybrid results
            self._log_segment_preview("hybrid", screen_segments)

            # Cache results
            self.cache_mgr.save_analysis('hybrid', screen_segments)
            return screen_segments
        except ImportError as e:
            logger.error(f"{e}")
            raise

    def _run_ocr_analysis(self, frames_info):
        """Run OCR analysis on frames."""
        logger.info("Step 2: Running OCR on extracted frames...")

        try:
            ocr = OCRProcessor(engine=self.config.ocr_engine)
            screen_segments = ocr.process_frames(
                frames_info,
                deduplicate=not self.config.no_deduplicate
            )
            logger.info(f"✓ Processed {len(screen_segments)} frames with OCR")

            # Debug: Show sample OCR results
            self._log_segment_preview("OCR", screen_segments)

            # Cache results
            self.cache_mgr.save_analysis('ocr', screen_segments)
            return screen_segments
        except ImportError as e:
            logger.error(f"{e}")
            logger.error(f"To install {self.config.ocr_engine}:")
            logger.error(f" pip install {self.config.ocr_engine}")
            raise

    def _merge_transcripts(self, transcript_path, screen_segments):
        """Merge audio and screen transcripts.

        Args:
            transcript_path: Path to the Whisper transcript, or a falsy value
                to merge screen content only.
            screen_segments: Screen segments produced by frame analysis.

        Returns:
            Path of the saved enhanced transcript, always as a string.
        """
        merger = TranscriptMerger(
            embed_images=self.config.embed_images,
            embed_quality=self.config.embed_quality
        )

        # Load audio transcript if available
        audio_segments = []
        if transcript_path:
            logger.info("Step 3: Merging with Whisper transcript...")
            transcript_file = Path(transcript_path)

            if not transcript_file.exists():
                logger.warning(f"Transcript not found: {transcript_path}")
                logger.info("Proceeding with screen content only...")
            else:
                # Group audio into 30-second intervals for cleaner reference timestamps
                audio_segments = merger.load_whisper_transcript(str(transcript_file), group_interval=30)
                logger.info(f"✓ Loaded {len(audio_segments)} audio segments")
        else:
            logger.info("No transcript provided, using screen content only...")

        # Merge and format
        merged = merger.merge_transcripts(audio_segments, screen_segments)
        formatted = merger.format_for_claude(merged, format_style=self.config.format)

        # Save output
        if self.config.custom_output:
            output_path = self.config.custom_output
        else:
            output_path = self.output_mgr.get_path(f"{self.config.video_path.stem}_enhanced.txt")

        merger.save_transcript(formatted, str(output_path))

        logger.info("=" * 80)
        logger.info("✓ PROCESSING COMPLETE!")
        logger.info("=" * 80)
        logger.info(f"Output directory: {self.output_mgr.output_dir}")
        logger.info(f"Enhanced transcript: {Path(output_path).name}")
        logger.info("")

        # Always a string, so the result dict holds the same type whether or
        # not a custom output path was configured.
        return str(output_path)

    def _build_result(self, transcript_path=None, screen_segments=None, enhanced_transcript=None):
        """Build result dictionary.

        The ``analysis`` entry is a file name derived from the video stem and
        the analysis type; ``frames_count`` is the number of screen segments
        (0 when none are given).
        """
        return {
            "output_dir": str(self.output_mgr.output_dir),
            "transcript": transcript_path,
            "analysis": f"{self.config.video_path.stem}_{self._analysis_type()}.json",
            "frames_count": len(screen_segments) if screen_segments else 0,
            "enhanced_transcript": enhanced_transcript,
            "manifest": str(self.output_mgr.get_path("manifest.json"))
        }