"""Main application window — wires Timeline to all components.""" import json import logging from pathlib import Path import gi gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") gi.require_version("GdkPixbuf", "2.0") from gi.repository import Gtk, Gdk, Adw, GLib, Pango, GdkPixbuf from threading import Thread from cht.config import APP_NAME, SCENE_THRESHOLD from cht.ui.timeline import Timeline, TimelineControls from cht.ui.monitor import MonitorWidget from cht.ui.waveform import WaveformWidget from cht.audio.waveform import WaveformEngine from cht.transcriber.engine import TranscriberEngine, LANGUAGES from cht.stream.manager import StreamManager, list_sessions from cht.stream.tracker import RecordingTracker from cht.agent.runner import AgentRunner, ACTIONS, check_claude_cli log = logging.getLogger(__name__) class ChtWindow(Adw.ApplicationWindow): def __init__(self, **kwargs): super().__init__(**kwargs) self.set_title(APP_NAME) self.set_default_size(1400, 900) self._streaming = False self._gone_live = False self._stream_mgr = None self._tracker = None self._known_frames = set() self._selected_frame = None # currently selected frame ID self._frame_widgets = {} # frame_id → outer Box widget self._frame_order = [] # ordered list of frame IDs self._transcript_order = [] # ordered list of transcript segment IDs self._transcript_rows = {} # segment_id → ListBoxRow self._transcript_texts = {} # segment_id → text (clean, no timestamps) self._selected_transcripts = [] # ordered list of selected transcript IDs # Timeline is the central state machine self._timeline = Timeline() self._agent = AgentRunner() self._waveform_engine = WaveformEngine() self._transcriber = TranscriberEngine() # Main layout self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) self._main_paned.set_shrink_start_child(False) self._main_paned.set_shrink_end_child(False) self._main_paned.set_position(450) self._main_paned.set_start_child(self._build_agent_output()) right_box = self._build_right_panels() self._main_paned.set_end_child(right_box) # Header toolbar = Adw.ToolbarView() header = Adw.HeaderBar() header.set_title_widget(Gtk.Label(label=APP_NAME)) self._connect_btn = Gtk.Button(label="Connect") self._connect_btn.add_css_class("suggested-action") self._connect_btn.connect("clicked", self._on_connect_clicked) header.pack_start(self._connect_btn) self._load_btn = Gtk.Button(label="Load Session") self._load_btn.connect("clicked", self._on_load_session_clicked) header.pack_start(self._load_btn) toolbar.add_top_bar(header) toolbar.set_content(self._main_paned) self.set_content(toolbar) self.connect("close-request", self._on_close) # Global keyboard shortcuts for frame navigation (capture phase # so we intercept before GTK activates focused buttons) key_ctrl = Gtk.EventControllerKey() key_ctrl.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) key_ctrl.connect("key-pressed", self._on_key_pressed) self.add_controller(key_ctrl) log.info("Window initialized") GLib.idle_add(self._check_agent_auth) def _on_connect_clicked(self, button): if self._streaming: self._stop_stream(reload_session=True) else: # If a session is loaded, continue it; otherwise start fresh session_id = self._stream_mgr.session_id if self._stream_mgr else None if self._stream_mgr: self._stop_stream() # clean teardown first self._start_stream(session_id=session_id) def _on_load_session_clicked(self, button): sessions = list_sessions() if not sessions: self._append_agent_output("No previous sessions found.\n") return dialog = Adw.Window(transient_for=self, modal=True) dialog.set_title("Load Session") dialog.set_default_size(500, 400) toolbar = Adw.ToolbarView() header = Adw.HeaderBar() toolbar.add_top_bar(header) scroll = Gtk.ScrolledWindow() scroll.set_vexpand(True) listbox = Gtk.ListBox() listbox.set_selection_mode(Gtk.SelectionMode.NONE) listbox.add_css_class("boxed-list") for sid, sdir in sessions: idx = sdir / "frames" / "index.json" nframes = 0 try: nframes = len(json.loads(idx.read_text())) except Exception: pass nrec = len(list((sdir / "stream").glob("recording_*.mp4"))) row = Adw.ActionRow() row.set_title(sid) row.set_subtitle(f"{nframes} frames, {nrec} segments") row.set_activatable(True) def _on_row_activated(r, s=sid, d=dialog): d.close() self._load_session(s) row.connect("activated", _on_row_activated) listbox.append(row) scroll.set_child(listbox) toolbar.set_content(scroll) dialog.set_content(toolbar) dialog.present() def _load_session(self, session_id): """Load an existing session for review (no streaming).""" # Stop any active stream or previous loaded session if self._streaming or self._stream_mgr: self._stop_stream() try: self._stream_mgr = StreamManager.from_existing(session_id) except FileNotFoundError as e: self._append_agent_output(f"Error: {e}\n") return self.set_title(f"{APP_NAME} — {session_id}") self._append_agent_output(f"Loaded session: {session_id}\n") # Load recording into review player if segments exist segments = self._stream_mgr.recording_segments if segments: self._monitor.set_recording(segments[0]) # Probe total duration and set timeline duration = self._stream_mgr.total_duration() if duration > 0: self._timeline.set_duration(duration) self._timeline.seek(0) # enter scrub mode at start self._append_agent_output( f" Recording: {len(segments)} segment(s), " f"{int(duration)}s duration\n" ) else: self._append_agent_output(" No recordings found (frames only).\n") # Load existing frames into the strip self._load_existing_frames() # Load existing transcript transcript_index = self._stream_mgr.transcript_dir / "index.json" if transcript_index.exists(): self._transcriber.load_index(transcript_index) segs = self._transcriber.all_segments() if segs: self._append_transcript_segments(segs) self._append_agent_output(f" Loaded {len(segs)} transcript segments.\n") # Compute waveform from existing recordings (background thread) if segments: from cht.stream import ffmpeg as ff def _compute_waveform(): audio_dir = self._stream_mgr.audio_dir audio_dir.mkdir(parents=True, exist_ok=True) full_wav = audio_dir / "full.wav" try: ff.extract_audio_chunk(segments[0], full_wav) self._waveform_engine.compute_full(full_wav) peaks = self._waveform_engine.peaks bucket_dur = self._waveform_engine.bucket_duration GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur) except Exception as e: log.error("Waveform computation failed: %s", e) Thread(target=_compute_waveform, daemon=True, name="waveform_load").start() # Set up agent auth/model if not already done self._populate_model_dropdown() def _start_stream(self, session_id=None): log.info("Starting stream...") self._connect_btn.set_label("Disconnect") self._connect_btn.remove_css_class("suggested-action") self._connect_btn.add_css_class("destructive-action") self._streaming = True self._gone_live = False # Continue existing session or create new one self._stream_mgr = StreamManager(session_id=session_id) self._stream_mgr.setup_dirs() # Start ffmpeg recorder (listens for sender, relays to UDP) self._stream_mgr.start_recorder() # Tell monitor where the recording will be and what URL to stream live from self._monitor.set_recording(self._stream_mgr.recording_path) self._monitor.set_live_source(self._stream_mgr.relay_url) # Start tracking recording duration (across segments) self._tracker = RecordingTracker( get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [], on_duration_update=self._on_duration_update, ) self._tracker.start() # Start scene detection self._stream_mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames) # Start audio extraction (waveform + transcription) self._stream_mgr.start_audio_extractor(on_new_audio=self._on_new_audio) # Start polling for frame thumbnails GLib.timeout_add(1000, self._poll_frames) # Tick the LIVE cursor every second GLib.timeout_add(1000, self._tick_live) # Watchdog: restart recorder on crash/disconnect GLib.timeout_add(2000, self._check_recorder) # If resuming a session, reload existing frames/transcript/waveform if session_id: self._load_existing_frames() transcript_index = self._stream_mgr.transcript_dir / "index.json" if transcript_index.exists(): self._transcriber.load_index(transcript_index) segs = self._transcriber.all_segments() if segs: self._append_transcript_segments(segs) self.set_title(f"{APP_NAME} — {self._stream_mgr.session_id}") log.info("Waiting for sender...") def _go_live_once(self): """Called once after startup delay — go LIVE.""" if self._stream_mgr: log.info("Going LIVE (startup delay elapsed)") self._timeline.go_live() return False # one-shot def _tick_live(self): """Tick cursor in LIVE mode so timer advances smoothly.""" if not self._streaming: return False self._timeline.tick_live() return True def _on_duration_update(self, duration): """Called from RecordingTracker thread.""" GLib.idle_add(self._timeline.set_duration, duration) if not self._gone_live: self._gone_live = True GLib.idle_add(self._go_live_once) # Capture initial frame — scene detector only fires on changes if self._stream_mgr: self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames) def _on_new_scene_frames(self, frames): """Called from scene detector thread when new frames are found.""" for f in frames: GLib.idle_add(self._timeline.add_scene_marker, f["timestamp"]) def _on_new_audio(self, wav_path, start_time, duration): """Called from audio extractor thread with new WAV chunk.""" if not self._stream_mgr: return # Compute waveform peaks (fast, ~1ms) self._waveform_engine.append_chunk(wav_path, start_time) peaks = self._waveform_engine.peaks bucket_dur = self._waveform_engine.bucket_duration GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur) # Transcribe in separate thread (GPU-bound, ~1-2s per chunk) mgr = self._stream_mgr # capture ref before thread starts def _transcribe(): new_segs = self._transcriber.transcribe_chunk(wav_path, time_offset=start_time) if mgr: self._transcriber.save_index(mgr.transcript_dir / "index.json") if new_segs: GLib.idle_add(self._append_transcript_segments, new_segs) Thread(target=_transcribe, daemon=True, name="transcriber").start() def _check_recorder(self): """Watchdog: restart recorder if it died (sender disconnect, etc).""" if not self._streaming or not self._stream_mgr: return False # stop polling if not self._stream_mgr.recorder_alive(): log.warning("Recorder died — restarting into new segment") self._stream_mgr.restart_recorder() # Re-point monitor to new recording segment self._monitor.set_recording(self._stream_mgr.recording_path) return True # keep polling def _on_live_toggle(self): """LIVE button handler — passes the live player's current position.""" pos = self._monitor.get_live_position() self._timeline.toggle_live(live_player_pos=pos) def _stop_stream(self, reload_session=False): log.info("Stopping stream...") # Remember session for reload last_session_id = self._stream_mgr.session_id if self._stream_mgr and not self._stream_mgr.readonly else None # Stop background threads first (sets stop flags, kills procs) if self._tracker: self._tracker.stop() self._tracker = None if self._stream_mgr: if not self._stream_mgr.readonly: self._stream_mgr.stop_all() self._stream_mgr = None # Then clean up UI self._timeline.reset() self._monitor.reset() self._waveform_engine.reset() self._waveform_widget.set_peaks(None, 0.05) self._transcriber.reset() self._agent.clear_history() self._transcript_order.clear() self._transcript_rows.clear() self._transcript_texts.clear() self._selected_transcripts.clear() while child := self._transcript_list.get_first_child(): self._transcript_list.remove(child) self._known_frames = set() self._selected_frame = None self._frame_widgets = {} self._frame_order = [] # Clear frame strip while child := self._frames_strip.get_first_child(): self._frames_strip.remove(child) self._connect_btn.set_label("Connect") self._connect_btn.remove_css_class("destructive-action") self._connect_btn.add_css_class("suggested-action") self._streaming = False self.set_title(APP_NAME) # Reload last session in review mode if reload_session and last_session_id: GLib.idle_add(self._load_session, last_session_id) def _on_close(self, *args): self.teardown() def teardown(self): """Full cleanup for app exit — safe to call multiple times.""" if self._stream_mgr or self._streaming: self._stop_stream() # Terminate mpv players and GL contexts (only on app exit) self._monitor.stop() # -- Right panels -- def _build_right_panels(self): right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) # Top row: player + waveform placeholder top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) top_paned.set_shrink_start_child(False) top_paned.set_shrink_end_child(False) self._monitor = MonitorWidget(self._timeline) self._monitor.set_hexpand(True) stream_frame = Gtk.Frame() stream_frame.set_child(self._monitor) top_paned.set_start_child(stream_frame) self._waveform_widget = WaveformWidget(self._timeline) waveform_frame = Gtk.Frame() waveform_frame.set_child(self._waveform_widget) top_paned.set_end_child(waveform_frame) top_paned.set_position(650) right_box.append(top_paned) # Shared timeline slider (spans under player + waveform) self._timeline_controls = TimelineControls(self._timeline) self._timeline_controls.set_live_toggle_callback(self._on_live_toggle) right_box.append(self._timeline_controls) # Frames extracted self._frames_panel = self._build_frames_panel() right_box.append(self._frames_panel) # Transcript self._transcript_panel = self._build_transcript_panel() right_box.append(self._transcript_panel) # Agent input self._agent_input = self._build_agent_input() right_box.append(self._agent_input) return right_box def _build_placeholder(self, title, height=200, width=-1): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) label = Gtk.Label(label=title) label.add_css_class("heading") label.set_margin_top(4) label.set_margin_bottom(4) box.append(label) area = Gtk.DrawingArea() area.set_content_height(height) if width > 0: area.set_content_width(width) area.set_vexpand(False) area.set_hexpand(True) box.append(area) frame = Gtk.Frame() frame.set_child(box) return frame def _on_capture_clicked(self, button): if self._stream_mgr: self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames) def _on_scene_threshold_changed(self, scale): val = scale.get_value() self._scene_label.set_label(f"Frames (scene: {val:.2f})") if self._stream_mgr: self._stream_mgr.scene_threshold = val def _build_frames_panel(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) header.set_margin_top(4) header.set_margin_bottom(4) header.set_margin_start(8) header.set_margin_end(8) self._scene_label = Gtk.Label(label=f"Frames (scene: {SCENE_THRESHOLD:.2f})") self._scene_label.add_css_class("heading") header.append(self._scene_label) scale = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, 0.01, 0.50, 0.01) scale.set_value(SCENE_THRESHOLD) scale.set_hexpand(True) scale.set_draw_value(False) scale.connect("value-changed", self._on_scene_threshold_changed) header.append(scale) capture_btn = Gtk.Button(label="Capture") capture_btn.add_css_class("flat") capture_btn.connect("clicked", self._on_capture_clicked) header.append(capture_btn) box.append(header) # Horizontal scrolling strip — storyboard style self._frames_scroll = Gtk.ScrolledWindow() self._frames_scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.NEVER) self._frames_scroll.set_min_content_height(168) self._frames_scroll.set_size_request(-1, 168) # 144px thumb + label + padding self._frames_strip = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) self._frames_strip.set_margin_start(4) self._frames_strip.set_margin_end(4) self._frames_strip.set_margin_top(4) self._frames_strip.set_margin_bottom(4) self._frames_scroll.set_child(self._frames_strip) box.append(self._frames_scroll) frame = Gtk.Frame() frame.set_child(box) return frame def _build_transcript_panel(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) label = Gtk.Label(label="Transcript") label.add_css_class("heading") label.set_margin_top(4) label.set_margin_bottom(4) box.append(label) self._transcript_list = Gtk.ListBox() self._transcript_list.set_selection_mode(Gtk.SelectionMode.NONE) self._transcript_scroll = Gtk.ScrolledWindow() self._transcript_scroll.set_vexpand(True) self._transcript_scroll.set_min_content_height(150) self._transcript_scroll.set_child(self._transcript_list) box.append(self._transcript_scroll) frame = Gtk.Frame() frame.set_child(box) return frame # -- Agent panels -- def _build_agent_output(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) header.set_margin_start(8) header.set_margin_end(8) header.set_margin_top(8) header.set_margin_bottom(8) label = Gtk.Label(label="Agent Output") label.add_css_class("heading") label.set_hexpand(True) label.set_halign(Gtk.Align.START) header.append(label) clear_btn = Gtk.Button(label="Clear") clear_btn.add_css_class("flat") clear_btn.connect("clicked", self._on_clear_agent_output) header.append(clear_btn) box.append(header) self._agent_output_view = Gtk.TextView() self._agent_output_view.set_editable(False) self._agent_output_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) self._agent_output_view.set_cursor_visible(False) self._agent_output_view.set_left_margin(8) self._agent_output_view.set_right_margin(8) self._setup_md_tags(self._agent_output_view.get_buffer()) scroll = Gtk.ScrolledWindow() scroll.set_vexpand(True) scroll.set_child(self._agent_output_view) box.append(scroll) frame = Gtk.Frame() frame.set_child(box) return frame def _build_agent_input(self): outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) outer.set_margin_start(4) outer.set_margin_end(4) outer.set_margin_top(4) outer.set_margin_bottom(4) # Quick action buttons + model selector actions_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) for label, verb in ACTIONS.items(): btn = Gtk.Button(label=label) btn.add_css_class("flat") btn.connect("clicked", lambda b, v=verb: self._send_action(v)) actions_box.append(btn) # Model dropdown (right-aligned) spacer = Gtk.Box() spacer.set_hexpand(True) actions_box.append(spacer) model_label = Gtk.Label(label="Model:") model_label.add_css_class("dim-label") actions_box.append(model_label) self._model_dropdown = Gtk.DropDown.new_from_strings([]) self._model_dropdown.set_size_request(200, -1) self._model_dropdown.connect("notify::selected", self._on_model_changed) actions_box.append(self._model_dropdown) lang_label = Gtk.Label(label="Lang:") lang_label.add_css_class("dim-label") actions_box.append(lang_label) lang_names = list(LANGUAGES.keys()) self._lang_dropdown = Gtk.DropDown.new_from_strings(lang_names) self._lang_dropdown.set_selected(0) self._lang_dropdown.connect("notify::selected", self._on_lang_changed) actions_box.append(self._lang_dropdown) self._history_toggle = Gtk.CheckButton(label="Chat") self._history_toggle.set_tooltip_text("Include conversation history in prompts") self._history_toggle.connect("toggled", lambda b: setattr(self._agent, "include_history", b.get_active())) actions_box.append(self._history_toggle) outer.append(actions_box) # Text entry + send input_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) self._input_entry = Gtk.Entry() self._input_entry.set_hexpand(True) self._input_entry.set_placeholder_text("Message agent... (use @F0001 to reference a frame)") self._input_entry.connect("activate", lambda e: self._send_message()) input_row.append(self._input_entry) send_btn = Gtk.Button(label="Send") send_btn.add_css_class("suggested-action") send_btn.connect("clicked", lambda b: self._send_message()) input_row.append(send_btn) outer.append(input_row) frame = Gtk.Frame() frame.set_child(outer) return frame def _scroll_to_frame(self, widget): """Scroll the frames strip so that widget is visible.""" adj = self._frames_scroll.get_hadjustment() alloc = widget.get_allocation() x = alloc.x w = alloc.width if w <= 0: return False # not allocated yet page = adj.get_page_size() val = adj.get_value() if x < val: adj.set_value(x) elif x + w > val + page: adj.set_value(x + w - page) return False def _clear_frame_selection(self): if self._selected_frame and self._selected_frame in self._frame_widgets: self._frame_widgets[self._selected_frame].remove_css_class("frame-selected") self._selected_frame = None def _clear_transcript_selection(self): for old_id in self._selected_transcripts: if old_id in self._transcript_rows: self._transcript_rows[old_id].remove_css_class("frame-selected") self._selected_transcripts.clear() def _build_selection_message(self, verb: str) -> str | None: """Build a message from verb + selected frame ref + transcript texts.""" parts = [verb] if self._selected_frame: parts.append(f"@{self._selected_frame}") if self._selected_transcripts: texts = [self._transcript_texts[tid] for tid in self._selected_transcripts if tid in self._transcript_texts] if texts: parts.append(" ".join(texts)) return " ".join(parts) if len(parts) > 1 else None def _send_action(self, verb: str): """Send a predefined action with selected frame/transcript.""" msg = self._build_selection_message(verb) if not msg: self._append_agent_output("Select a frame or transcript first.\n") return self._send_message(msg) def _select_frame(self, frame_id: str): """Select a frame thumbnail (or deselect if already selected).""" self._clear_transcript_selection() # Deselect previous if self._selected_frame and self._selected_frame in self._frame_widgets: self._frame_widgets[self._selected_frame].remove_css_class("frame-selected") if self._selected_frame == frame_id: self._selected_frame = None return self._selected_frame = frame_id if frame_id in self._frame_widgets: widget = self._frame_widgets[frame_id] widget.add_css_class("frame-selected") # Scroll after layout settles (idle may fire before allocation) GLib.timeout_add(50, self._scroll_to_frame, widget) def _select_transcript(self, seg_id, extend=False): """Select a transcript segment. If extend=True, add to selection.""" self._clear_frame_selection() if not extend: # Clear previous selection for old_id in self._selected_transcripts: if old_id in self._transcript_rows: self._transcript_rows[old_id].remove_css_class("frame-selected") self._selected_transcripts.clear() if seg_id in self._selected_transcripts: # Deselect if clicking same one (only in non-extend mode) if not extend: return self._selected_transcripts.remove(seg_id) if seg_id in self._transcript_rows: self._transcript_rows[seg_id].remove_css_class("frame-selected") return self._selected_transcripts.append(seg_id) if seg_id in self._transcript_rows: row = self._transcript_rows[seg_id] row.add_css_class("frame-selected") # Scroll row into view GLib.timeout_add(50, self._scroll_transcript_to_row, row) def _scroll_transcript_to_row(self, row): adj = self._transcript_scroll.get_vadjustment() alloc = row.get_allocation() y = alloc.y h = alloc.height if h <= 0: return False page = adj.get_page_size() val = adj.get_value() if y < val: adj.set_value(y) elif y + h > val + page: adj.set_value(y + h - page) return False def _select_adjacent_transcript(self, delta, extend=False): """Select next/prev transcript segment. Shift extends selection.""" if not self._transcript_order: return if not self._selected_transcripts: idx = 0 if delta > 0 else len(self._transcript_order) - 1 else: last = self._selected_transcripts[-1] try: cur = self._transcript_order.index(last) except ValueError: cur = 0 idx = cur + delta if idx < 0 or idx >= len(self._transcript_order): return self._select_transcript(self._transcript_order[idx], extend=extend) def _on_key_pressed(self, controller, keyval, keycode, state): """Keyboard shortcuts: Left/Right=frames, Up/Down=transcript, Enter=answer.""" # Don't intercept when text entry is focused focus = self.get_focus() if isinstance(focus, (Gtk.Entry, Gtk.TextView)): return False shift = bool(state & Gdk.ModifierType.SHIFT_MASK) if keyval == Gdk.KEY_Left: self._clear_transcript_selection() self._select_adjacent_frame(-1) return True elif keyval == Gdk.KEY_Right: self._clear_transcript_selection() self._select_adjacent_frame(1) return True elif keyval == Gdk.KEY_Up: self._clear_frame_selection() self._select_adjacent_transcript(-1, extend=shift) return True elif keyval == Gdk.KEY_Down: self._clear_frame_selection() self._select_adjacent_transcript(1, extend=shift) return True elif keyval in (Gdk.KEY_Return, Gdk.KEY_KP_Enter): msg = self._build_selection_message("answer") if msg: self._send_message(msg) return True elif keyval == Gdk.KEY_Delete: self._on_clear_agent_output(None) return True return False def _select_adjacent_frame(self, delta): """Select the next (+1) or previous (-1) frame. Stops at ends.""" if not self._frame_order: return if self._selected_frame is None: idx = 0 if delta > 0 else len(self._frame_order) - 1 else: try: cur = self._frame_order.index(self._selected_frame) except ValueError: cur = 0 idx = cur + delta if idx < 0 or idx >= len(self._frame_order): return # at the edge, do nothing self._select_frame(self._frame_order[idx]) def _send_message(self, text: str | None = None): if text is None: text = self._input_entry.get_text().strip() self._input_entry.set_text("") if not text: return if not self._stream_mgr: self._append_agent_output("No active session.\n") return self._append_agent_output(f"\n> {text}\n…\n") self._agent.send( message=text, stream_mgr=self._stream_mgr, tracker=self._tracker, on_chunk=lambda chunk: GLib.idle_add(self._replace_thinking, chunk), on_done=lambda err: GLib.idle_add(self._on_agent_done, err), ) self._thinking_replaced = False self._response_start_mark = None self._response_accum = [] def _replace_thinking(self, chunk: str): """Replace the '…' placeholder with the first real chunk.""" buf = self._agent_output_view.get_buffer() if not self._thinking_replaced: self._thinking_replaced = True end = buf.get_end_iter() start = end.copy() start.backward_chars(2) # remove '…\n' buf.delete(start, end) # Mark where the response starts for later MD formatting self._response_start_mark = buf.create_mark( None, buf.get_end_iter(), left_gravity=True ) self._response_accum.append(chunk) self._append_agent_output(chunk) def _on_agent_done(self, err: str | None): if err: self._append_agent_output(f"[Error: {err}]\n") return # Re-render accumulated response with markdown formatting if self._response_start_mark and self._response_accum: buf = self._agent_output_view.get_buffer() start = buf.get_iter_at_mark(self._response_start_mark) end = buf.get_end_iter() buf.delete(start, end) self._render_md(buf, start, "".join(self._response_accum)) buf.delete_mark(self._response_start_mark) self._append_agent_output("\n") def _setup_md_tags(self, buf): buf.create_tag("h1", weight=700, scale=1.4) buf.create_tag("h2", weight=700, scale=1.2) buf.create_tag("h3", weight=700, scale=1.05) buf.create_tag("bold", weight=700) buf.create_tag("italic", style=2) # Pango.Style.ITALIC = 2 buf.create_tag("code", family="monospace", background="#2a2a2a", foreground="#e8e8e8") buf.create_tag("codeblock", family="monospace", background="#1e1e1e", foreground="#e8e8e8", left_margin=16, right_margin=16, pixels_above_lines=4, pixels_below_lines=4) buf.create_tag("bullet", left_margin=16) def _render_md(self, buf, insert_iter, text: str): """Insert markdown-formatted text into buf at insert_iter using text tags.""" import re lines = text.split("\n") i = 0 while i < len(lines): line = lines[i] # Fenced code block if line.startswith("```"): lang = line[3:].strip() block_lines = [] i += 1 while i < len(lines) and not lines[i].startswith("```"): block_lines.append(lines[i]) i += 1 code = "\n".join(block_lines) + "\n" block_mark = buf.create_mark(None, insert_iter, True) self._insert_highlighted_code(buf, insert_iter, code, lang) buf.apply_tag_by_name("codeblock", buf.get_iter_at_mark(block_mark), insert_iter) buf.delete_mark(block_mark) buf.insert(insert_iter, "\n") i += 1 continue # Headers header_match = re.match(r"^(#{1,3})\s+(.*)", line) if header_match: level = len(header_match.group(1)) content = header_match.group(2) tag = f"h{level}" mark = buf.create_mark(None, insert_iter, True) buf.insert(insert_iter, content + "\n") buf.apply_tag_by_name(tag, buf.get_iter_at_mark(mark), insert_iter) i += 1 continue # Bullet points bullet_match = re.match(r"^[\-\*]\s+(.*)", line) if bullet_match: self._insert_inline(buf, insert_iter, "• " + bullet_match.group(1), "bullet") buf.insert(insert_iter, "\n") i += 1 continue # Inline formatting pass on normal lines self._insert_inline_line(buf, insert_iter, line) buf.insert(insert_iter, "\n") i += 1 def _insert_inline(self, buf, it, text: str, outer_tag: str | None = None): mark = buf.create_mark(None, it, True) self._insert_inline_line(buf, it, text) if outer_tag: buf.apply_tag_by_name(outer_tag, buf.get_iter_at_mark(mark), it) def _insert_inline_line(self, buf, it, text: str): """Insert text with inline bold/italic/code formatting.""" import re pattern = re.compile(r"(\*\*(.+?)\*\*|__(.+?)__|" r"\*(.+?)\*|_(.+?)_|" r"`([^`]+?)`)") pos = 0 for m in pattern.finditer(text): # Plain text before match if m.start() > pos: buf.insert(it, text[pos:m.start()]) full = m.group(0) if full.startswith("**") or full.startswith("__"): inner = m.group(2) or m.group(3) mark = buf.create_mark(None, it, True) buf.insert(it, inner) buf.apply_tag_by_name("bold", buf.get_iter_at_mark(mark), it) elif full.startswith("*") or full.startswith("_"): inner = m.group(4) or m.group(5) mark = buf.create_mark(None, it, True) buf.insert(it, inner) buf.apply_tag_by_name("italic", buf.get_iter_at_mark(mark), it) elif full.startswith("`"): inner = m.group(6) mark = buf.create_mark(None, it, True) buf.insert(it, inner) buf.apply_tag_by_name("code", buf.get_iter_at_mark(mark), it) pos = m.end() if pos < len(text): buf.insert(it, text[pos:]) def _insert_highlighted_code(self, buf, it, code: str, lang: str): """Insert syntax-highlighted code using Pygments token tags.""" try: from pygments.lexers import get_lexer_by_name, guess_lexer from pygments.styles import get_style_by_name except ImportError: buf.insert(it, code) return try: lexer = get_lexer_by_name(lang, stripall=False) if lang else guess_lexer(code) except Exception: try: lexer = guess_lexer(code) except Exception: buf.insert(it, code) return try: style = get_style_by_name("monokai") except Exception: buf.insert(it, code) return tag_table = buf.get_tag_table() for ttype, value in lexer.get_tokens(code): tag_name = f"pyg_{ttype}" tag = tag_table.lookup(tag_name) if tag is None: sf = style.style_for_token(ttype) tag = buf.create_tag(tag_name, family="monospace") if sf.get("color"): tag.set_property("foreground", f"#{sf['color']}") if sf.get("bold"): tag.set_property("weight", 700) if sf.get("italic"): tag.set_property("style", 2) mark = buf.create_mark(None, it, True) buf.insert(it, value) buf.apply_tag(tag, buf.get_iter_at_mark(mark), it) buf.delete_mark(mark) def _on_clear_agent_output(self, _button): self._agent_output_view.get_buffer().set_text("") def _on_lang_changed(self, dropdown, _pspec): idx = dropdown.get_selected() lang_names = list(LANGUAGES.keys()) if idx < len(lang_names): lang_code = LANGUAGES[lang_names[idx]] self._transcriber.language = lang_code log.info("Transcript language: %s (%s)", lang_names[idx], lang_code or "auto") def _on_model_changed(self, dropdown, _pspec): idx = dropdown.get_selected() model = self._agent.available_models[idx] if idx < len(self._agent.available_models) else None if model: self._agent.model = model log.info("Model switched to %s", model) def _populate_model_dropdown(self): models = self._agent.available_models if not models: return string_list = Gtk.StringList.new(models) self._model_dropdown.set_model(string_list) # Select current model current = self._agent.model for i, m in enumerate(models): if m == current: self._model_dropdown.set_selected(i) break def _check_agent_auth(self): import os if os.environ.get("GROQ_API_KEY") or os.environ.get("OPENAI_API_KEY"): self._populate_model_dropdown() return err = check_claude_cli() if err: self._append_agent_output(f"⚠ {err}\n") else: self._append_agent_output(f"Agent ready ({self._agent.provider_name})\n") self._populate_model_dropdown() def _append_agent_output(self, text: str): buf = self._agent_output_view.get_buffer() buf.insert(buf.get_end_iter(), text) # Auto-scroll to bottom self._agent_output_view.scroll_to_iter(buf.get_end_iter(), 0, False, 0, 0) def _append_transcript_segments(self, segments): """Append transcription segments to the transcript ListBox.""" for seg in segments: m1, s1 = divmod(int(seg.start), 60) m2, s2 = divmod(int(seg.end), 60) text = f"{seg.id} [{m1:02d}:{s1:02d}-{m2:02d}:{s2:02d}] {seg.text}" row_label = Gtk.Label(label=text) row_label.set_xalign(0) row_label.set_wrap(True) row_label.set_margin_start(8) row_label.set_margin_end(8) row_label.set_margin_top(2) row_label.set_margin_bottom(2) row = Gtk.ListBoxRow() row.set_child(row_label) gesture = Gtk.GestureClick() gesture.connect("released", lambda g, n, x, y, sid=seg.id: self._select_transcript(sid)) row.add_controller(gesture) self._transcript_list.append(row) self._transcript_rows[seg.id] = row self._transcript_texts[seg.id] = seg.text self._transcript_order.append(seg.id) # Auto-scroll to bottom adj = self._transcript_scroll.get_vadjustment() GLib.idle_add(lambda: adj.set_value(adj.get_upper()) or False) # -- Frame thumbnails -- def _load_existing_frames(self): """Load all frames from an existing session's index into the strip.""" if not self._stream_mgr: return index_path = self._stream_mgr.frames_dir / "index.json" if not index_path.exists(): self._append_agent_output(" No frames found.\n") return try: index = json.loads(index_path.read_text()) except (json.JSONDecodeError, IOError): return # Clear existing strip while child := self._frames_strip.get_first_child(): self._frames_strip.remove(child) self._known_frames.clear() self._frame_widgets.clear() self._frame_order.clear() self._selected_frame = None loaded = 0 for entry in index: fid = entry["id"] fpath = Path(entry["path"]) # Resolve path: try absolute first, then relative to frames_dir if not fpath.exists(): fpath = self._stream_mgr.frames_dir / fpath.name if not fpath.exists(): continue self._known_frames.add(fid) timestamp = entry.get("timestamp", 0) try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True) self._add_frame_thumbnail(fid, pixbuf, timestamp, auto_select=False) loaded += 1 except Exception as e: log.warning("Thumbnail load failed for %s: %s", fid, e) # Select the last frame after bulk load if self._frame_order: self._select_frame(self._frame_order[-1]) self._append_agent_output(f" Loaded {loaded} frame thumbnails.\n") def _poll_frames(self): if not self._stream_mgr: return False index_path = self._stream_mgr.frames_dir / "index.json" if not index_path.exists(): return True try: index = json.loads(index_path.read_text()) except (json.JSONDecodeError, IOError): return True for entry in index: fid = entry["id"] if fid in self._known_frames: continue fpath = Path(entry["path"]) if not fpath.exists(): continue self._known_frames.add(fid) timestamp = entry.get("timestamp", 0) try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True) self._add_frame_thumbnail(fid, pixbuf, timestamp) except Exception as e: log.warning("Thumbnail load failed for %s: %s", fid, e) return True def _add_frame_thumbnail(self, frame_id, pixbuf, timestamp, auto_select=True): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) box.set_size_request(256, -1) texture = Gdk.Texture.new_for_pixbuf(pixbuf) pic = Gtk.Picture.new_for_paintable(texture) pic.set_content_fit(Gtk.ContentFit.CONTAIN) pic.set_size_request(256, 144) pic.set_vexpand(False) box.append(pic) m, s = divmod(int(timestamp), 60) label = Gtk.Label(label=f"{frame_id} [{m:02d}:{s:02d}]") label.add_css_class("caption") label.set_ellipsize(Pango.EllipsizeMode.END) box.append(label) gesture = Gtk.GestureClick() gesture.connect("released", lambda g, n, x, y, fid=frame_id: self._select_frame(fid)) box.add_controller(gesture) self._frame_widgets[frame_id] = box self._frame_order.append(frame_id) self._frames_strip.append(box) if auto_select: self._select_frame(frame_id) log.info("Thumbnail: %s at %.1fs", frame_id, timestamp)