"""Main application window — wires Timeline to all components."""

import logging

import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk, Gdk, Adw, GLib, GdkPixbuf

from threading import Thread

from cht.config import APP_NAME
from cht.ui.timeline import Timeline, TimelineControls
from cht.ui.monitor import MonitorWidget
from cht.ui.waveform import WaveformWidget
from cht.ui.frames_panel import FramesPanel
from cht.ui.transcript_panel import TranscriptPanel
from cht.ui.keyboard import (
    KeyboardManager,
    KEY_LEFT,
    KEY_RIGHT,
    KEY_UP,
    KEY_DOWN,
    KEY_RETURN,
    KEY_KP_ENTER,
    KEY_ESCAPE,
    KEY_DELETE,
)
from cht.ui.agent_output import AgentOutputPanel
from cht.ui.agent_input import AgentInputPanel
from cht.audio.waveform import WaveformEngine
from cht.transcriber.engine import TranscriberEngine
from cht.stream.manager import StreamManager, list_sessions
from cht.stream.lifecycle import StreamLifecycle
from cht.ui.session_dialog import SessionDialog
from cht.session import load_frame_index
from cht.agent.runner import AgentRunner, check_claude_cli

log = logging.getLogger(__name__)


class ChtWindow(Adw.ApplicationWindow):
    """Top-level window: builds the widget tree and connects it to the
    stream lifecycle, the frames/transcript panels and the agent runner."""

    # Thumbnail bounding box passed to GdkPixbuf when loading frame images.
    _THUMB_WIDTH = 256
    _THUMB_HEIGHT = 144
    # Interval between frame-index polls while a stream manager is set.
    _POLL_INTERVAL_MS = 1000

    def __init__(self, **kwargs):
        """Create core components, build the layout and connect all signals.

        Keyword arguments are forwarded unchanged to ``Adw.ApplicationWindow``.
        """
        super().__init__(**kwargs)
        self.set_title(APP_NAME)
        self.set_default_size(1400, 900)

        # Ids of frames already shown in the frames panel.
        self._known_frames = set()
        # GLib source id of the frame-polling timer; 0 means none is scheduled.
        self._poll_source_id = 0

        # Core components
        self._timeline = Timeline()
        self._agent = AgentRunner()
        self._waveform_engine = WaveformEngine()
        self._transcriber = TranscriberEngine()

        # Stream lifecycle (owns streaming state, recorder, tracker, audio buffering)
        # Lambdas used because panels/widgets aren't created yet at this point.
        self._lifecycle = StreamLifecycle(
            timeline=self._timeline,
            waveform_engine=self._waveform_engine,
            transcriber=self._transcriber,
            on_new_frames=lambda frames: None,  # frame polling handles new frames
            on_waveform_update=lambda peaks, bd: self._waveform_widget.set_peaks(peaks, bd),
            on_transcript_ready=lambda segs: self._transcript_panel.add_items(segs),
            on_scene_marker=lambda ts: self._timeline.add_scene_marker(ts),
            on_recorder_restarted=lambda path: self._monitor.set_recording(path),
        )

        # Panels (own their selection state)
        self._frames_panel = FramesPanel()
        self._transcript_panel = TranscriptPanel()

        # Main layout
        self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        self._main_paned.set_shrink_start_child(False)
        self._main_paned.set_shrink_end_child(False)
        self._main_paned.set_position(450)

        self._agent_output = AgentOutputPanel()
        self._main_paned.set_start_child(self._agent_output)

        right_box = self._build_right_panels()
        self._main_paned.set_end_child(right_box)

        # Header
        toolbar = Adw.ToolbarView()
        header = Adw.HeaderBar()
        header.set_title_widget(Gtk.Label(label=APP_NAME))

        self._connect_btn = Gtk.Button(label="Connect")
        self._connect_btn.add_css_class("suggested-action")
        self._connect_btn.connect("clicked", self._on_connect_clicked)
        header.pack_start(self._connect_btn)

        self._load_btn = Gtk.Button(label="Load Session")
        self._load_btn.connect("clicked", self._on_load_session_clicked)
        header.pack_start(self._load_btn)

        toolbar.add_top_bar(header)
        toolbar.set_content(self._main_paned)
        self.set_content(toolbar)

        self.connect("close-request", self._on_close)

        # Keyboard shortcuts
        self._setup_keyboard()

        # Wire panel signals
        self._frames_panel.connect("capture-requested", lambda p: self._on_capture_clicked())
        self._frames_panel.connect("threshold-changed", lambda p, v: self._on_scene_threshold(v))

        # Cross-panel exclusion: selecting frame clears transcript and vice versa
        self._frames_panel.connect("selection-changed", self._on_frame_selection_changed)
        self._transcript_panel.connect("selection-changed", self._on_transcript_selection_changed)
        self._transcript_panel.connect("min-chunk-changed", self._on_min_chunk_changed)
        self._transcript_panel.connect("lines-per-group-changed", self._on_lines_per_group_changed)

        log.info("Window initialized")
        # Deferred so the auth check runs once the main loop is idle.
        GLib.idle_add(self._check_agent_auth)

    # -- Cross-panel selection exclusion --

    def _on_frame_selection_changed(self, panel):
        """Clear the transcript selection when a frame becomes selected."""
        if panel.selected is not None:
            self._transcript_panel.clear_selection()

    def _on_transcript_selection_changed(self, panel):
        """Clear the frame selection when the transcript has a selection."""
        if panel.has_selection:
            self._frames_panel.clear_selection()

    # -- Connect / Disconnect --

    def _on_connect_clicked(self, button):
        """Toggle streaming.

        While streaming, stop and reload the just-recorded session for review.
        Otherwise start a stream; if a manager is already set, it is stopped
        first and its session id is passed on so the stream resumes it.
        """
        if self._lifecycle.is_streaming:
            self._stop_stream(reload_session=True)
            return

        mgr = self._lifecycle.stream_mgr
        session_id = mgr.session_id if mgr else None
        if mgr:
            self._stop_stream()
        self._start_stream(session_id=session_id)

    def _on_capture_clicked(self):
        """Ask the current stream manager, if any, to capture a frame now."""
        if self._lifecycle.stream_mgr:
            self._lifecycle.stream_mgr.capture_now(
                on_new_frames=self._lifecycle._handle_new_scene_frames
            )

    def _on_scene_threshold(self, val):
        """Store the new scene threshold on the current stream manager, if any."""
        if self._lifecycle.stream_mgr:
            self._lifecycle.stream_mgr.scene_threshold = val

    def _on_min_chunk_changed(self, panel, val):
        """Write the new minimum chunk value to ``cht.config.TRANSCRIBE_MIN_CHUNK_S``."""
        import cht.config
        cht.config.TRANSCRIBE_MIN_CHUNK_S = val

    def _on_lines_per_group_changed(self, panel, val):
        """Write the new value to ``cht.config.TRANSCRIBE_LINES_PER_GROUP``."""
        import cht.config
        cht.config.TRANSCRIBE_LINES_PER_GROUP = val

    # -- Session loading --

    def _on_load_session_clicked(self, button):
        """Open the session picker, or report that no sessions exist."""
        sessions = list_sessions()
        if not sessions:
            self._agent_output.append("No previous sessions found.\n")
            return

        dialog = SessionDialog(self)
        dialog.set_current_session(
            self._lifecycle.stream_mgr.session_id if self._lifecycle.stream_mgr else None
        )
        dialog.connect("session-selected", lambda d, sid: self._load_session(sid))
        dialog.present()

    def _load_session(self, session_id):
        """Load an existing session for review (no streaming).

        Stops whatever is currently active, installs a read-only manager for
        ``session_id``, then loads its recording, frames and transcript. The
        waveform is computed on a background thread. A missing session is
        reported in the agent output and nothing else changes.
        """
        if self._lifecycle.is_streaming or self._lifecycle.stream_mgr:
            self._stop_stream()

        try:
            mgr = StreamManager.from_existing(session_id)
        except FileNotFoundError as e:
            self._agent_output.append(f"Error: {e}\n")
            return

        self._lifecycle.set_manager_readonly(mgr)
        self.set_title(f"{APP_NAME} — {session_id}")
        self._agent_output.append(f"Loaded session: {session_id}\n")

        segments = mgr.recording_segments
        if segments:
            self._monitor.set_recording(segments[0])
            duration = mgr.total_duration()
            if duration > 0:
                self._timeline.set_duration(duration)
                self._timeline.seek(0)
            self._agent_output.append(
                f"  Recording: {len(segments)} segment(s), "
                f"{int(duration)}s duration\n"
            )
        else:
            self._agent_output.append("  No recordings found (frames only).\n")

        self._load_existing_frames()
        self._load_existing_transcript()

        # Waveform from recording (background)
        if segments:
            from cht.stream import ffmpeg as ff

            def _compute_waveform():
                audio_dir = mgr.audio_dir
                audio_dir.mkdir(parents=True, exist_ok=True)
                full_wav = audio_dir / "full.wav"
                try:
                    ff.extract_audio_chunk(segments[0], full_wav)
                    self._waveform_engine.compute_full(full_wav)
                    peaks = self._waveform_engine.peaks
                    bucket_dur = self._waveform_engine.bucket_duration
                    # Widget update is handed to the main loop; peaks are
                    # copied so the widget does not share the engine's buffer.
                    GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur)
                except Exception as e:
                    log.error("Waveform computation failed: %s", e)

            Thread(target=_compute_waveform, daemon=True, name="waveform_load").start()

        self._populate_model_dropdown()

    # -- Streaming --

    def _start_stream(self, session_id=None):
        """Start streaming, resuming ``session_id`` when one is given.

        Switches the header button to its "Disconnect" state, starts the
        lifecycle, points the monitor at the new recording and relay URL, and
        schedules the frame poller.
        """
        log.info("Starting stream...")
        self._connect_btn.set_label("Disconnect")
        self._connect_btn.remove_css_class("suggested-action")
        self._connect_btn.add_css_class("destructive-action")

        mgr = self._lifecycle.start(session_id=session_id)

        self._monitor.set_recording(mgr.recording_path)
        self._monitor.set_live_source(mgr.relay_url)

        # Keep at most one poller. A timer from an earlier stream keeps
        # returning True for as long as some manager is set (e.g. after a
        # session was loaded for review), so without this every reconnect
        # would stack another timer on top of it.
        self._cancel_frame_poll()
        self._poll_source_id = GLib.timeout_add(self._POLL_INTERVAL_MS, self._poll_frames)

        # Reload existing data if resuming
        if session_id:
            self._load_existing_frames()
            self._load_existing_transcript()

        self.set_title(f"{APP_NAME} — {mgr.session_id}")
        log.info("Waiting for sender...")

    def _cancel_frame_poll(self):
        """Remove the scheduled frame-poll timer, if one is recorded."""
        if self._poll_source_id:
            GLib.source_remove(self._poll_source_id)
            self._poll_source_id = 0

    def _on_live_toggle(self):
        """Toggle the timeline's live mode, passing the monitor's live position."""
        pos = self._monitor.get_live_position()
        self._timeline.toggle_live(live_player_pos=pos)

    def _stop_stream(self, reload_session=False):
        """Stop the lifecycle and reset every component and panel.

        With ``reload_session=True`` the session that was just active is
        reloaded for review on the next idle cycle, unless its manager was
        read-only (in which case nothing is reloaded).
        """
        log.info("Stopping stream...")
        mgr = self._lifecycle.stream_mgr
        # Captured before stop(), which may drop the manager.
        last_session_id = mgr.session_id if mgr and not mgr.readonly else None

        self._lifecycle.stop()

        self._timeline.reset()
        self._monitor.reset()
        self._waveform_engine.reset()
        self._waveform_widget.set_peaks(None, 0.05)
        self._transcriber.reset()
        self._agent.clear_history()
        self._known_frames = set()
        self._frames_panel.clear()
        self._transcript_panel.clear()

        self._connect_btn.set_label("Connect")
        self._connect_btn.remove_css_class("destructive-action")
        self._connect_btn.add_css_class("suggested-action")
        self.set_title(APP_NAME)

        if reload_session and last_session_id:
            GLib.idle_add(self._load_session, last_session_id)

    def _on_close(self, *args):
        """``close-request`` handler: run teardown."""
        self.teardown()

    def teardown(self):
        """Full cleanup for app exit — safe to call multiple times."""
        if self._lifecycle.stream_mgr or self._lifecycle.is_streaming:
            self._stop_stream()
        self._monitor.stop()

    # -- Layout --

    def _build_right_panels(self):
        """Build and return the right-hand column.

        Top to bottom: monitor + waveform (side by side), timeline controls,
        frames panel, transcript panel, agent input. Also creates the
        ``_monitor``, ``_waveform_widget``, ``_timeline_controls`` and
        ``_agent_input`` attributes.
        """
        right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2)

        # Video + waveform
        top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        top_paned.set_shrink_start_child(False)
        top_paned.set_shrink_end_child(False)

        self._monitor = MonitorWidget(self._timeline)
        self._monitor.set_hexpand(True)
        stream_frame = Gtk.Frame()
        stream_frame.set_child(self._monitor)
        top_paned.set_start_child(stream_frame)

        self._waveform_widget = WaveformWidget(self._timeline)
        waveform_frame = Gtk.Frame()
        waveform_frame.set_child(self._waveform_widget)
        top_paned.set_end_child(waveform_frame)
        top_paned.set_position(650)

        right_box.append(top_paned)

        # Timeline slider
        self._timeline_controls = TimelineControls(self._timeline)
        self._timeline_controls.set_live_toggle_callback(self._on_live_toggle)
        right_box.append(self._timeline_controls)

        # Frames
        frames_frame = Gtk.Frame()
        frames_frame.set_child(self._frames_panel)
        right_box.append(frames_frame)

        # Transcript
        transcript_frame = Gtk.Frame()
        transcript_frame.set_child(self._transcript_panel)
        right_box.append(transcript_frame)

        # Agent input
        self._agent_input = AgentInputPanel()
        self._agent_input.connect("send-requested", lambda p, text: self._send_message(text or None))
        self._agent_input.connect("action-requested", lambda p, verb: self._send_action(verb))
        self._agent_input.connect("model-changed", self._on_model_changed)
        self._agent_input.connect("lang-changed", self._on_lang_changed)
        self._agent_input.connect("history-toggled", lambda p, v: setattr(self._agent, "include_history", v))
        right_box.append(self._agent_input)

        return right_box

    # -- Keyboard --

    def _setup_keyboard(self):
        """Create the keyboard manager, bind the shortcuts and attach it."""
        kb = KeyboardManager()

        def _entry_focused():
            # True when the focused widget is the agent entry or one of its
            # descendants (walks up the parent chain from the focus widget).
            w = self.get_focus()
            while w is not None:
                if w is self._agent_input.entry:
                    return True
                w = w.get_parent()
            return False

        kb.set_passthrough(_entry_focused, except_keys={KEY_ESCAPE})

        kb.bind(KEY_LEFT, lambda **_: self._frames_panel.select_adjacent(-1))
        kb.bind(KEY_RIGHT, lambda **_: self._frames_panel.select_adjacent(1))
        kb.bind(KEY_UP, lambda shift=False, **_: self._transcript_panel.select_adjacent(-1, extend=shift))
        kb.bind(KEY_DOWN, lambda shift=False, **_: self._transcript_panel.select_adjacent(1, extend=shift))
        # _send_message() with no text builds the "answer" message itself and
        # does nothing when there is no selection, so the handlers simply
        # delegate instead of building the message twice.
        kb.bind(KEY_RETURN, lambda **_: self._send_message())
        kb.bind(KEY_KP_ENTER, lambda **_: self._send_message())
        kb.bind(KEY_ESCAPE, lambda **_: (self.set_focus(None),
                                         self._frames_panel.clear_selection(),
                                         self._transcript_panel.clear_selection()))
        kb.bind(KEY_DELETE, lambda **_: self._agent_output.clear())
        kb.attach(self)

    # -- Agent actions --

    def _build_selection_message(self, verb: str) -> str | None:
        """Return ``verb`` followed by the current selection, or None.

        The selection is the selected frame (as ``@<frame>``) and/or the
        selected transcript texts joined by spaces. None is returned when
        neither panel contributes anything.
        """
        parts = [verb]
        frame = self._frames_panel.selected
        # `is not None` matches _on_frame_selection_changed, so a falsy but
        # valid frame id is not silently dropped.
        if frame is not None:
            parts.append(f"@{frame}")
        texts = self._transcript_panel.selected_texts
        if texts:
            parts.append(" ".join(texts))
        return " ".join(parts) if len(parts) > 1 else None

    def _send_action(self, verb: str):
        """Send ``verb`` applied to the current selection, or prompt to select."""
        msg = self._build_selection_message(verb)
        if not msg:
            self._agent_output.append("Select a frame or transcript first.\n")
            return
        self._send_message(msg)

    def _send_message(self, text: str | None = None):
        """Send ``text`` to the agent and stream the reply into the output panel.

        With no text, an "answer" message is built from the current selection;
        if that is empty too, nothing happens. Without a stream manager a
        notice is shown instead of sending.
        """
        if not text:
            text = self._build_selection_message("answer")
            if not text:
                return

        if not self._lifecycle.stream_mgr:
            self._agent_output.append("No active session.\n")
            return

        self._agent_output.append(f"\n> {text}\n…\n")
        self._agent_output.begin_response()

        # Agent callbacks are forwarded to the main loop via idle_add.
        self._agent.send(
            message=text,
            stream_mgr=self._lifecycle.stream_mgr,
            tracker=self._lifecycle.tracker,
            on_chunk=lambda chunk: GLib.idle_add(self._agent_output.replace_thinking, chunk),
            on_done=lambda err: GLib.idle_add(self._agent_output.finish_response, err),
        )

    # -- Settings callbacks --

    def _on_lang_changed(self, _panel, lang_code):
        """Set the transcriber language; an empty code is stored as None."""
        self._transcriber.language = lang_code or None
        log.info("Transcript language: %s", lang_code or "auto")

    def _on_model_changed(self, _panel, model):
        """Switch the agent to ``model``."""
        self._agent.model = model
        log.info("Model switched to %s", model)

    def _populate_model_dropdown(self):
        """Fill the model dropdown from the agent's available and current model."""
        self._agent_input.populate_models(
            self._agent.available_models, self._agent.model
        )

    def _check_agent_auth(self):
        """Report agent readiness and populate the model dropdown.

        When ``GROQ_API_KEY`` or ``OPENAI_API_KEY`` is set, the CLI check is
        skipped and only the dropdown is populated. Otherwise the result of
        ``check_claude_cli()`` is shown, and the dropdown is populated only
        when it reports no error.
        """
        import os
        if os.environ.get("GROQ_API_KEY") or os.environ.get("OPENAI_API_KEY"):
            self._populate_model_dropdown()
            return

        err = check_claude_cli()
        if err:
            self._agent_output.append(f"⚠ {err}\n")
        else:
            self._agent_output.append(f"Agent ready ({self._agent.provider_name})\n")
            self._populate_model_dropdown()

    # -- Data loading --

    def _load_thumbnail(self, path):
        """Load the image at ``path`` scaled to the thumbnail box, keeping aspect ratio."""
        return GdkPixbuf.Pixbuf.new_from_file_at_scale(
            str(path), self._THUMB_WIDTH, self._THUMB_HEIGHT, True
        )

    def _load_existing_frames(self):
        """Load thumbnails for every indexed frame of the current session.

        Frames whose thumbnail cannot be loaded are logged and skipped. When
        at least one loads, the frames panel and ``_known_frames`` are
        replaced with the loaded set.
        """
        if not self._lifecycle.stream_mgr:
            return
        entries = load_frame_index(self._lifecycle.stream_mgr.frames_dir)
        if not entries:
            self._agent_output.append("  No frames found.\n")
            return

        items = []
        for entry in entries:
            try:
                pixbuf = self._load_thumbnail(entry["path"])
                items.append({"id": entry["id"], "pixbuf": pixbuf,
                              "timestamp": entry["timestamp"]})
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", entry["id"], e)

        if items:
            self._frames_panel.load_items(items)
            self._known_frames = {item["id"] for item in items}
            self._agent_output.append(f"  Loaded {len(items)} frame thumbnails.\n")

    def _load_existing_transcript(self):
        """Load the session's transcript index, if present, into the panel."""
        if not self._lifecycle.stream_mgr:
            return
        transcript_index = self._lifecycle.stream_mgr.transcript_dir / "index.json"
        if not transcript_index.exists():
            return
        self._transcriber.load_index(transcript_index)
        segs = self._transcriber.all_segments()
        if segs:
            self._transcript_panel.add_items(segs)
            self._agent_output.append(f"  Loaded {len(segs)} transcript segments.\n")

    def _poll_frames(self):
        """Timer callback: add thumbnails for frames not yet shown.

        Returns True to keep the timer running, or False (which removes the
        GLib source) once no stream manager is set.
        """
        mgr = self._lifecycle.stream_mgr
        if not mgr:
            # Returning False removes the source, so forget its id too.
            self._poll_source_id = 0
            return False

        for entry in load_frame_index(mgr.frames_dir):
            fid = entry["id"]
            if fid in self._known_frames:
                continue
            # Recorded before loading, so a frame that fails is not retried.
            self._known_frames.add(fid)
            try:
                pixbuf = self._load_thumbnail(entry["path"])
                # Do not steal the selection from the transcript panel.
                auto = not self._transcript_panel.has_selection
                self._frames_panel.add_item(fid, pixbuf, entry["timestamp"], auto_select=auto)
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", fid, e)
        return True