"""Main application window — wires Timeline to all components.""" import logging from pathlib import Path import gi gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") gi.require_version("GdkPixbuf", "2.0") from gi.repository import Gtk, Gdk, Adw, GLib, GdkPixbuf from threading import Thread from cht.config import APP_NAME from cht.ui.timeline import Timeline, TimelineControls from cht.ui.monitor import MonitorWidget from cht.ui.waveform import WaveformWidget from cht.ui.frames_panel import FramesPanel from cht.ui.transcript_panel import TranscriptPanel from cht.ui.keyboard import KeyboardManager, KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN, KEY_RETURN, KEY_KP_ENTER, KEY_ESCAPE, KEY_DELETE from cht.ui.agent_output import AgentOutputPanel from cht.ui.agent_input import AgentInputPanel from cht.audio.waveform import WaveformEngine from cht.transcriber.engine import TranscriberEngine from cht.stream.manager import StreamManager, list_sessions from cht.stream.lifecycle import StreamLifecycle from cht.ui.session_dialog import SessionDialog from cht.session import load_frame_index, load_segment_manifest, rebuild_manifest, global_time_to_segment from cht.scrub.manager import ProxyManager from cht.agent.runner import AgentRunner, check_claude_cli from cht.telemetry import Telemetry log = logging.getLogger(__name__) class ChtWindow(Adw.ApplicationWindow): def __init__(self, **kwargs): super().__init__(**kwargs) self.set_title(APP_NAME) self.set_default_size(1400, 900) self._known_frames = set() self._proxy_mgr = None self._manifest = [] self._pending_scrub_global = 0.0 self._scrub_pending = False # throttle flag for scrub updates self._telemetry = None # Core components self._timeline = Timeline() self._agent = AgentRunner() self._waveform_engine = WaveformEngine() self._transcriber = TranscriberEngine() # Stream lifecycle (owns streaming state, recorder, tracker, audio buffering) # Lambdas used because panels/widgets aren't created yet at this point. self._lifecycle = StreamLifecycle( timeline=self._timeline, waveform_engine=self._waveform_engine, transcriber=self._transcriber, on_new_frames=lambda frames: None, # frame polling handles new frames on_waveform_update=lambda peaks, bd: self._waveform_widget.set_peaks(peaks, bd), on_transcript_ready=lambda segs: self._transcript_panel.add_items(segs), on_scene_marker=lambda ts: self._timeline.add_scene_marker(ts), on_recorder_restarted=lambda path: self._monitor.set_recording(path), on_manifest_updated=lambda: self._update_scrub_bar_manifest(), ) # Panels (own their selection state) self._frames_panel = FramesPanel() self._transcript_panel = TranscriptPanel() # Main layout self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) self._main_paned.set_shrink_start_child(False) self._main_paned.set_shrink_end_child(False) self._main_paned.set_position(450) self._agent_output = AgentOutputPanel() self._main_paned.set_start_child(self._agent_output) right_box = self._build_right_panels() self._main_paned.set_end_child(right_box) # Header toolbar = Adw.ToolbarView() header = Adw.HeaderBar() header.set_title_widget(Gtk.Label(label=APP_NAME)) self._connect_btn = Gtk.Button(label="Connect") self._connect_btn.add_css_class("suggested-action") self._connect_btn.connect("clicked", self._on_connect_clicked) header.pack_start(self._connect_btn) self._load_btn = Gtk.Button(label="Load Session") self._load_btn.connect("clicked", self._on_load_session_clicked) header.pack_start(self._load_btn) toolbar.add_top_bar(header) toolbar.set_content(self._main_paned) self.set_content(toolbar) self.connect("close-request", self._on_close) # Keyboard shortcuts self._setup_keyboard() # Wire panel signals self._frames_panel.connect("capture-requested", lambda p: self._on_capture_clicked()) self._frames_panel.connect("threshold-changed", lambda p, v: self._on_scene_threshold(v)) # Cross-panel exclusion: selecting frame clears transcript and vice versa self._frames_panel.connect("selection-changed", self._on_frame_selection_changed) self._transcript_panel.connect("selection-changed", self._on_transcript_selection_changed) self._transcript_panel.connect("min-chunk-changed", self._on_min_chunk_changed) self._transcript_panel.connect("lines-per-group-changed", self._on_lines_per_group_changed) # Seek-to-timestamp from panels (double-click) self._frames_panel.connect("seek-requested", self._on_panel_seek) self._transcript_panel.connect("seek-requested", self._on_panel_seek) log.info("Window initialized") GLib.idle_add(self._check_agent_auth) # -- Cross-panel selection exclusion -- def _on_frame_selection_changed(self, panel): if panel.selected is not None: self._transcript_panel.clear_selection() ts = panel._timestamps.get(panel.selected) if ts is not None: self._timeline_controls.scrub_bar.set_cursor(ts) def _on_transcript_selection_changed(self, panel): if panel.has_selection: self._frames_panel.clear_selection() last = panel.selected[-1] if panel.selected else None if last: ts = panel._timestamps.get(last) if ts is not None: self._timeline_controls.scrub_bar.set_cursor(ts) # -- Connect / Disconnect -- def _on_connect_clicked(self, button): if self._lifecycle.is_streaming: self._stop_stream(reload_session=True) else: session_id = self._lifecycle.stream_mgr.session_id if self._lifecycle.stream_mgr else None if self._lifecycle.stream_mgr: self._stop_stream() self._start_stream(session_id=session_id) def _on_capture_clicked(self): if not self._timeline.state.live and self._manifest: # Scrub mode: capture full-res from current scrub position self._capture_at_scrub_position() elif self._lifecycle.stream_mgr: # Live mode: capture from current recording position self._lifecycle.stream_mgr.capture_now( on_new_frames=self._lifecycle._handle_new_scene_frames ) def _on_scene_threshold(self, val): if self._lifecycle.stream_mgr and not self._lifecycle.stream_mgr.readonly: old = self._lifecycle.stream_mgr.scene_threshold self._lifecycle.stream_mgr.update_scene_threshold(val) if self._telemetry: self._telemetry.event("scene_threshold_changed", {"from": old, "to": val}) def _on_min_chunk_changed(self, panel, val): import cht.config old = cht.config.TRANSCRIBE_MIN_CHUNK_S cht.config.TRANSCRIBE_MIN_CHUNK_S = val if self._telemetry: self._telemetry.event("min_chunk_changed", {"from": old, "to": val}) def _on_lines_per_group_changed(self, panel, val): import cht.config cht.config.TRANSCRIBE_LINES_PER_GROUP = val # -- Session loading -- def _on_load_session_clicked(self, button): sessions = list_sessions() if not sessions: self._agent_output.append("No previous sessions found.\n") return dialog = SessionDialog(self) dialog.set_current_session( self._lifecycle.stream_mgr.session_id if self._lifecycle.stream_mgr else None ) dialog.connect("session-selected", lambda d, sid: self._load_session(sid)) dialog.present() def _load_session(self, session_id): """Load an existing session for review (no streaming).""" if self._lifecycle.is_streaming or self._lifecycle.stream_mgr: self._stop_stream() try: mgr = StreamManager.from_existing(session_id) except FileNotFoundError as e: self._agent_output.append(f"Error: {e}\n") return self._lifecycle.set_manager_readonly(mgr) self.set_title(f"{APP_NAME} — {session_id}") self._agent_output.append(f"Loaded session: {session_id}\n") segments = mgr.recording_segments if segments: self._monitor.set_recording(segments[0]) duration = mgr.total_duration() if duration > 0: self._timeline.set_duration(duration) self._timeline.seek(0) self._agent_output.append( f" Recording: {len(segments)} segment(s), " f"{int(duration)}s duration\n" ) else: self._agent_output.append(" No recordings found (frames only).\n") self._load_existing_frames() self._load_existing_transcript() # Waveform from recording (background) if segments: from cht.stream import ffmpeg as ff def _compute_waveform(): audio_dir = mgr.audio_dir audio_dir.mkdir(parents=True, exist_ok=True) full_wav = audio_dir / "full.wav" try: ff.extract_audio_chunk(segments[0], full_wav) self._waveform_engine.compute_full(full_wav) peaks = self._waveform_engine.peaks bucket_dur = self._waveform_engine.bucket_duration GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur) except Exception as e: log.error("Waveform computation failed: %s", e) Thread(target=_compute_waveform, daemon=True, name="waveform_load").start() self._update_scrub_bar_manifest() self._populate_model_dropdown() def _reload_waveform(self, mgr): """Recompute waveform from existing segments in background.""" segments = mgr.recording_segments if not segments: return from cht.stream import ffmpeg as ff def _compute(): audio_dir = mgr.audio_dir audio_dir.mkdir(parents=True, exist_ok=True) full_wav = audio_dir / "full.wav" try: ff.extract_audio_chunk(segments[0], full_wav) self._waveform_engine.compute_full(full_wav) peaks = self._waveform_engine.peaks bucket_dur = self._waveform_engine.bucket_duration GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur) except Exception as e: log.error("Waveform reload failed: %s", e) Thread(target=_compute, daemon=True, name="waveform_reload").start() # -- Streaming -- def _start_stream(self, session_id=None): log.info("Starting stream...") self._connect_btn.set_label("Disconnect") self._connect_btn.remove_css_class("suggested-action") self._connect_btn.add_css_class("destructive-action") mgr = self._lifecycle.start(session_id=session_id) self._telemetry = Telemetry(mgr.session_dir) mgr.telemetry = self._telemetry self._monitor.set_recording(mgr.recording_path) self._monitor.set_live_source(mgr.relay_url) GLib.timeout_add(1000, self._poll_frames) # Reload existing data if resuming if session_id: self._load_existing_frames() self._load_existing_transcript() self._reload_waveform(mgr) self.set_title(f"{APP_NAME} — {mgr.session_id}") log.info("Waiting for sender...") def _on_live_toggle(self): if self._timeline.state.live: if self._telemetry: self._telemetry.event("mode_switch", {"from": "live", "to": "scrub"}) self._timeline.toggle_live(live_player_pos=self._monitor.get_live_position()) self._update_scrub_bar_manifest() else: if self._telemetry: self._telemetry.event("mode_switch", {"from": "scrub", "to": "live"}) mgr = self._lifecycle.stream_mgr if mgr: self._monitor.set_recording(mgr.recording_path) self._timeline.toggle_live() # Catch up on anything that arrived while scrubbing self._update_scrub_bar_manifest() # Scroll panels to latest items self._frames_panel.scroll_to_end() self._transcript_panel.scroll_to_end() # -- Scrub -- def _update_scrub_bar_manifest(self): """Refresh the scrub bar with the current session's segment manifest and frames.""" mgr = self._lifecycle.stream_mgr if not mgr: return self._manifest = load_segment_manifest(mgr.session_dir) if not self._manifest: self._manifest = rebuild_manifest(mgr.session_dir) scrub_bar = self._timeline_controls.scrub_bar scrub_bar.set_manifest(self._manifest) # Feed frame thumbnails to the scrub bar frames = load_frame_index(mgr.frames_dir) scrub_bar.set_frames([{"timestamp": f["timestamp"], "path": str(f["path"])} for f in frames]) def _on_segment_activated(self, scrub_bar, segment_index): """User clicked/dragged into a segment — request its proxy.""" if not self._manifest or segment_index >= len(self._manifest): return seg = self._manifest[segment_index] seg_path = Path(seg["path"]) if not self._proxy_mgr: mgr = self._lifecycle.stream_mgr sid = mgr.session_id if mgr else "unknown" self._proxy_mgr = ProxyManager(sid) scrub_bar.set_proxy_state(segment_index, "generating") def _on_ready(proxy_path): scrub_bar.set_proxy_state(segment_index, "ready") scrub_bar.set_active_segment(segment_index) self._monitor.set_scrub_source(proxy_path, global_offset=seg["global_offset"]) # Seek to current cursor position (set by scrub-position signal) gt = self._timeline.state.cursor local = gt - seg["global_offset"] self._monitor.scrub_to(max(0.0, local)) self._proxy_mgr.request(seg_path, on_ready=_on_ready) def _on_panel_seek(self, panel, timestamp): """Handle seek request from frames or transcript panel (double-click).""" if not self._manifest: return seg, local_time = global_time_to_segment(self._manifest, timestamp) if not seg: return self._pending_scrub_global = timestamp self._on_segment_activated(self._timeline_controls.scrub_bar, seg["index"]) def _on_scrub_position(self, scrub_bar, global_time): """User is scrubbing — drive monitor directly, throttled.""" global_time = max(0.0, min(global_time, self._timeline.state.duration)) self._timeline.state.cursor = global_time self._timeline.state.live = False self._timeline.state.paused = True # Update scrub bar cursor directly (cheap) scrub_bar.set_cursor(global_time) # Throttle monitor seeks to avoid flooding mpv if not self._scrub_pending: self._scrub_pending = True seg, local_time = global_time_to_segment(self._manifest, global_time) if seg: self._monitor.scrub_to(local_time) GLib.timeout_add(16, self._scrub_tick) # ~60fps cap def _scrub_tick(self): """Release throttle so next scrub motion can update monitor.""" self._scrub_pending = False cursor = self._timeline.state.cursor # Apply latest cursor position to monitor seg, local_time = global_time_to_segment(self._manifest, cursor) if seg: self._monitor.scrub_to(local_time) # Sync waveform, time labels, etc. at throttled rate self._timeline.emit("changed") # Highlight nearest frame/transcript self._frames_panel.highlight_nearest(cursor) self._transcript_panel.highlight_nearest(cursor) return False def _capture_at_scrub_position(self): """Capture a full-res frame at the current scrub position.""" mgr = self._lifecycle.stream_mgr if not mgr or not self._manifest: return seg, local_time = global_time_to_segment( self._manifest, self._timeline.state.cursor ) if not seg: return seg_path = Path(seg["path"]) global_time = self._timeline.state.cursor from cht.stream import ffmpeg as ff import json def _capture(): index_path = mgr.frames_dir / "index.json" index = json.loads(index_path.read_text()) if index_path.exists() else [] frame_num = len(index) + 1 frame_id = f"F{frame_num:04d}" frame_path = mgr.frames_dir / f"{frame_id}.jpg" try: ff.extract_frame_at(seg_path, frame_path, local_time) except Exception as e: log.error("Scrub capture failed: %s", e) return if not frame_path.exists(): return entry = { "id": frame_id, "timestamp": global_time, "path": str(frame_path), "sent_to_agent": False, } index.append(entry) index_path.write_text(json.dumps(index, indent=2)) log.info("Scrub capture: %s at %.1fs (local %.1fs in %s)", frame_id, global_time, local_time, seg_path.name) # Reload frames to show the new capture GLib.idle_add(self._load_existing_frames) Thread(target=_capture, daemon=True, name="scrub_capture").start() def _manual_segment_cut(self): """Ctrl+R: manually cut recording into a new segment.""" if not self._lifecycle.is_streaming: return log.info("Manual segment cut requested") self._lifecycle._rotate_segment() self._agent_output.append("Segment cut.\n") def _stop_stream(self, reload_session=False): log.info("Stopping stream...") mgr = self._lifecycle.stream_mgr last_session_id = mgr.session_id if mgr and not mgr.readonly else None if self._telemetry: self._telemetry.close() self._telemetry = None self._lifecycle.stop() if self._proxy_mgr: self._proxy_mgr.cancel() self._proxy_mgr = None self._manifest = [] self._timeline.reset() self._timeline_controls.scrub_bar.set_manifest([]) self._monitor.reset() self._waveform_engine.reset() self._waveform_widget.set_peaks(None, 0.05) self._transcriber.reset() self._agent.clear_history() self._known_frames = set() self._frames_panel.clear() self._transcript_panel.clear() self._connect_btn.set_label("Connect") self._connect_btn.remove_css_class("destructive-action") self._connect_btn.add_css_class("suggested-action") self.set_title(APP_NAME) if reload_session and last_session_id: GLib.idle_add(self._load_session, last_session_id) def _on_close(self, *args): self.teardown() def teardown(self): """Full cleanup for app exit — safe to call multiple times.""" if self._lifecycle.stream_mgr or self._lifecycle.is_streaming: self._stop_stream() self._monitor.stop() # -- Layout -- def _build_right_panels(self): right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) # Video + waveform top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) top_paned.set_shrink_start_child(False) top_paned.set_shrink_end_child(False) self._monitor = MonitorWidget(self._timeline) self._monitor.set_hexpand(True) stream_frame = Gtk.Frame() stream_frame.set_child(self._monitor) top_paned.set_start_child(stream_frame) self._waveform_widget = WaveformWidget(self._timeline) waveform_frame = Gtk.Frame() waveform_frame.set_child(self._waveform_widget) top_paned.set_end_child(waveform_frame) top_paned.set_position(650) right_box.append(top_paned) # Timeline controls + scrub bar self._timeline_controls = TimelineControls(self._timeline) self._timeline_controls.set_live_toggle_callback(self._on_live_toggle) self._timeline_controls.scrub_bar.connect("segment-activated", self._on_segment_activated) self._timeline_controls.scrub_bar.connect("scrub-position", self._on_scrub_position) right_box.append(self._timeline_controls) # Frames frames_frame = Gtk.Frame() frames_frame.set_child(self._frames_panel) right_box.append(frames_frame) # Transcript transcript_frame = Gtk.Frame() transcript_frame.set_child(self._transcript_panel) right_box.append(transcript_frame) # Agent input self._agent_input = AgentInputPanel() self._agent_input.connect("send-requested", lambda p, text: self._send_message(text or None)) self._agent_input.connect("action-requested", lambda p, verb: self._send_action(verb)) self._agent_input.connect("model-changed", self._on_model_changed) self._agent_input.connect("lang-changed", self._on_lang_changed) self._agent_input.connect("history-toggled", lambda p, v: setattr(self._agent, "include_history", v)) right_box.append(self._agent_input) return right_box # -- Keyboard -- def _setup_keyboard(self): kb = KeyboardManager() def _entry_focused(): focus = self.get_focus() if focus is None: return False w = focus while w is not None: if w is self._agent_input.entry: return True w = w.get_parent() return False kb.set_passthrough(_entry_focused, except_keys={KEY_ESCAPE}) kb.bind(KEY_LEFT, lambda **_: self._frames_panel.select_adjacent(-1)) kb.bind(KEY_RIGHT, lambda **_: self._frames_panel.select_adjacent(1)) kb.bind(KEY_UP, lambda shift=False, **_: self._transcript_panel.select_adjacent(-1, extend=shift)) kb.bind(KEY_DOWN, lambda shift=False, **_: self._transcript_panel.select_adjacent(1, extend=shift)) kb.bind(KEY_RETURN, lambda **_: self._send_message(self._build_selection_message("answer")) if self._build_selection_message("answer") else None) kb.bind(KEY_KP_ENTER, lambda **_: self._send_message(self._build_selection_message("answer")) if self._build_selection_message("answer") else None) kb.bind(KEY_ESCAPE, lambda **_: (self.set_focus(None), self._frames_panel.clear_selection(), self._transcript_panel.clear_selection())) kb.bind(KEY_DELETE, lambda **_: self._agent_output.clear()) kb.attach(self) # Ctrl+R: manual segment cut ctrl_r = Gtk.ShortcutController() ctrl_r.add_shortcut(Gtk.Shortcut( trigger=Gtk.ShortcutTrigger.parse_string("r"), action=Gtk.CallbackAction.new(lambda *_: self._manual_segment_cut()), )) self.add_controller(ctrl_r) # -- Agent actions -- def _build_selection_message(self, verb: str) -> str | None: parts = [verb] if self._frames_panel.selected: parts.append(f"@{self._frames_panel.selected}") texts = self._transcript_panel.selected_texts if texts: parts.append(" ".join(texts)) return " ".join(parts) if len(parts) > 1 else None def _send_action(self, verb: str): msg = self._build_selection_message(verb) if not msg: self._agent_output.append("Select a frame or transcript first.\n") return self._send_message(msg) def _send_message(self, text: str | None = None): if not text: text = self._build_selection_message("answer") if not text: return if not self._lifecycle.stream_mgr: self._agent_output.append("No active session.\n") return self._agent_output.append(f"\n> {text}\n…\n") self._agent_output.begin_response() self._agent.send( message=text, stream_mgr=self._lifecycle.stream_mgr, tracker=self._lifecycle.tracker, on_chunk=lambda chunk: GLib.idle_add(self._agent_output.replace_thinking, chunk), on_done=lambda err: GLib.idle_add(self._agent_output.finish_response, err), ) # -- Settings callbacks -- def _on_lang_changed(self, _panel, lang_code): self._transcriber.language = lang_code or None log.info("Transcript language: %s", lang_code or "auto") def _on_model_changed(self, _panel, model): self._agent.model = model log.info("Model switched to %s", model) def _populate_model_dropdown(self): self._agent_input.populate_models( self._agent.available_models, self._agent.model ) def _check_agent_auth(self): import os if os.environ.get("GROQ_API_KEY") or os.environ.get("OPENAI_API_KEY"): self._populate_model_dropdown() return err = check_claude_cli() if err: self._agent_output.append(f"⚠ {err}\n") else: self._agent_output.append(f"Agent ready ({self._agent.provider_name})\n") self._populate_model_dropdown() # -- Data loading -- def _load_existing_frames(self): if not self._lifecycle.stream_mgr: return entries = load_frame_index(self._lifecycle.stream_mgr.frames_dir) if not entries: self._agent_output.append(" No frames found.\n") return items = [] for entry in entries: try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(entry["path"]), 256, 144, True) items.append({"id": entry["id"], "pixbuf": pixbuf, "timestamp": entry["timestamp"]}) except Exception as e: log.warning("Thumbnail load failed for %s: %s", entry["id"], e) if items: self._frames_panel.load_items(items) self._known_frames = {item["id"] for item in items} self._agent_output.append(f" Loaded {len(items)} frame thumbnails.\n") # Update scrub bar thumbnails — reuse already-loaded pixbufs self._timeline_controls.scrub_bar.set_frames_from_pixbufs( [{"timestamp": it["timestamp"], "pixbuf": it["pixbuf"]} for it in items] ) def _load_existing_transcript(self): if not self._lifecycle.stream_mgr: return transcript_index = self._lifecycle.stream_mgr.transcript_dir / "index.json" if not transcript_index.exists(): return self._transcriber.load_index(transcript_index) segs = self._transcriber.all_segments() if segs: self._transcript_panel.add_items(segs) self._agent_output.append(f" Loaded {len(segs)} transcript segments.\n") def _poll_frames(self): if not self._lifecycle.stream_mgr: return False entries = load_frame_index(self._lifecycle.stream_mgr.frames_dir) if entries and not self._known_frames: log.info("Poll: found %d frames, known %d", len(entries), len(self._known_frames)) for entry in entries: fid = entry["id"] if fid in self._known_frames: continue self._known_frames.add(fid) try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(entry["path"]), 256, 144, True) auto = self._timeline.state.live and not self._transcript_panel.has_selection self._frames_panel.add_item(fid, pixbuf, entry["timestamp"], auto_select=auto) self._timeline_controls.scrub_bar.add_frame_from_pixbuf(entry["timestamp"], pixbuf) except Exception as e: log.warning("Thumbnail load failed for %s: %s", fid, e) return True