"""Main application window — wires Timeline to all components."""

import json
import logging
import os
from pathlib import Path

import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk, Gdk, Adw, GLib, Pango, GdkPixbuf

from threading import Thread

# Imported as a module as well: runtime-tunable settings must be read through
# ``cht.config.<NAME>`` so that changes made by the settings callbacks are seen.
import cht.config
from cht.config import APP_NAME, SCENE_THRESHOLD, TRANSCRIBE_MIN_CHUNK_S
from cht.ui.timeline import Timeline, TimelineControls
from cht.ui.monitor import MonitorWidget
from cht.ui.waveform import WaveformWidget
from cht.ui.frames_panel import FramesPanel
from cht.ui.transcript_panel import TranscriptPanel
from cht.ui.keyboard import KeyboardManager, KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN, KEY_RETURN, KEY_KP_ENTER, KEY_ESCAPE, KEY_DELETE
from cht.ui import markdown
from cht.audio.waveform import WaveformEngine
from cht.transcriber.engine import TranscriberEngine, LANGUAGES
from cht.stream.manager import StreamManager, list_sessions, delete_sessions
from cht.stream.tracker import RecordingTracker
from cht.agent.runner import AgentRunner, ACTIONS, check_claude_cli

log = logging.getLogger(__name__)


class ChtWindow(Adw.ApplicationWindow):
    """Top-level window: agent output on the left; video, waveform, timeline,
    frames, transcript and agent input stacked on the right."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_title(APP_NAME)
        self.set_default_size(1400, 900)

        self._streaming = False
        self._gone_live = False
        self._stream_mgr = None
        self._tracker = None
        self._known_frames = set()

        # Bumped on every stream start/stop; periodic timers created for an
        # older generation cancel themselves (see _add_stream_timer).
        self._stream_generation = 0

        # State of the agent response currently being streamed into the
        # output view (see _send_message / _replace_thinking / _on_agent_done).
        self._thinking_replaced = False
        self._response_start_mark = None
        self._response_accum = []

        # Core components
        self._timeline = Timeline()
        self._agent = AgentRunner()
        self._waveform_engine = WaveformEngine()
        self._transcriber = TranscriberEngine()
        self._pending_transcript_audio = []
        self._pending_transcript_duration = 0.0

        # Panels (own their selection state)
        self._frames_panel = FramesPanel()
        self._transcript_panel = TranscriptPanel()

        # Main layout
        self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        self._main_paned.set_shrink_start_child(False)
        self._main_paned.set_shrink_end_child(False)
        self._main_paned.set_position(450)
        self._main_paned.set_start_child(self._build_agent_output())
        right_box = self._build_right_panels()
        self._main_paned.set_end_child(right_box)

        # Header
        toolbar = Adw.ToolbarView()
        header = Adw.HeaderBar()
        header.set_title_widget(Gtk.Label(label=APP_NAME))

        self._connect_btn = Gtk.Button(label="Connect")
        self._connect_btn.add_css_class("suggested-action")
        self._connect_btn.connect("clicked", self._on_connect_clicked)
        header.pack_start(self._connect_btn)

        self._load_btn = Gtk.Button(label="Load Session")
        self._load_btn.connect("clicked", self._on_load_session_clicked)
        header.pack_start(self._load_btn)

        toolbar.add_top_bar(header)
        toolbar.set_content(self._main_paned)
        self.set_content(toolbar)
        self.connect("close-request", self._on_close)

        # Keyboard shortcuts
        self._setup_keyboard()

        # Wire panel signals
        self._frames_panel.connect("capture-requested", lambda p: self._on_capture_clicked())
        self._frames_panel.connect("threshold-changed", lambda p, v: self._on_scene_threshold(v))

        # Cross-panel exclusion: selecting frame clears transcript and vice versa
        self._frames_panel.connect("selection-changed", self._on_frame_selection_changed)
        self._transcript_panel.connect("selection-changed", self._on_transcript_selection_changed)
        self._transcript_panel.connect("min-chunk-changed", self._on_min_chunk_changed)
        self._transcript_panel.connect("lines-per-group-changed", self._on_lines_per_group_changed)

        log.info("Window initialized")
        GLib.idle_add(self._check_agent_auth)

    # -- Cross-panel selection exclusion --

    def _on_frame_selection_changed(self, panel):
        """Clear the transcript selection once a frame is selected."""
        if panel.selected is not None:
            self._transcript_panel.clear_selection()

    def _on_transcript_selection_changed(self, panel):
        """Clear the frame selection once transcript lines are selected."""
        if panel.has_selection:
            self._frames_panel.clear_selection()

    # -- Connect / Disconnect --

    def _on_connect_clicked(self, button):
        """Toggle streaming.

        While streaming: stop and reload the session for review. Otherwise
        start streaming, resuming the currently loaded session if there is one.
        """
        if self._streaming:
            self._stop_stream(reload_session=True)
        else:
            session_id = self._stream_mgr.session_id if self._stream_mgr else None
            if self._stream_mgr:
                self._stop_stream()
            self._start_stream(session_id=session_id)

    def _on_capture_clicked(self):
        """Request an immediate frame capture from the stream manager."""
        if self._stream_mgr:
            self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames)

    def _on_scene_threshold(self, val):
        """Forward the scene-detection threshold to the stream manager."""
        if self._stream_mgr:
            self._stream_mgr.scene_threshold = val

    def _on_min_chunk_changed(self, panel, val):
        """Store the new minimum transcription chunk length on cht.config."""
        cht.config.TRANSCRIBE_MIN_CHUNK_S = val

    def _on_lines_per_group_changed(self, panel, val):
        """Store the new transcript lines-per-group value on cht.config."""
        cht.config.TRANSCRIBE_LINES_PER_GROUP = val

    # -- Session loading --

    def _on_load_session_clicked(self, button):
        """Show a modal dialog listing stored sessions.

        Activating a row loads that session; checked rows can be deleted,
        except the session that is currently open.
        """
        sessions = list_sessions()
        if not sessions:
            self._append_agent_output("No previous sessions found.\n")
            return

        dialog = Adw.Window(transient_for=self, modal=True)
        dialog.set_title("Load Session")
        dialog.set_default_size(500, 400)

        toolbar = Adw.ToolbarView()
        header = Adw.HeaderBar()
        select_all_btn = Gtk.CheckButton(label="All")
        header.pack_start(select_all_btn)
        delete_btn = Gtk.Button(label="Delete")
        delete_btn.add_css_class("destructive-action")
        header.pack_end(delete_btn)
        toolbar.add_top_bar(header)

        scroll = Gtk.ScrolledWindow()
        scroll.set_vexpand(True)
        listbox = Gtk.ListBox()
        listbox.set_selection_mode(Gtk.SelectionMode.NONE)
        listbox.add_css_class("boxed-list")

        checks: list[tuple[str, Gtk.CheckButton]] = []
        for sid, sdir in sessions:
            idx = sdir / "frames" / "index.json"
            nframes = 0
            try:
                nframes = len(json.loads(idx.read_text()))
            except (OSError, ValueError, TypeError) as e:
                # Best effort: a missing or malformed index just shows 0 frames.
                log.debug("Could not read frame index %s: %s", idx, e)
            nrec = len(list((sdir / "stream").glob("recording_*.mp4")))

            check = Gtk.CheckButton()
            checks.append((sid, check))

            row = Adw.ActionRow()
            row.set_title(sid)
            row.set_subtitle(f"{nframes} frames, {nrec} segments")
            row.set_activatable(True)
            row.add_prefix(check)

            # Defaults bind the current loop values (avoids late binding).
            def _on_row_activated(r, s=sid, d=dialog):
                d.close()
                self._load_session(s)

            row.connect("activated", _on_row_activated)
            listbox.append(row)

        def _on_select_all(btn):
            active = btn.get_active()
            for _, cb in checks:
                cb.set_active(active)

        select_all_btn.connect("toggled", _on_select_all)

        def _on_delete(btn):
            to_delete = [sid for sid, cb in checks if cb.get_active()]
            if not to_delete:
                return
            # Never delete the session that is currently open.
            current = self._stream_mgr.session_id if self._stream_mgr else None
            if current in to_delete:
                to_delete.remove(current)
            if to_delete:
                delete_sessions(to_delete)
            # Re-open the dialog so the list reflects the deletion.
            dialog.close()
            self._on_load_session_clicked(None)

        delete_btn.connect("clicked", _on_delete)

        scroll.set_child(listbox)
        toolbar.set_content(scroll)
        dialog.set_content(toolbar)
        dialog.present()

    def _load_session(self, session_id):
        """Load an existing session for review (no streaming)."""
        if self._streaming or self._stream_mgr:
            self._stop_stream()

        try:
            self._stream_mgr = StreamManager.from_existing(session_id)
        except FileNotFoundError as e:
            self._append_agent_output(f"Error: {e}\n")
            return

        self.set_title(f"{APP_NAME} — {session_id}")
        self._append_agent_output(f"Loaded session: {session_id}\n")

        segments = self._stream_mgr.recording_segments
        if segments:
            self._monitor.set_recording(segments[0])
            duration = self._stream_mgr.total_duration()
            if duration > 0:
                self._timeline.set_duration(duration)
                self._timeline.seek(0)
            self._append_agent_output(
                f" Recording: {len(segments)} segment(s), "
                f"{int(duration)}s duration\n"
            )
        else:
            self._append_agent_output(" No recordings found (frames only).\n")

        self._load_existing_frames()
        self._load_existing_transcript()

        # Waveform from recording (background)
        if segments:
            from cht.stream import ffmpeg as ff

            # Capture what the worker needs now: self._stream_mgr may be
            # replaced or cleared on the main thread while the worker runs.
            mgr = self._stream_mgr
            first_segment = segments[0]

            def _compute_waveform():
                audio_dir = mgr.audio_dir
                audio_dir.mkdir(parents=True, exist_ok=True)
                full_wav = audio_dir / "full.wav"
                try:
                    ff.extract_audio_chunk(first_segment, full_wav)
                    if self._stream_mgr is not mgr:
                        # Session was closed or replaced meanwhile; drop the result.
                        return
                    self._waveform_engine.compute_full(full_wav)
                    peaks = self._waveform_engine.peaks
                    bucket_dur = self._waveform_engine.bucket_duration
                    GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur)
                except Exception as e:
                    log.error("Waveform computation failed: %s", e)

            Thread(target=_compute_waveform, daemon=True, name="waveform_load").start()

        self._populate_model_dropdown()

    # -- Streaming --

    def _add_stream_timer(self, interval_ms, callback):
        """Run ``callback`` every ``interval_ms`` ms for the current stream only.

        The timer removes itself as soon as the stream generation changes
        (stream stopped or restarted) or when ``callback`` returns a falsy
        value, so timers never outlive or pile up across streams.
        """
        generation = self._stream_generation

        def _tick():
            if generation != self._stream_generation:
                return False
            return callback()

        GLib.timeout_add(interval_ms, _tick)

    def _start_stream(self, session_id=None):
        """Start recording/relaying, optionally resuming ``session_id``."""
        log.info("Starting stream...")
        self._connect_btn.set_label("Disconnect")
        self._connect_btn.remove_css_class("suggested-action")
        self._connect_btn.add_css_class("destructive-action")
        self._streaming = True
        self._gone_live = False
        # Invalidate any periodic timers still pending from an earlier stream.
        self._stream_generation += 1

        self._stream_mgr = StreamManager(session_id=session_id)
        self._stream_mgr.setup_dirs()
        self._stream_mgr.start_recorder()

        self._monitor.set_recording(self._stream_mgr.recording_path)
        self._monitor.set_live_source(self._stream_mgr.relay_url)

        self._tracker = RecordingTracker(
            get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [],
            on_duration_update=self._on_duration_update,
        )
        self._tracker.start()

        self._stream_mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames)
        self._stream_mgr.start_audio_extractor(on_new_audio=self._on_new_audio)

        self._add_stream_timer(1000, self._poll_frames)
        self._add_stream_timer(1000, self._tick_live)
        self._add_stream_timer(2000, self._check_recorder)

        # Reload existing data if resuming
        if session_id:
            self._load_existing_frames()
            self._load_existing_transcript()

        self.set_title(f"{APP_NAME} — {self._stream_mgr.session_id}")
        log.info("Waiting for sender...")

    def _go_live_once(self):
        """Idle callback: switch the timeline to live; returns False to run once."""
        if self._stream_mgr:
            log.info("Going LIVE (startup delay elapsed)")
            self._timeline.go_live()
        return False

    def _tick_live(self):
        """Timer callback: advance the live timeline while streaming."""
        if not self._streaming:
            return False
        self._timeline.tick_live()
        return True

    def _on_duration_update(self, duration):
        """Tracker callback: publish the new duration; go live on the first one.

        UI updates are marshalled through GLib.idle_add — presumably because
        the tracker invokes this off the main thread.
        """
        GLib.idle_add(self._timeline.set_duration, duration)
        if not self._gone_live:
            self._gone_live = True
            GLib.idle_add(self._go_live_once)
            if self._stream_mgr:
                self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames)

    def _on_new_scene_frames(self, frames):
        """Add a timeline scene marker for every newly detected frame."""
        for f in frames:
            GLib.idle_add(self._timeline.add_scene_marker, f["timestamp"])

    def _on_new_audio(self, wav_path, start_time, duration):
        """Audio-extractor callback: extend the waveform and batch transcription.

        Chunks are accumulated until their total duration reaches
        ``cht.config.TRANSCRIBE_MIN_CHUNK_S``; the accumulated span is then
        extracted from the recording and transcribed on a background thread.
        """
        if not self._stream_mgr:
            return

        self._waveform_engine.append_chunk(wav_path, start_time)
        peaks = self._waveform_engine.peaks
        bucket_dur = self._waveform_engine.bucket_duration
        GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur)

        self._pending_transcript_audio.append((wav_path, start_time, duration))
        self._pending_transcript_duration += duration

        # Read the threshold through the module: the name imported with
        # ``from cht.config import ...`` is bound once at import time and would
        # not reflect changes made by _on_min_chunk_changed.
        if self._pending_transcript_duration < cht.config.TRANSCRIBE_MIN_CHUNK_S:
            return

        first_start = self._pending_transcript_audio[0][1]
        total_dur = self._pending_transcript_duration
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0

        mgr = self._stream_mgr
        chunk_wav = mgr.audio_dir / f"transcript_{int(first_start):06d}.wav"

        def _transcribe():
            from cht.stream import ffmpeg as ff
            try:
                ff.extract_audio_chunk(
                    mgr.recording_path,
                    chunk_wav,
                    start_time=first_start,
                    duration=total_dur,
                )
            except Exception as e:
                log.error("Transcript audio extraction failed: %s", e)
                return
            if not chunk_wav.exists():
                return
            new_segs = self._transcriber.transcribe_chunk(chunk_wav, time_offset=first_start)
            self._transcriber.save_index(mgr.transcript_dir / "index.json")
            if new_segs:
                GLib.idle_add(self._transcript_panel.add_items, new_segs)

        Thread(target=_transcribe, daemon=True, name="transcriber").start()

    def _check_recorder(self):
        """Timer callback: restart the recorder into a new segment if it died."""
        if not self._streaming or not self._stream_mgr:
            return False
        if not self._stream_mgr.recorder_alive():
            log.warning("Recorder died — restarting into new segment")
            self._stream_mgr.restart_recorder()
            self._monitor.set_recording(self._stream_mgr.recording_path)
        return True

    def _on_live_toggle(self):
        """Toggle live mode, handing the live player position to the timeline."""
        pos = self._monitor.get_live_position()
        self._timeline.toggle_live(live_player_pos=pos)

    def _stop_stream(self, reload_session=False):
        """Stop streaming (or close a loaded session) and reset all UI state.

        With ``reload_session`` set, a non-readonly session is re-opened for
        review once the reset is done.
        """
        log.info("Stopping stream...")
        last_session_id = (
            self._stream_mgr.session_id
            if self._stream_mgr and not self._stream_mgr.readonly
            else None
        )

        # Let the periodic stream timers cancel themselves on their next tick.
        self._stream_generation += 1

        if self._tracker:
            self._tracker.stop()
            self._tracker = None
        if self._stream_mgr:
            if not self._stream_mgr.readonly:
                self._stream_mgr.stop_all()
            self._stream_mgr = None

        self._timeline.reset()
        self._monitor.reset()
        self._waveform_engine.reset()
        self._waveform_widget.set_peaks(None, 0.05)
        self._transcriber.reset()
        self._agent.clear_history()
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0
        self._known_frames = set()
        self._frames_panel.clear()
        self._transcript_panel.clear()

        self._connect_btn.set_label("Connect")
        self._connect_btn.remove_css_class("destructive-action")
        self._connect_btn.add_css_class("suggested-action")
        self._streaming = False
        self.set_title(APP_NAME)

        if reload_session and last_session_id:
            GLib.idle_add(self._load_session, last_session_id)

    def _on_close(self, *args):
        """close-request handler: clean up before the window goes away."""
        self.teardown()

    def teardown(self):
        """Full cleanup for app exit — safe to call multiple times."""
        if self._stream_mgr or self._streaming:
            self._stop_stream()
        self._monitor.stop()

    # -- Layout --

    def _build_right_panels(self):
        """Build the right column: video + waveform, timeline, frames,
        transcript and agent input."""
        right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2)

        # Video + waveform
        top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        top_paned.set_shrink_start_child(False)
        top_paned.set_shrink_end_child(False)

        self._monitor = MonitorWidget(self._timeline)
        self._monitor.set_hexpand(True)
        stream_frame = Gtk.Frame()
        stream_frame.set_child(self._monitor)
        top_paned.set_start_child(stream_frame)

        self._waveform_widget = WaveformWidget(self._timeline)
        waveform_frame = Gtk.Frame()
        waveform_frame.set_child(self._waveform_widget)
        top_paned.set_end_child(waveform_frame)
        top_paned.set_position(650)
        right_box.append(top_paned)

        # Timeline slider
        self._timeline_controls = TimelineControls(self._timeline)
        self._timeline_controls.set_live_toggle_callback(self._on_live_toggle)
        right_box.append(self._timeline_controls)

        # Frames
        frames_frame = Gtk.Frame()
        frames_frame.set_child(self._frames_panel)
        right_box.append(frames_frame)

        # Transcript
        transcript_frame = Gtk.Frame()
        transcript_frame.set_child(self._transcript_panel)
        right_box.append(transcript_frame)

        # Agent input
        self._agent_input = self._build_agent_input()
        right_box.append(self._agent_input)

        return right_box

    # -- Agent panels --

    def _build_agent_output(self):
        """Build the left pane: heading, Clear button and read-only output view."""
        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)

        header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        header.set_margin_start(8)
        header.set_margin_end(8)
        header.set_margin_top(8)
        header.set_margin_bottom(8)

        label = Gtk.Label(label="Agent Output")
        label.add_css_class("heading")
        label.set_hexpand(True)
        label.set_halign(Gtk.Align.START)
        header.append(label)

        clear_btn = Gtk.Button(label="Clear")
        clear_btn.add_css_class("flat")
        clear_btn.connect("clicked", self._on_clear_agent_output)
        header.append(clear_btn)
        box.append(header)

        self._agent_output_view = Gtk.TextView()
        self._agent_output_view.set_editable(False)
        self._agent_output_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        self._agent_output_view.set_cursor_visible(False)
        self._agent_output_view.set_left_margin(8)
        self._agent_output_view.set_right_margin(8)
        markdown.setup_tags(self._agent_output_view.get_buffer())

        scroll = Gtk.ScrolledWindow()
        scroll.set_vexpand(True)
        scroll.set_child(self._agent_output_view)
        box.append(scroll)

        frame = Gtk.Frame()
        frame.set_child(box)
        return frame

    def _build_agent_input(self):
        """Build the action buttons, model/language selectors and message entry."""
        outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
        outer.set_margin_start(4)
        outer.set_margin_end(4)
        outer.set_margin_top(4)
        outer.set_margin_bottom(4)

        actions_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        for label, verb in ACTIONS.items():
            btn = Gtk.Button(label=label)
            btn.add_css_class("flat")
            # v=verb binds the current verb (avoids late-binding closures).
            btn.connect("clicked", lambda b, v=verb: self._send_action(v))
            actions_box.append(btn)

        spacer = Gtk.Box()
        spacer.set_hexpand(True)
        actions_box.append(spacer)

        model_label = Gtk.Label(label="Model:")
        model_label.add_css_class("dim-label")
        actions_box.append(model_label)

        self._model_dropdown = Gtk.DropDown.new_from_strings([])
        self._model_dropdown.set_size_request(200, -1)
        self._model_dropdown.connect("notify::selected", self._on_model_changed)
        actions_box.append(self._model_dropdown)

        lang_label = Gtk.Label(label="Lang:")
        lang_label.add_css_class("dim-label")
        actions_box.append(lang_label)

        lang_names = list(LANGUAGES.keys())
        self._lang_dropdown = Gtk.DropDown.new_from_strings(lang_names)
        self._lang_dropdown.set_selected(0)
        self._lang_dropdown.connect("notify::selected", self._on_lang_changed)
        actions_box.append(self._lang_dropdown)

        self._history_toggle = Gtk.CheckButton(label="Chat")
        self._history_toggle.set_tooltip_text("Include conversation history in prompts")
        self._history_toggle.connect(
            "toggled",
            lambda b: setattr(self._agent, "include_history", b.get_active()),
        )
        actions_box.append(self._history_toggle)
        outer.append(actions_box)

        input_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        self._input_entry = Gtk.Entry()
        self._input_entry.set_hexpand(True)
        self._input_entry.set_placeholder_text("Message agent... (@F1-3 frames, @T1-5 transcript)")
        self._input_entry.connect("activate", lambda e: self._send_message())
        input_row.append(self._input_entry)

        send_btn = Gtk.Button(label="Send")
        send_btn.add_css_class("suggested-action")
        send_btn.connect("clicked", lambda b: self._send_message())
        input_row.append(send_btn)
        outer.append(input_row)

        frame = Gtk.Frame()
        frame.set_child(outer)
        return frame

    # -- Keyboard --

    def _setup_keyboard(self):
        """Bind the window-level keyboard shortcuts."""
        kb = KeyboardManager()

        def _entry_focused():
            # True when the focused widget is the input entry or one of its
            # descendants.
            w = self.get_focus()
            while w is not None:
                if w is self._input_entry:
                    return True
                w = w.get_parent()
            return False

        def _answer_selection(**_):
            # Build the message once, then send it only if something is selected.
            msg = self._build_selection_message("answer")
            if msg:
                self._send_message(msg)

        kb.set_passthrough(_entry_focused, except_keys={KEY_ESCAPE})
        kb.bind(KEY_LEFT, lambda **_: self._frames_panel.select_adjacent(-1))
        kb.bind(KEY_RIGHT, lambda **_: self._frames_panel.select_adjacent(1))
        kb.bind(KEY_UP, lambda shift=False, **_: self._transcript_panel.select_adjacent(-1, extend=shift))
        kb.bind(KEY_DOWN, lambda shift=False, **_: self._transcript_panel.select_adjacent(1, extend=shift))
        kb.bind(KEY_RETURN, _answer_selection)
        kb.bind(KEY_KP_ENTER, _answer_selection)
        kb.bind(KEY_ESCAPE, lambda **_: (self.set_focus(None), self._frames_panel.clear_selection(), self._transcript_panel.clear_selection()))
        kb.bind(KEY_DELETE, lambda **_: self._on_clear_agent_output(None))
        kb.attach(self)

    # -- Agent actions --

    def _build_selection_message(self, verb: str) -> str | None:
        """Return ``verb`` followed by the selected frame reference and/or
        transcript text, or None when nothing is selected."""
        parts = [verb]
        if self._frames_panel.selected:
            parts.append(f"@{self._frames_panel.selected}")
        texts = self._transcript_panel.selected_texts
        if texts:
            parts.append(" ".join(texts))
        return " ".join(parts) if len(parts) > 1 else None

    def _send_action(self, verb: str):
        """Send ``verb`` applied to the current selection to the agent."""
        msg = self._build_selection_message(verb)
        if not msg:
            self._append_agent_output("Select a frame or transcript first.\n")
            return
        self._send_message(msg)

    def _discard_response_mark(self):
        """Remove the response-start mark from the output buffer, if any."""
        mark = self._response_start_mark
        if mark is not None:
            self._response_start_mark = None
            if not mark.get_deleted():
                self._agent_output_view.get_buffer().delete_mark(mark)

    def _send_message(self, text: str | None = None):
        """Send ``text`` to the agent.

        Without ``text`` the input entry is used (and cleared); if that is
        empty too, an "answer" message is built from the current selection.
        """
        if text is None:
            text = self._input_entry.get_text().strip()
            self._input_entry.set_text("")
        if not text:
            text = self._build_selection_message("answer")
        if not text:
            return
        if not self._stream_mgr:
            self._append_agent_output("No active session.\n")
            return

        # The trailing "…\n" is a placeholder removed by _replace_thinking
        # when the first response chunk arrives.
        self._append_agent_output(f"\n> {text}\n…\n")

        # Reset the per-response state BEFORE handing the callbacks to the
        # agent, so no callback can ever observe stale or missing state.
        self._discard_response_mark()
        self._thinking_replaced = False
        self._response_accum = []

        self._agent.send(
            message=text,
            stream_mgr=self._stream_mgr,
            tracker=self._tracker,
            on_chunk=lambda chunk: GLib.idle_add(self._replace_thinking, chunk),
            on_done=lambda err: GLib.idle_add(self._on_agent_done, err),
        )

    def _replace_thinking(self, chunk: str):
        """Append a streamed response chunk to the output view.

        On the first chunk the two-character "…\\n" placeholder at the end of
        the buffer is removed and the response start is marked, so the raw
        text can later be replaced by its markdown rendering.
        """
        buf = self._agent_output_view.get_buffer()
        if not self._thinking_replaced:
            self._thinking_replaced = True
            end = buf.get_end_iter()
            start = end.copy()
            start.backward_chars(2)
            buf.delete(start, end)
            self._response_start_mark = buf.create_mark(
                None, buf.get_end_iter(), left_gravity=True
            )
        self._response_accum.append(chunk)
        self._append_agent_output(chunk)

    def _on_agent_done(self, err: str | None):
        """Finish a response: report ``err``, or re-render the text as markdown."""
        if err:
            self._append_agent_output(f"[Error: {err}]\n")
            # Do not leave the response-start mark behind in the buffer.
            self._discard_response_mark()
            return
        if self._response_start_mark is not None and self._response_accum:
            buf = self._agent_output_view.get_buffer()
            start = buf.get_iter_at_mark(self._response_start_mark)
            end = buf.get_end_iter()
            buf.delete(start, end)
            markdown.render(buf, start, "".join(self._response_accum))
            self._discard_response_mark()
        self._append_agent_output("\n")

    # -- Settings callbacks --

    def _on_clear_agent_output(self, _button):
        """Empty the agent output view."""
        self._agent_output_view.get_buffer().set_text("")

    def _on_lang_changed(self, dropdown, _pspec):
        """Apply the language chosen in the dropdown to the transcriber."""
        idx = dropdown.get_selected()
        lang_names = list(LANGUAGES.keys())
        if idx < len(lang_names):
            lang_code = LANGUAGES[lang_names[idx]]
            self._transcriber.language = lang_code
            log.info("Transcript language: %s (%s)", lang_names[idx], lang_code or "auto")

    def _on_model_changed(self, dropdown, _pspec):
        """Switch the agent to the model chosen in the dropdown."""
        idx = dropdown.get_selected()
        models = self._agent.available_models
        model = models[idx] if idx < len(models) else None
        if model:
            self._agent.model = model
            log.info("Model switched to %s", model)

    def _populate_model_dropdown(self):
        """Fill the model dropdown and select the agent's current model."""
        models = self._agent.available_models
        if not models:
            return
        string_list = Gtk.StringList.new(models)
        self._model_dropdown.set_model(string_list)
        current = self._agent.model
        for i, m in enumerate(models):
            if m == current:
                self._model_dropdown.set_selected(i)
                break

    def _check_agent_auth(self):
        """Idle callback run once at startup: report whether the agent is usable.

        When GROQ_API_KEY or OPENAI_API_KEY is set, the Claude CLI check is
        skipped and only the model dropdown is populated.
        """
        if os.environ.get("GROQ_API_KEY") or os.environ.get("OPENAI_API_KEY"):
            self._populate_model_dropdown()
            return
        err = check_claude_cli()
        if err:
            self._append_agent_output(f"⚠ {err}\n")
        else:
            self._append_agent_output(f"Agent ready ({self._agent.provider_name})\n")
            self._populate_model_dropdown()

    def _append_agent_output(self, text: str):
        """Append ``text`` to the agent output view and scroll to the end."""
        buf = self._agent_output_view.get_buffer()
        buf.insert(buf.get_end_iter(), text)
        self._agent_output_view.scroll_to_iter(buf.get_end_iter(), 0, False, 0, 0)

    # -- Data loading --

    def _load_existing_frames(self):
        """Load thumbnails for all frames listed in the session's frame index."""
        if not self._stream_mgr:
            return
        index_path = self._stream_mgr.frames_dir / "index.json"
        if not index_path.exists():
            self._append_agent_output(" No frames found.\n")
            return
        try:
            index = json.loads(index_path.read_text())
        except (json.JSONDecodeError, IOError):
            return

        items = []
        for entry in index:
            fpath = Path(entry["path"])
            if not fpath.exists():
                # Fall back to the file name inside this session's frames dir.
                fpath = self._stream_mgr.frames_dir / fpath.name
                if not fpath.exists():
                    continue
            try:
                pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True)
                items.append({"id": entry["id"], "pixbuf": pixbuf, "timestamp": entry.get("timestamp", 0)})
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", entry["id"], e)

        if items:
            self._frames_panel.load_items(items)
            self._known_frames = {item["id"] for item in items}
            self._append_agent_output(f" Loaded {len(items)} frame thumbnails.\n")

    def _load_existing_transcript(self):
        """Load the session's saved transcript index into the transcript panel."""
        if not self._stream_mgr:
            return
        transcript_index = self._stream_mgr.transcript_dir / "index.json"
        if not transcript_index.exists():
            return
        self._transcriber.load_index(transcript_index)
        segs = self._transcriber.all_segments()
        if segs:
            self._transcript_panel.add_items(segs)
            self._append_agent_output(f" Loaded {len(segs)} transcript segments.\n")

    def _poll_frames(self):
        """Timer callback: add thumbnails for frames newly listed in the index.

        Returns False (stop polling) once there is no stream manager.
        """
        if not self._stream_mgr:
            return False
        index_path = self._stream_mgr.frames_dir / "index.json"
        if not index_path.exists():
            return True
        try:
            index = json.loads(index_path.read_text())
        except (json.JSONDecodeError, IOError):
            # Possibly read mid-write; try again on the next tick.
            return True

        for entry in index:
            fid = entry["id"]
            if fid in self._known_frames:
                continue
            fpath = Path(entry["path"])
            if not fpath.exists():
                continue
            self._known_frames.add(fid)
            timestamp = entry.get("timestamp", 0)
            try:
                pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True)
                # Do not steal the selection from an active transcript selection.
                auto = not self._transcript_panel.has_selection
                self._frames_panel.add_item(fid, pixbuf, timestamp, auto_select=auto)
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", fid, e)
        return True