Files
mitus/cht/window.py
2026-04-03 05:03:30 -03:00

709 lines
26 KiB
Python

"""Main application window — wires Timeline to all components."""
import json
import logging
from pathlib import Path
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk, Gdk, Adw, GLib, Pango, GdkPixbuf
from threading import Thread
from cht.config import APP_NAME, SCENE_THRESHOLD, TRANSCRIBE_MIN_CHUNK_S
from cht.ui.timeline import Timeline, TimelineControls
from cht.ui.monitor import MonitorWidget
from cht.ui.waveform import WaveformWidget
from cht.ui.frames_panel import FramesPanel
from cht.ui.transcript_panel import TranscriptPanel
from cht.ui.keyboard import KeyboardManager, KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN, KEY_RETURN, KEY_KP_ENTER, KEY_ESCAPE, KEY_DELETE
from cht.ui.agent_output import AgentOutputPanel
from cht.audio.waveform import WaveformEngine
from cht.transcriber.engine import TranscriberEngine, LANGUAGES
from cht.stream.manager import StreamManager, list_sessions, delete_sessions
from cht.stream.tracker import RecordingTracker
from cht.agent.runner import AgentRunner, ACTIONS, check_claude_cli
log = logging.getLogger(__name__)
class ChtWindow(Adw.ApplicationWindow):
    """Main application window: wires the shared Timeline to every panel.

    The window owns at most one ``StreamManager`` at a time. A session is
    either *live* (``self._streaming`` is True; recorder, scene detector and
    audio extractor are started) or opened for *review* through
    ``StreamManager.from_existing``.

    Periodic GLib callbacks are tracked by name in ``self._timer_ids`` so that
    they are cancelled when a session ends and never registered twice.
    """

    # Bounding box (pixels) requested for every frame thumbnail.
    _THUMB_WIDTH = 256
    _THUMB_HEIGHT = 144

    def __init__(self, **kwargs):
        """Build the widget tree, connect signals and schedule the agent check.

        Args:
            **kwargs: Forwarded unchanged to ``Adw.ApplicationWindow``.
        """
        super().__init__(**kwargs)
        self.set_title(APP_NAME)
        self.set_default_size(1400, 900)

        self._streaming = False
        self._gone_live = False
        self._stream_mgr = None
        self._tracker = None
        self._known_frames = set()
        # GLib source ids of the running periodic callbacks, keyed by name.
        self._timer_ids = {}

        # Core components
        self._timeline = Timeline()
        self._agent = AgentRunner()
        self._waveform_engine = WaveformEngine()
        self._transcriber = TranscriberEngine()
        self._pending_transcript_audio = []
        self._pending_transcript_duration = 0.0

        # Panels (own their selection state)
        self._frames_panel = FramesPanel()
        self._transcript_panel = TranscriptPanel()

        # Main layout
        self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        self._main_paned.set_shrink_start_child(False)
        self._main_paned.set_shrink_end_child(False)
        self._main_paned.set_position(450)
        self._agent_output = AgentOutputPanel()
        self._main_paned.set_start_child(self._agent_output)
        right_box = self._build_right_panels()
        self._main_paned.set_end_child(right_box)

        # Header
        toolbar = Adw.ToolbarView()
        header = Adw.HeaderBar()
        header.set_title_widget(Gtk.Label(label=APP_NAME))
        self._connect_btn = Gtk.Button(label="Connect")
        self._connect_btn.add_css_class("suggested-action")
        self._connect_btn.connect("clicked", self._on_connect_clicked)
        header.pack_start(self._connect_btn)
        self._load_btn = Gtk.Button(label="Load Session")
        self._load_btn.connect("clicked", self._on_load_session_clicked)
        header.pack_start(self._load_btn)
        toolbar.add_top_bar(header)
        toolbar.set_content(self._main_paned)
        self.set_content(toolbar)
        self.connect("close-request", self._on_close)

        # Keyboard shortcuts
        self._setup_keyboard()

        # Wire panel signals
        self._frames_panel.connect("capture-requested", lambda p: self._on_capture_clicked())
        self._frames_panel.connect("threshold-changed", lambda p, v: self._on_scene_threshold(v))
        # Cross-panel exclusion: selecting frame clears transcript and vice versa
        self._frames_panel.connect("selection-changed", self._on_frame_selection_changed)
        self._transcript_panel.connect("selection-changed", self._on_transcript_selection_changed)
        self._transcript_panel.connect("min-chunk-changed", self._on_min_chunk_changed)
        self._transcript_panel.connect("lines-per-group-changed", self._on_lines_per_group_changed)

        log.info("Window initialized")
        GLib.idle_add(self._check_agent_auth)

    # -- Periodic timers --

    def _start_timer(self, name, interval_ms, callback):
        """Register a periodic GLib callback unless *name* is already running.

        Args:
            name: Key under which the source id is tracked.
            interval_ms: Interval passed to ``GLib.timeout_add``.
            callback: Callable returning True to keep running.
        """
        if name not in self._timer_ids:
            self._timer_ids[name] = GLib.timeout_add(interval_ms, callback)

    def _retire_timer(self, name):
        """Forget a timer whose callback is about to stop itself.

        Returns:
            Always False, so a callback can ``return self._retire_timer(...)``
            and have GLib drop the source.
        """
        self._timer_ids.pop(name, None)
        return False

    def _cancel_timers(self):
        """Remove every tracked periodic callback from the main loop."""
        for source_id in self._timer_ids.values():
            GLib.source_remove(source_id)
        self._timer_ids.clear()

    # -- Cross-panel selection exclusion --

    def _on_frame_selection_changed(self, panel):
        """Clear the transcript selection once a frame becomes selected."""
        if panel.selected is not None:
            self._transcript_panel.clear_selection()

    def _on_transcript_selection_changed(self, panel):
        """Clear the frame selection once transcript lines become selected."""
        if panel.has_selection:
            self._frames_panel.clear_selection()

    # -- Connect / Disconnect --

    def _on_connect_clicked(self, button):
        """Toggle streaming.

        While streaming, stop and reload the same session for review.
        Otherwise start streaming, resuming the currently opened session id
        when there is one.
        """
        if self._streaming:
            self._stop_stream(reload_session=True)
            return
        session_id = self._stream_mgr.session_id if self._stream_mgr else None
        if self._stream_mgr:
            self._stop_stream()
        self._start_stream(session_id=session_id)

    def _on_capture_clicked(self):
        """Ask the stream manager for an immediate frame capture."""
        if self._stream_mgr:
            self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames)

    def _on_scene_threshold(self, val):
        """Forward a new scene-detection threshold to the stream manager."""
        if self._stream_mgr:
            self._stream_mgr.scene_threshold = val

    def _on_min_chunk_changed(self, panel, val):
        """Store the new minimum transcription chunk length in ``cht.config``."""
        import cht.config
        cht.config.TRANSCRIBE_MIN_CHUNK_S = val

    def _on_lines_per_group_changed(self, panel, val):
        """Store the new transcript lines-per-group value in ``cht.config``."""
        import cht.config
        cht.config.TRANSCRIBE_LINES_PER_GROUP = val

    # -- Session loading --

    @staticmethod
    def _count_indexed_frames(index_path):
        """Return the number of entries in a frames index, or 0 if unreadable.

        Best-effort: a missing, unreadable or malformed index counts as zero
        frames so the session list can still be displayed.
        """
        try:
            return len(json.loads(index_path.read_text()))
        except (OSError, ValueError, TypeError):
            return 0

    def _on_load_session_clicked(self, button):
        """Show a modal dialog listing stored sessions for loading or deletion.

        Activating a row closes the dialog and loads that session. The header
        offers an "All" check button and a "Delete" button acting on the
        checked rows; the currently opened session is never deleted.
        """
        sessions = list_sessions()
        if not sessions:
            self._agent_output.append("No previous sessions found.\n")
            return

        dialog = Adw.Window(transient_for=self, modal=True)
        dialog.set_title("Load Session")
        dialog.set_default_size(500, 400)
        toolbar = Adw.ToolbarView()
        header = Adw.HeaderBar()
        select_all_btn = Gtk.CheckButton(label="All")
        header.pack_start(select_all_btn)
        delete_btn = Gtk.Button(label="Delete")
        delete_btn.add_css_class("destructive-action")
        header.pack_end(delete_btn)
        toolbar.add_top_bar(header)

        scroll = Gtk.ScrolledWindow()
        scroll.set_vexpand(True)
        listbox = Gtk.ListBox()
        listbox.set_selection_mode(Gtk.SelectionMode.NONE)
        listbox.add_css_class("boxed-list")

        checks: list[tuple[str, Gtk.CheckButton]] = []
        for sid, sdir in sessions:
            nframes = self._count_indexed_frames(sdir / "frames" / "index.json")
            nrec = sum(1 for _ in (sdir / "stream").glob("recording_*.mp4"))
            check = Gtk.CheckButton()
            checks.append((sid, check))
            row = Adw.ActionRow()
            row.set_title(sid)
            row.set_subtitle(f"{nframes} frames, {nrec} segments")
            row.set_activatable(True)
            row.add_prefix(check)

            # Defaults bind the current loop values (avoids late binding).
            def _on_row_activated(r, s=sid, d=dialog):
                d.close()
                self._load_session(s)

            row.connect("activated", _on_row_activated)
            listbox.append(row)

        def _on_select_all(btn):
            active = btn.get_active()
            for _, cb in checks:
                cb.set_active(active)

        select_all_btn.connect("toggled", _on_select_all)

        def _on_delete(btn):
            to_delete = [sid for sid, cb in checks if cb.get_active()]
            if not to_delete:
                return
            # Never delete the session that is currently open.
            current = self._stream_mgr.session_id if self._stream_mgr else None
            if current in to_delete:
                to_delete.remove(current)
            if to_delete:
                delete_sessions(to_delete)
            # Re-open the dialog so the list reflects what is left.
            dialog.close()
            self._on_load_session_clicked(None)

        delete_btn.connect("clicked", _on_delete)

        scroll.set_child(listbox)
        toolbar.set_content(scroll)
        dialog.set_content(toolbar)
        dialog.present()

    def _load_session(self, session_id):
        """Load an existing session for review (no streaming).

        Any open session is stopped first. Frames and transcript are loaded
        synchronously; the waveform of the first recording segment is computed
        on a background thread and published only if this session is still
        the open one when the work finishes.
        """
        if self._streaming or self._stream_mgr:
            self._stop_stream()
        try:
            self._stream_mgr = StreamManager.from_existing(session_id)
        except FileNotFoundError as e:
            self._agent_output.append(f"Error: {e}\n")
            return
        mgr = self._stream_mgr

        # NOTE(review): no separator between APP_NAME and the session id —
        # confirm this is the intended title format.
        self.set_title(f"{APP_NAME}{session_id}")
        self._agent_output.append(f"Loaded session: {session_id}\n")

        segments = mgr.recording_segments
        if segments:
            self._monitor.set_recording(segments[0])
            duration = mgr.total_duration()
            if duration > 0:
                self._timeline.set_duration(duration)
                self._timeline.seek(0)
            self._agent_output.append(
                f" Recording: {len(segments)} segment(s), "
                f"{int(duration)}s duration\n"
            )
        else:
            self._agent_output.append(" No recordings found (frames only).\n")

        self._load_existing_frames()
        self._load_existing_transcript()
        # Keep picking up frames added to the index while the session is open.
        self._start_timer("poll_frames", 1000, self._poll_frames)

        # Waveform from recording (background)
        if segments:
            from cht.stream import ffmpeg as ff

            first_segment = segments[0]

            def _publish_peaks(peaks, bucket_dur):
                # Runs on the main loop; drop results of a replaced session.
                if self._stream_mgr is mgr:
                    self._waveform_widget.set_peaks(peaks, bucket_dur)
                return False

            def _compute_waveform():
                # Uses the captured ``mgr`` rather than ``self._stream_mgr``,
                # which may already be None when this thread gets to run.
                try:
                    audio_dir = mgr.audio_dir
                    audio_dir.mkdir(parents=True, exist_ok=True)
                    full_wav = audio_dir / "full.wav"
                    ff.extract_audio_chunk(first_segment, full_wav)
                    if self._stream_mgr is not mgr:
                        return
                    self._waveform_engine.compute_full(full_wav)
                    peaks = self._waveform_engine.peaks
                    bucket_dur = self._waveform_engine.bucket_duration
                    GLib.idle_add(_publish_peaks, peaks.copy(), bucket_dur)
                except Exception as e:
                    log.error("Waveform computation failed: %s", e)

            Thread(target=_compute_waveform, daemon=True, name="waveform_load").start()

        self._populate_model_dropdown()

    # -- Streaming --

    def _start_stream(self, session_id=None):
        """Start a live session.

        Creates the stream manager, starts recorder, tracker, scene detector
        and audio extractor, and registers the periodic callbacks.

        Args:
            session_id: Existing session to resume; when given, its stored
                frames and transcript are reloaded. ``None`` is passed through
                to ``StreamManager``.
        """
        log.info("Starting stream...")
        self._connect_btn.set_label("Disconnect")
        self._connect_btn.remove_css_class("suggested-action")
        self._connect_btn.add_css_class("destructive-action")
        self._streaming = True
        self._gone_live = False

        self._stream_mgr = StreamManager(session_id=session_id)
        self._stream_mgr.setup_dirs()
        self._stream_mgr.start_recorder()
        self._monitor.set_recording(self._stream_mgr.recording_path)
        self._monitor.set_live_source(self._stream_mgr.relay_url)

        self._tracker = RecordingTracker(
            get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [],
            on_duration_update=self._on_duration_update,
        )
        self._tracker.start()
        self._stream_mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames)
        self._stream_mgr.start_audio_extractor(on_new_audio=self._on_new_audio)

        # Tracked by name so a reconnect can never stack a second copy.
        self._start_timer("poll_frames", 1000, self._poll_frames)
        self._start_timer("tick_live", 1000, self._tick_live)
        self._start_timer("check_recorder", 2000, self._check_recorder)

        # Reload existing data if resuming
        if session_id:
            self._load_existing_frames()
            self._load_existing_transcript()

        # NOTE(review): no separator between APP_NAME and the session id —
        # confirm this is the intended title format.
        self.set_title(f"{APP_NAME}{self._stream_mgr.session_id}")
        log.info("Waiting for sender...")

    def _go_live_once(self):
        """One-shot idle callback switching the timeline to live mode.

        Returns:
            False, so GLib does not call it again.
        """
        if self._stream_mgr:
            log.info("Going LIVE (startup delay elapsed)")
            self._timeline.go_live()
        return False

    def _tick_live(self):
        """Periodic callback advancing the live timeline while streaming.

        Returns:
            True to keep the timer running, False once streaming has stopped.
        """
        if not self._streaming:
            return self._retire_timer("tick_live")
        self._timeline.tick_live()
        return True

    def _on_duration_update(self, duration):
        """Handle a duration report from the recording tracker.

        Timeline updates are marshalled through ``GLib.idle_add``. On the
        first report the timeline is switched to live and an immediate frame
        capture is requested.
        """
        GLib.idle_add(self._timeline.set_duration, duration)
        if self._gone_live:
            return
        self._gone_live = True
        GLib.idle_add(self._go_live_once)
        mgr = self._stream_mgr
        if mgr:
            mgr.capture_now(on_new_frames=self._on_new_scene_frames)

    def _on_new_scene_frames(self, frames):
        """Add a timeline scene marker for each reported frame."""
        for f in frames:
            GLib.idle_add(self._timeline.add_scene_marker, f["timestamp"])

    def _on_new_audio(self, wav_path, start_time, duration):
        """Handle a freshly extracted audio chunk.

        Updates the waveform, accumulates the chunk and, once the accumulated
        duration reaches ``cht.config.TRANSCRIBE_MIN_CHUNK_S``, transcribes
        the accumulated span on a background thread.

        Args:
            wav_path: Path of the extracted chunk.
            start_time: Start of the chunk on the recording timeline.
            duration: Length of the chunk.
        """
        # Imported as a module so the value set by _on_min_chunk_changed is
        # seen here; a ``from cht.config import`` name would stay frozen.
        import cht.config

        mgr = self._stream_mgr
        if not mgr:
            return

        self._waveform_engine.append_chunk(wav_path, start_time)
        peaks = self._waveform_engine.peaks
        bucket_dur = self._waveform_engine.bucket_duration
        GLib.idle_add(self._waveform_widget.set_peaks, peaks.copy(), bucket_dur)

        self._pending_transcript_audio.append((wav_path, start_time, duration))
        self._pending_transcript_duration += duration
        if self._pending_transcript_duration < cht.config.TRANSCRIBE_MIN_CHUNK_S:
            return

        first_start = self._pending_transcript_audio[0][1]
        total_dur = self._pending_transcript_duration
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0
        chunk_wav = mgr.audio_dir / f"transcript_{int(first_start):06d}.wav"

        def _transcribe():
            from cht.stream import ffmpeg as ff
            try:
                ff.extract_audio_chunk(
                    mgr.recording_path, chunk_wav,
                    start_time=first_start, duration=total_dur,
                )
            except Exception as e:
                log.error("Transcript audio extraction failed: %s", e)
                return
            if not chunk_wav.exists():
                return
            new_segs = self._transcriber.transcribe_chunk(chunk_wav, time_offset=first_start)
            self._transcriber.save_index(mgr.transcript_dir / "index.json")
            if new_segs:
                GLib.idle_add(self._transcript_panel.add_items, new_segs)

        Thread(target=_transcribe, daemon=True, name="transcriber").start()

    def _check_recorder(self):
        """Periodic callback restarting the recorder if it has died.

        Returns:
            True to keep the timer running, False once streaming has stopped.
        """
        if not self._streaming or not self._stream_mgr:
            return self._retire_timer("check_recorder")
        if not self._stream_mgr.recorder_alive():
            log.warning("Recorder died — restarting into new segment")
            self._stream_mgr.restart_recorder()
            self._monitor.set_recording(self._stream_mgr.recording_path)
        return True

    def _on_live_toggle(self):
        """Toggle live mode, passing the monitor's current live position."""
        pos = self._monitor.get_live_position()
        self._timeline.toggle_live(live_player_pos=pos)

    def _stop_stream(self, reload_session=False):
        """Stop the open session and reset every component to its idle state.

        Args:
            reload_session: When True and the stopped session was not
                read-only, schedule it to be reopened for review.
        """
        log.info("Stopping stream...")
        mgr = self._stream_mgr
        last_session_id = mgr.session_id if mgr and not mgr.readonly else None

        # Periodic callbacks belong to the session that is going away.
        self._cancel_timers()

        if self._tracker:
            self._tracker.stop()
            self._tracker = None
        if mgr:
            if not mgr.readonly:
                mgr.stop_all()
            self._stream_mgr = None

        self._timeline.reset()
        self._monitor.reset()
        self._waveform_engine.reset()
        self._waveform_widget.set_peaks(None, 0.05)
        self._transcriber.reset()
        self._agent.clear_history()
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0
        self._known_frames = set()
        self._frames_panel.clear()
        self._transcript_panel.clear()

        self._connect_btn.set_label("Connect")
        self._connect_btn.remove_css_class("destructive-action")
        self._connect_btn.add_css_class("suggested-action")
        self._streaming = False
        self.set_title(APP_NAME)

        if reload_session and last_session_id:
            GLib.idle_add(self._load_session, last_session_id)

    def _on_close(self, *args):
        """Handle ``close-request``: clean up and let the window close.

        Returns:
            False, so the default close handling still runs.
        """
        self.teardown()
        return False

    def teardown(self):
        """Full cleanup for app exit — safe to call multiple times."""
        if self._stream_mgr or self._streaming:
            self._stop_stream()
        self._monitor.stop()

    # -- Layout --

    def _build_right_panels(self):
        """Build the right-hand column.

        Top to bottom: monitor + waveform, timeline controls, frames panel,
        transcript panel, agent input.

        Returns:
            The vertical ``Gtk.Box`` holding those widgets.
        """
        right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2)

        # Video + waveform
        top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
        top_paned.set_shrink_start_child(False)
        top_paned.set_shrink_end_child(False)
        self._monitor = MonitorWidget(self._timeline)
        self._monitor.set_hexpand(True)
        stream_frame = Gtk.Frame()
        stream_frame.set_child(self._monitor)
        top_paned.set_start_child(stream_frame)
        self._waveform_widget = WaveformWidget(self._timeline)
        waveform_frame = Gtk.Frame()
        waveform_frame.set_child(self._waveform_widget)
        top_paned.set_end_child(waveform_frame)
        top_paned.set_position(650)
        right_box.append(top_paned)

        # Timeline slider
        self._timeline_controls = TimelineControls(self._timeline)
        self._timeline_controls.set_live_toggle_callback(self._on_live_toggle)
        right_box.append(self._timeline_controls)

        # Frames
        frames_frame = Gtk.Frame()
        frames_frame.set_child(self._frames_panel)
        right_box.append(frames_frame)

        # Transcript
        transcript_frame = Gtk.Frame()
        transcript_frame.set_child(self._transcript_panel)
        right_box.append(transcript_frame)

        # Agent input
        self._agent_input = self._build_agent_input()
        right_box.append(self._agent_input)
        return right_box

    def _build_agent_input(self):
        """Build the agent input area.

        First row: one button per entry in ``ACTIONS``, the model and language
        dropdowns and the chat-history toggle. Second row: the message entry
        and the Send button.

        Returns:
            A ``Gtk.Frame`` wrapping both rows.
        """
        outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
        outer.set_margin_start(4)
        outer.set_margin_end(4)
        outer.set_margin_top(4)
        outer.set_margin_bottom(4)

        actions_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        for label, verb in ACTIONS.items():
            btn = Gtk.Button(label=label)
            btn.add_css_class("flat")
            # ``v=verb`` binds the loop value now (avoids late binding).
            btn.connect("clicked", lambda b, v=verb: self._send_action(v))
            actions_box.append(btn)

        spacer = Gtk.Box()
        spacer.set_hexpand(True)
        actions_box.append(spacer)

        model_label = Gtk.Label(label="Model:")
        model_label.add_css_class("dim-label")
        actions_box.append(model_label)
        self._model_dropdown = Gtk.DropDown.new_from_strings([])
        self._model_dropdown.set_size_request(200, -1)
        self._model_dropdown.connect("notify::selected", self._on_model_changed)
        actions_box.append(self._model_dropdown)

        lang_label = Gtk.Label(label="Lang:")
        lang_label.add_css_class("dim-label")
        actions_box.append(lang_label)
        lang_names = list(LANGUAGES.keys())
        self._lang_dropdown = Gtk.DropDown.new_from_strings(lang_names)
        self._lang_dropdown.set_selected(0)
        self._lang_dropdown.connect("notify::selected", self._on_lang_changed)
        actions_box.append(self._lang_dropdown)

        self._history_toggle = Gtk.CheckButton(label="Chat")
        self._history_toggle.set_tooltip_text("Include conversation history in prompts")
        self._history_toggle.connect("toggled", lambda b: setattr(self._agent, "include_history", b.get_active()))
        actions_box.append(self._history_toggle)
        outer.append(actions_box)

        input_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        self._input_entry = Gtk.Entry()
        self._input_entry.set_hexpand(True)
        self._input_entry.set_placeholder_text("Message agent... (@F1-3 frames, @T1-5 transcript)")
        self._input_entry.connect("activate", lambda e: self._send_message())
        input_row.append(self._input_entry)
        send_btn = Gtk.Button(label="Send")
        send_btn.add_css_class("suggested-action")
        send_btn.connect("clicked", lambda b: self._send_message())
        input_row.append(send_btn)
        outer.append(input_row)

        frame = Gtk.Frame()
        frame.set_child(outer)
        return frame

    # -- Keyboard --

    def _setup_keyboard(self):
        """Install the window-wide keyboard shortcuts.

        Keys pass through untouched while focus is inside the message entry,
        except Escape.
        """
        kb = KeyboardManager()

        def _entry_focused():
            # True when the focused widget is the entry or one of its children.
            w = self.get_focus()
            while w is not None:
                if w is self._input_entry:
                    return True
                w = w.get_parent()
            return False

        def _answer_selection(**_):
            # Build the message once; do nothing when nothing is selected.
            msg = self._build_selection_message("answer")
            if msg:
                self._send_message(msg)

        kb.set_passthrough(_entry_focused, except_keys={KEY_ESCAPE})
        kb.bind(KEY_LEFT, lambda **_: self._frames_panel.select_adjacent(-1))
        kb.bind(KEY_RIGHT, lambda **_: self._frames_panel.select_adjacent(1))
        kb.bind(KEY_UP, lambda shift=False, **_: self._transcript_panel.select_adjacent(-1, extend=shift))
        kb.bind(KEY_DOWN, lambda shift=False, **_: self._transcript_panel.select_adjacent(1, extend=shift))
        kb.bind(KEY_RETURN, _answer_selection)
        kb.bind(KEY_KP_ENTER, _answer_selection)
        kb.bind(KEY_ESCAPE, lambda **_: (self.set_focus(None), self._frames_panel.clear_selection(), self._transcript_panel.clear_selection()))
        kb.bind(KEY_DELETE, lambda **_: self._agent_output.clear())
        kb.attach(self)

    # -- Agent actions --

    def _build_selection_message(self, verb: str) -> str | None:
        """Compose ``"<verb> @<frame> <transcript text>"`` from the selection.

        Returns:
            The message, or None when neither a frame nor transcript text is
            selected.
        """
        parts = [verb]
        if self._frames_panel.selected:
            parts.append(f"@{self._frames_panel.selected}")
        texts = self._transcript_panel.selected_texts
        if texts:
            parts.append(" ".join(texts))
        return " ".join(parts) if len(parts) > 1 else None

    def _send_action(self, verb: str):
        """Send *verb* applied to the current selection, or ask for one."""
        msg = self._build_selection_message(verb)
        if not msg:
            self._agent_output.append("Select a frame or transcript first.\n")
            return
        self._send_message(msg)

    def _send_message(self, text: str | None = None):
        """Send a message to the agent and stream its reply to the output panel.

        Args:
            text: Message to send. When None, the entry text is taken (and the
                entry cleared); if that is empty, an "answer" message is built
                from the current selection. Nothing is sent when the result is
                empty or no session is open.
        """
        if text is None:
            text = self._input_entry.get_text().strip()
            self._input_entry.set_text("")
            if not text:
                text = self._build_selection_message("answer")
        if not text:
            return
        if not self._stream_mgr:
            self._agent_output.append("No active session.\n")
            return

        self._agent_output.append(f"\n> {text}\n\n")
        self._agent_output.begin_response()
        self._agent.send(
            message=text,
            stream_mgr=self._stream_mgr,
            tracker=self._tracker,
            on_chunk=lambda chunk: GLib.idle_add(self._agent_output.replace_thinking, chunk),
            on_done=lambda err: GLib.idle_add(self._agent_output.finish_response, err),
        )

    # -- Settings callbacks --

    def _on_lang_changed(self, dropdown, _pspec):
        """Apply the language chosen in the dropdown to the transcriber."""
        idx = dropdown.get_selected()
        lang_names = list(LANGUAGES.keys())
        if idx < len(lang_names):
            lang_code = LANGUAGES[lang_names[idx]]
            self._transcriber.language = lang_code
            log.info("Transcript language: %s (%s)", lang_names[idx], lang_code or "auto")

    def _on_model_changed(self, dropdown, _pspec):
        """Apply the model chosen in the dropdown to the agent."""
        idx = dropdown.get_selected()
        models = self._agent.available_models
        # An out-of-range index (e.g. no selection) leaves the model unchanged.
        model = models[idx] if idx < len(models) else None
        if model:
            self._agent.model = model
            log.info("Model switched to %s", model)

    def _populate_model_dropdown(self):
        """Fill the model dropdown and select the agent's current model."""
        models = self._agent.available_models
        if not models:
            return
        # Read the current model BEFORE replacing the list model: set_model()
        # can emit notify::selected, which runs _on_model_changed and would
        # overwrite the agent's model with the first entry.
        current = self._agent.model
        self._model_dropdown.set_model(Gtk.StringList.new(models))
        for i, m in enumerate(models):
            if m == current:
                self._model_dropdown.set_selected(i)
                break

    def _check_agent_auth(self):
        """One-shot idle callback reporting whether the agent can be used.

        With ``GROQ_API_KEY`` or ``OPENAI_API_KEY`` set, only the model
        dropdown is populated. Otherwise the result of ``check_claude_cli()``
        decides between showing its error text and announcing the agent.
        """
        import os
        if os.environ.get("GROQ_API_KEY") or os.environ.get("OPENAI_API_KEY"):
            self._populate_model_dropdown()
            return
        err = check_claude_cli()
        if err:
            self._agent_output.append(f"{err}\n")
        else:
            self._agent_output.append(f"Agent ready ({self._agent.provider_name})\n")
            self._populate_model_dropdown()

    # -- Data loading --

    @staticmethod
    def _resolve_frame_path(entry, frames_dir):
        """Locate the image file of a frame index entry.

        Tries the recorded path first, then a file of the same name inside
        *frames_dir*.

        Returns:
            An existing ``Path``, or None when neither location exists.
        """
        recorded = Path(entry["path"])
        if recorded.exists():
            return recorded
        fallback = frames_dir / recorded.name
        return fallback if fallback.exists() else None

    @classmethod
    def _load_thumbnail(cls, fpath):
        """Load *fpath* as a pixbuf scaled to fit the thumbnail box."""
        return GdkPixbuf.Pixbuf.new_from_file_at_scale(
            str(fpath), cls._THUMB_WIDTH, cls._THUMB_HEIGHT, True
        )

    def _load_existing_frames(self):
        """Load thumbnails for every frame already listed in the session index."""
        mgr = self._stream_mgr
        if not mgr:
            return
        index_path = mgr.frames_dir / "index.json"
        if not index_path.exists():
            self._agent_output.append(" No frames found.\n")
            return
        try:
            index = json.loads(index_path.read_text())
        except (OSError, ValueError):
            return

        items = []
        for entry in index:
            fpath = self._resolve_frame_path(entry, mgr.frames_dir)
            if fpath is None:
                continue
            try:
                pixbuf = self._load_thumbnail(fpath)
                items.append({"id": entry["id"], "pixbuf": pixbuf, "timestamp": entry.get("timestamp", 0)})
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", entry["id"], e)

        if items:
            self._frames_panel.load_items(items)
            self._known_frames = {item["id"] for item in items}
            self._agent_output.append(f" Loaded {len(items)} frame thumbnails.\n")

    def _load_existing_transcript(self):
        """Load the stored transcript index into the transcriber and panel."""
        if not self._stream_mgr:
            return
        transcript_index = self._stream_mgr.transcript_dir / "index.json"
        if not transcript_index.exists():
            return
        self._transcriber.load_index(transcript_index)
        segs = self._transcriber.all_segments()
        if segs:
            self._transcript_panel.add_items(segs)
            self._agent_output.append(f" Loaded {len(segs)} transcript segments.\n")

    def _poll_frames(self):
        """Periodic callback adding newly indexed frames to the frames panel.

        A frame is auto-selected on arrival unless transcript lines are
        selected. Frames whose file does not exist yet are retried on the
        next tick.

        Returns:
            True to keep the timer running, False once no session is open.
        """
        mgr = self._stream_mgr
        if not mgr:
            return self._retire_timer("poll_frames")
        index_path = mgr.frames_dir / "index.json"
        if not index_path.exists():
            return True
        try:
            index = json.loads(index_path.read_text())
        except (OSError, ValueError):
            # Possibly caught mid-write; try again on the next tick.
            return True

        for entry in index:
            fid = entry["id"]
            if fid in self._known_frames:
                continue
            fpath = self._resolve_frame_path(entry, mgr.frames_dir)
            if fpath is None:
                continue
            # Marked known before loading so a broken image is reported once.
            self._known_frames.add(fid)
            timestamp = entry.get("timestamp", 0)
            try:
                pixbuf = self._load_thumbnail(fpath)
                auto = not self._transcript_panel.has_selection
                self._frames_panel.add_item(fid, pixbuf, timestamp, auto_select=auto)
            except Exception as e:
                log.warning("Thumbnail load failed for %s: %s", fid, e)
        return True