Files
mitus/cht/window.py
2026-04-02 21:08:17 -03:00

713 lines
26 KiB
Python

"""Main application window — wires Timeline to all components."""
import json
import logging
from pathlib import Path
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk, Gdk, Adw, GLib, Pango, GdkPixbuf
from cht.config import APP_NAME, SCENE_THRESHOLD
from cht.ui.timeline import Timeline, TimelineControls
from cht.ui.monitor import MonitorWidget
from cht.stream.manager import StreamManager
from cht.stream.tracker import RecordingTracker
from cht.agent.runner import AgentRunner, ACTIONS, check_claude_cli
log = logging.getLogger(__name__)
class ChtWindow(Adw.ApplicationWindow):
def __init__(self, **kwargs):
    """Build the window, lay out every panel and schedule start-up work.

    Keyword arguments are forwarded unchanged to the parent window class.
    """
    super().__init__(**kwargs)
    self.set_title(APP_NAME)
    self.set_default_size(1400, 900)

    # Per-session state
    self._streaming = False
    self._gone_live = False
    self._stream_mgr = None
    self._tracker = None
    self._known_frames = set()
    self._selected_frame = None  # currently selected frame ID
    self._frame_widgets = {}  # frame_id → outer Box widget

    # Timeline is the central state machine
    self._timeline = Timeline()
    self._agent = AgentRunner()

    # Main layout: agent output on the left, all other panels on the right
    paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
    self._main_paned = paned
    paned.set_shrink_start_child(False)
    paned.set_shrink_end_child(False)
    paned.set_position(450)
    paned.set_start_child(self._build_agent_output())
    paned.set_end_child(self._build_right_panels())

    # Header bar with the Connect / Disconnect toggle
    connect_btn = Gtk.Button(label="Connect")
    connect_btn.add_css_class("suggested-action")
    connect_btn.connect("clicked", self._on_connect_clicked)
    self._connect_btn = connect_btn

    header_bar = Adw.HeaderBar()
    header_bar.set_title_widget(Gtk.Label(label=APP_NAME))
    header_bar.pack_start(connect_btn)

    toolbar_view = Adw.ToolbarView()
    toolbar_view.add_top_bar(header_bar)
    toolbar_view.set_content(paned)
    self.set_content(toolbar_view)

    self.connect("close-request", self._on_close)
    log.info("Window initialized")

    # Deferred to idle callbacks, in this order: stream first, then auth check
    for startup_task in (self._start_stream, self._check_agent_auth):
        GLib.idle_add(startup_task)
def _on_connect_clicked(self, button):
    """Header button handler: stop the stream if one is running, else start one."""
    toggle = self._stop_stream if self._streaming else self._start_stream
    toggle()
def _start_stream(self):
    """Start a new streaming session and all of its background workers.

    Creates a fresh StreamManager, starts the recorder, the duration tracker
    and the scene detector, points the monitor at the new recording, and
    installs the periodic GLib callbacks (frame polling, LIVE tick, recorder
    watchdog).

    The periodic callbacks are bound to the StreamManager created here, so
    timers left over from an earlier session stop on their next tick instead
    of running alongside the new ones after a quick disconnect/reconnect.
    """
    log.info("Starting stream...")
    self._connect_btn.set_label("Disconnect")
    self._connect_btn.remove_css_class("suggested-action")
    self._connect_btn.add_css_class("destructive-action")
    self._streaming = True
    self._gone_live = False

    # Create session
    mgr = StreamManager()
    self._stream_mgr = mgr
    mgr.setup_dirs()

    # Start ffmpeg recorder (listens for sender, relays to UDP)
    mgr.start_recorder()

    # Tell monitor where the recording will be and what URL to stream live from
    self._monitor.set_recording(mgr.recording_path)
    self._monitor.set_live_source(mgr.relay_url)

    def current_segments():
        # Invoked by the tracker; use the captured manager so a concurrent
        # `self._stream_mgr = None` cannot turn into an attribute access on None.
        return mgr.recording_segments if self._stream_mgr is mgr else []

    # Start tracking recording duration (across segments)
    self._tracker = RecordingTracker(
        get_segments=current_segments,
        on_duration_update=self._on_duration_update,
    )
    self._tracker.start()

    # Start scene detection
    mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames)

    def for_this_session(callback):
        """Wrap a periodic callback so it stops once this session is replaced."""
        def run():
            if self._stream_mgr is not mgr:
                return False  # stale timer from an earlier session
            return bool(callback())
        return run

    # Start polling for frame thumbnails
    GLib.timeout_add(1000, for_this_session(self._poll_frames))
    # Tick the LIVE cursor every second
    GLib.timeout_add(1000, for_this_session(self._tick_live))
    # Watchdog: restart recorder on crash/disconnect
    GLib.timeout_add(2000, for_this_session(self._check_recorder))
    log.info("Waiting for sender...")
def _go_live_once(self):
    """Idle callback: switch the timeline to LIVE if a session is active.

    Always returns False so GLib does not reschedule it.
    """
    if not self._stream_mgr:
        return False
    log.info("Going LIVE (startup delay elapsed)")
    self._timeline.go_live()
    return False  # one-shot
def _tick_live(self):
    """Periodic callback: advance the LIVE cursor while a stream is running.

    Returns True to keep the timer, False (without ticking) once streaming
    has stopped.
    """
    if self._streaming:
        self._timeline.tick_live()
        return True
    return False
def _on_duration_update(self, duration):
    """Called from RecordingTracker thread.

    Forwards the new duration to the timeline on the GTK main loop. On the
    first update of a session it also schedules the switch to LIVE and
    captures an initial frame.
    """
    GLib.idle_add(self._timeline.set_duration, duration)
    if self._gone_live:
        return
    self._gone_live = True
    GLib.idle_add(self._go_live_once)
    # Capture initial frame — scene detector only fires on changes.
    # Snapshot the manager first: _stop_stream() sets the attribute to None,
    # so checking and then re-reading `self._stream_mgr` from this thread
    # could end up calling capture_now on None.
    mgr = self._stream_mgr
    if mgr:
        mgr.capture_now(on_new_frames=self._on_new_scene_frames)
def _on_new_scene_frames(self, frames):
    """Scene-detector callback (runs on the detector thread).

    Schedules one timeline scene marker per frame on the GTK main loop,
    using each frame's "timestamp" entry.
    """
    add_marker = self._timeline.add_scene_marker
    for frame in frames:
        GLib.idle_add(add_marker, frame["timestamp"])
def _check_recorder(self):
    """Watchdog tick: restart the recorder if it is no longer alive.

    Returns False (ending the timer) when no stream is active, True otherwise.
    """
    mgr = self._stream_mgr
    if not (self._streaming and mgr):
        return False  # stop polling
    if mgr.recorder_alive():
        return True  # keep polling
    log.warning("Recorder died — restarting into new segment")
    mgr.restart_recorder()
    # Re-point monitor to new recording segment
    self._monitor.set_recording(mgr.recording_path)
    return True  # keep polling
def _on_live_toggle(self):
    """LIVE button handler: toggle LIVE, handing over the live player's position."""
    self._timeline.toggle_live(live_player_pos=self._monitor.get_live_position())
def _stop_stream(self):
    """Tear down the current session and return the UI to the idle state.

    Resets the timeline, stops the monitor, the tracker and the stream
    manager, removes this session's thumbnails from the frame strip, clears
    the frame bookkeeping and restores the "Connect" button.
    """
    log.info("Stopping stream...")
    self._timeline.reset()
    self._monitor.stop()
    if self._tracker:
        self._tracker.stop()
        self._tracker = None
    if self._stream_mgr:
        self._stream_mgr.stop_all()
        self._stream_mgr = None

    # Remove the thumbnails themselves, not just the lookup table. Otherwise
    # stale tiles stay visible and clickable after the dict is cleared and
    # end up mixed with the next session's frames.
    for tile in self._frame_widgets.values():
        self._frames_strip.remove(tile)
    self._known_frames = set()
    self._selected_frame = None
    self._frame_widgets = {}

    self._connect_btn.set_label("Connect")
    self._connect_btn.remove_css_class("destructive-action")
    self._connect_btn.add_css_class("suggested-action")
    self._streaming = False
def _on_close(self, *_args):
    """close-request handler: tear down the active session."""
    self._stop_stream()
# -- Right panels --
def _build_right_panels(self):
    """Build the right-hand column and return it.

    Top to bottom: player + waveform placeholder, shared timeline controls,
    extracted frames, transcript, agent input. The created widgets are also
    stored on the instance.
    """
    # Top row: player + waveform placeholder
    self._monitor = MonitorWidget(self._timeline)
    self._monitor.set_hexpand(True)
    player_frame = Gtk.Frame()
    player_frame.set_child(self._monitor)

    self._waveform_area = self._build_placeholder("Waveform", height=250, width=200)

    top_row = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
    top_row.set_shrink_start_child(False)
    top_row.set_shrink_end_child(False)
    top_row.set_start_child(player_frame)
    top_row.set_end_child(self._waveform_area)
    top_row.set_position(650)

    # Shared timeline slider (spans under player + waveform)
    self._timeline_controls = TimelineControls(self._timeline)
    self._timeline_controls.set_live_toggle_callback(self._on_live_toggle)

    # Frames extracted, transcript, agent input
    self._frames_panel = self._build_frames_panel()
    self._transcript_panel = self._build_transcript_panel()
    self._agent_input = self._build_agent_input()

    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2)
    stacked = (
        top_row,
        self._timeline_controls,
        self._frames_panel,
        self._transcript_panel,
        self._agent_input,
    )
    for panel in stacked:
        column.append(panel)
    return column
def _build_placeholder(self, title, height=200, width=-1):
    """Return a framed placeholder: a heading label above an empty drawing area.

    `height` sets the drawing area's content height; `width` sets its content
    width only when positive.
    """
    heading = Gtk.Label(label=title)
    heading.add_css_class("heading")
    heading.set_margin_top(4)
    heading.set_margin_bottom(4)

    canvas = Gtk.DrawingArea()
    canvas.set_content_height(height)
    if width > 0:
        canvas.set_content_width(width)
    canvas.set_vexpand(False)
    canvas.set_hexpand(True)

    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    column.append(heading)
    column.append(canvas)

    wrapper = Gtk.Frame()
    wrapper.set_child(column)
    return wrapper
def _on_capture_clicked(self, button):
    """Capture button handler: request an immediate frame capture, if a session exists."""
    mgr = self._stream_mgr
    if not mgr:
        return
    mgr.capture_now(on_new_frames=self._on_new_scene_frames)
def _on_scene_threshold_changed(self, scale):
    """Slider handler: show the new threshold and pass it to the active session."""
    threshold = scale.get_value()
    self._scene_label.set_label(f"Frames (scene: {threshold:.2f})")
    mgr = self._stream_mgr
    if mgr:
        mgr.scene_threshold = threshold
def _build_frames_panel(self):
    """Build the framed "Frames" panel and return it.

    The panel has a header row (label, scene-threshold slider, Capture
    button) above a horizontally scrolling thumbnail strip. The label, the
    scrolled window and the strip are stored on the instance.
    """
    # Header row widgets
    self._scene_label = Gtk.Label(label=f"Frames (scene: {SCENE_THRESHOLD:.2f})")
    self._scene_label.add_css_class("heading")

    threshold_scale = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, 0.01, 0.50, 0.01)
    threshold_scale.set_value(SCENE_THRESHOLD)
    threshold_scale.set_hexpand(True)
    threshold_scale.set_draw_value(False)
    # Connected only after the initial set_value() above
    threshold_scale.connect("value-changed", self._on_scene_threshold_changed)

    capture_btn = Gtk.Button(label="Capture")
    capture_btn.add_css_class("flat")
    capture_btn.connect("clicked", self._on_capture_clicked)

    title_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    title_row.set_margin_top(4)
    title_row.set_margin_bottom(4)
    title_row.set_margin_start(8)
    title_row.set_margin_end(8)
    for widget in (self._scene_label, threshold_scale, capture_btn):
        title_row.append(widget)

    # Horizontal scrolling strip — storyboard style
    strip = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
    strip.set_margin_start(4)
    strip.set_margin_end(4)
    strip.set_margin_top(4)
    strip.set_margin_bottom(4)

    scroller = Gtk.ScrolledWindow()
    scroller.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.NEVER)
    scroller.set_min_content_height(168)
    scroller.set_size_request(-1, 168)  # 144px thumb + label + padding
    scroller.set_child(strip)

    self._frames_scroll = scroller
    self._frames_strip = strip

    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
    column.append(title_row)
    column.append(scroller)

    wrapper = Gtk.Frame()
    wrapper.set_child(column)
    return wrapper
def _build_transcript_panel(self):
    """Build the framed "Transcript" panel: a heading above a read-only text view.

    The text view is stored as `self._transcript_view`.
    """
    heading = Gtk.Label(label="Transcript")
    heading.add_css_class("heading")
    heading.set_margin_top(4)
    heading.set_margin_bottom(4)

    view = Gtk.TextView()
    view.set_editable(False)
    view.set_cursor_visible(False)
    view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    view.set_left_margin(8)
    view.set_right_margin(8)
    self._transcript_view = view

    scroller = Gtk.ScrolledWindow()
    scroller.set_vexpand(True)
    scroller.set_min_content_height(150)
    scroller.set_child(view)

    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
    column.append(heading)
    column.append(scroller)

    wrapper = Gtk.Frame()
    wrapper.set_child(column)
    return wrapper
# -- Agent panels --
def _build_agent_output(self):
    """Build the framed "Agent Output" panel and return it.

    The read-only text view is stored as `self._agent_output_view`, and the
    markdown text tags are registered on its buffer.
    """
    heading = Gtk.Label(label="Agent Output")
    heading.add_css_class("heading")
    heading.set_margin_top(8)
    heading.set_margin_bottom(8)

    view = Gtk.TextView()
    view.set_editable(False)
    view.set_cursor_visible(False)
    view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    view.set_left_margin(8)
    view.set_right_margin(8)
    self._agent_output_view = view
    self._setup_md_tags(view.get_buffer())

    scroller = Gtk.ScrolledWindow()
    scroller.set_vexpand(True)
    scroller.set_child(view)

    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
    column.append(heading)
    column.append(scroller)

    wrapper = Gtk.Frame()
    wrapper.set_child(column)
    return wrapper
def _build_agent_input(self):
    """Build the framed agent input area and return it.

    First row: one flat button per ACTIONS entry, then a right-aligned model
    dropdown. Second row: the message entry and a Send button. The dropdown
    and the entry are stored on the instance.
    """
    # Quick action buttons + model selector
    actions_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
    for caption, action_verb in ACTIONS.items():
        action_btn = Gtk.Button(label=caption)
        action_btn.add_css_class("flat")
        # Default argument pins this iteration's verb for the handler
        action_btn.connect("clicked", lambda _btn, v=action_verb: self._send_action(v))
        actions_row.append(action_btn)

    # Model dropdown (right-aligned): an expanding filler pushes it over
    filler = Gtk.Box()
    filler.set_hexpand(True)
    actions_row.append(filler)

    model_caption = Gtk.Label(label="Model:")
    model_caption.add_css_class("dim-label")
    actions_row.append(model_caption)

    dropdown = Gtk.DropDown.new_from_strings([])
    dropdown.set_size_request(200, -1)
    dropdown.connect("notify::selected", self._on_model_changed)
    self._model_dropdown = dropdown
    actions_row.append(dropdown)

    # Text entry + send
    entry = Gtk.Entry()
    entry.set_hexpand(True)
    entry.set_placeholder_text("Message agent... (use @F0001 to reference a frame)")
    entry.connect("activate", lambda _entry: self._send_message())
    self._input_entry = entry

    send_btn = Gtk.Button(label="Send")
    send_btn.add_css_class("suggested-action")
    send_btn.connect("clicked", lambda _btn: self._send_message())

    entry_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
    entry_row.append(entry)
    entry_row.append(send_btn)

    container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
    container.set_margin_start(4)
    container.set_margin_end(4)
    container.set_margin_top(4)
    container.set_margin_bottom(4)
    container.append(actions_row)
    container.append(entry_row)

    wrapper = Gtk.Frame()
    wrapper.set_child(container)
    return wrapper
def _send_action(self, verb: str):
    """Send `verb` followed by an @-reference to the selected frame.

    With no frame selected, a hint is written to the agent output instead.
    """
    frame_id = self._selected_frame
    if frame_id:
        self._send_message(f"{verb} @{frame_id}")
    else:
        self._append_agent_output("Select a frame first.\n")
def _select_frame(self, frame_id: str):
    """Select the thumbnail `frame_id`; selecting the current one deselects it.

    The "frame-selected" CSS class is moved between the thumbnail widgets
    that are registered in `self._frame_widgets`.
    """
    widgets = self._frame_widgets
    previous = self._selected_frame

    # Deselect previous
    if previous and previous in widgets:
        widgets[previous].remove_css_class("frame-selected")

    if previous == frame_id:
        # Same frame again: toggle the selection off
        self._selected_frame = None
    else:
        self._selected_frame = frame_id
        if frame_id in widgets:
            widgets[frame_id].add_css_class("frame-selected")
def _send_message(self, text: str | None = None):
    """Send a message to the agent and prepare the output view for its reply.

    With `text` omitted, the entry's stripped content is used and the entry
    is cleared. Empty messages are ignored; without an active session a
    notice is printed instead. Otherwise the prompt is echoed, a '…'
    placeholder is shown until the first chunk arrives, and the request is
    handed to the agent.
    """
    if text is None:
        text = self._input_entry.get_text().strip()
        self._input_entry.set_text("")
    if not text:
        return
    if not self._stream_mgr:
        self._append_agent_output("No active session.\n")
        return

    buf = self._agent_output_view.get_buffer()

    # Drop whatever an earlier, unfinished request left behind.
    self._clear_thinking_placeholder(buf)
    stale_mark = getattr(self, "_response_start_mark", None)
    if stale_mark is not None:
        buf.delete_mark(stale_mark)

    self._append_agent_output(f"\n> {text}\n\n")

    # Set up all per-response state BEFORE dispatching, so the callbacks
    # never observe missing or stale attributes.
    self._thinking_replaced = False
    self._response_start_mark = None
    self._response_accum = []

    # Track the placeholder with a mark rather than assuming it is the last
    # two characters of the buffer: a left-gravity mark stays in front of
    # the text inserted at its position next.
    placeholder = "…\n"
    self._thinking_len = len(placeholder)
    self._thinking_mark = buf.create_mark(None, buf.get_end_iter(), True)
    self._append_agent_output(placeholder)

    self._agent.send(
        message=text,
        stream_mgr=self._stream_mgr,
        tracker=self._tracker,
        on_chunk=lambda chunk: GLib.idle_add(self._replace_thinking, chunk),
        on_done=lambda err: GLib.idle_add(self._on_agent_done, err),
    )

def _clear_thinking_placeholder(self, buf):
    """Remove the '…' placeholder and its mark, if one is still showing."""
    mark = getattr(self, "_thinking_mark", None)
    if mark is None:
        return
    start = buf.get_iter_at_mark(mark)
    end = start.copy()
    end.forward_chars(self._thinking_len)
    buf.delete(start, end)
    buf.delete_mark(mark)
    self._thinking_mark = None

def _replace_thinking(self, chunk: str):
    """Append a streamed chunk; the first chunk replaces the '…' placeholder."""
    buf = self._agent_output_view.get_buffer()
    if not self._thinking_replaced:
        self._thinking_replaced = True
        self._clear_thinking_placeholder(buf)
        # Mark where the response starts for later MD formatting
        self._response_start_mark = buf.create_mark(
            None, buf.get_end_iter(), left_gravity=True
        )
    self._response_accum.append(chunk)
    self._append_agent_output(chunk)

def _on_agent_done(self, err: str | None):
    """Finish a response: report `err`, or re-render the reply as markdown."""
    buf = self._agent_output_view.get_buffer()
    # No chunk ever arrived (e.g. immediate failure): drop the placeholder.
    self._clear_thinking_placeholder(buf)

    start_mark = getattr(self, "_response_start_mark", None)
    self._response_start_mark = None

    if err:
        if start_mark is not None:
            buf.delete_mark(start_mark)
        self._append_agent_output(f"[Error: {err}]\n")
        return

    # Re-render accumulated response with markdown formatting
    if start_mark is not None:
        if self._response_accum:
            start = buf.get_iter_at_mark(start_mark)
            buf.delete(start, buf.get_end_iter())
            self._render_md(buf, start, "".join(self._response_accum))
        buf.delete_mark(start_mark)
    self._append_agent_output("\n")
def _setup_md_tags(self, buf):
    """Register the text tags used for markdown rendering on `buf`."""
    tag_specs = {
        "h1": {"weight": 700, "scale": 1.4},
        "h2": {"weight": 700, "scale": 1.2},
        "h3": {"weight": 700, "scale": 1.05},
        "bold": {"weight": 700},
        "italic": {"style": 2},  # Pango.Style.ITALIC = 2
        "code": {
            "family": "monospace",
            "background": "#2a2a2a",
            "foreground": "#e8e8e8",
        },
        "codeblock": {
            "family": "monospace",
            "background": "#1e1e1e",
            "foreground": "#e8e8e8",
            "left_margin": 16,
            "right_margin": 16,
            "pixels_above_lines": 4,
            "pixels_below_lines": 4,
        },
        "bullet": {"left_margin": 16},
    }
    # Dict order is insertion order, so tags are created in the order listed
    for tag_name, properties in tag_specs.items():
        buf.create_tag(tag_name, **properties)
def _render_md(self, buf, insert_iter, text: str):
    """Insert markdown-formatted text into buf at insert_iter using text tags.

    Handles fenced code blocks, `#`..`###` headers and `-`/`*` bullets; every
    other line goes through the inline formatter. `insert_iter` is advanced
    past the inserted text. Every temporary mark created here is deleted
    again.
    """
    import re

    # Compiled once per call rather than once per line
    header_re = re.compile(r"^(#{1,3})\s+(.*)")
    bullet_re = re.compile(r"^[\-\*]\s+(.*)")

    lines = text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]

        # Fenced code block
        if line.startswith("```"):
            lang = line[3:].strip()
            block_lines = []
            i += 1
            # An unterminated fence simply runs to the end of the text
            while i < len(lines) and not lines[i].startswith("```"):
                block_lines.append(lines[i])
                i += 1
            code = "\n".join(block_lines) + "\n"
            block_mark = buf.create_mark(None, insert_iter, True)
            self._insert_highlighted_code(buf, insert_iter, code, lang)
            buf.apply_tag_by_name("codeblock", buf.get_iter_at_mark(block_mark), insert_iter)
            buf.delete_mark(block_mark)
            buf.insert(insert_iter, "\n")
            i += 1  # skip the closing fence
            continue

        # Headers
        header_match = header_re.match(line)
        if header_match:
            tag = f"h{len(header_match.group(1))}"
            mark = buf.create_mark(None, insert_iter, True)
            buf.insert(insert_iter, header_match.group(2) + "\n")
            buf.apply_tag_by_name(tag, buf.get_iter_at_mark(mark), insert_iter)
            buf.delete_mark(mark)
            i += 1
            continue

        # Bullet points
        bullet_match = bullet_re.match(line)
        if bullet_match:
            self._insert_inline(buf, insert_iter, bullet_match.group(1), "bullet")
        else:
            # Inline formatting pass on normal lines
            self._insert_inline_line(buf, insert_iter, line)
        buf.insert(insert_iter, "\n")
        i += 1
def _insert_inline(self, buf, it, text: str, outer_tag: str | None = None):
    """Insert inline-formatted `text` at `it`, optionally wrapped in `outer_tag`.

    The temporary mark that remembers the start position is created only
    when an outer tag is requested, and is deleted once the tag is applied.
    """
    if not outer_tag:
        self._insert_inline_line(buf, it, text)
        return
    mark = buf.create_mark(None, it, True)
    self._insert_inline_line(buf, it, text)
    buf.apply_tag_by_name(outer_tag, buf.get_iter_at_mark(mark), it)
    buf.delete_mark(mark)
def _insert_inline_line(self, buf, it, text: str):
    """Insert text with inline bold/italic/code formatting.

    Recognises `**bold**`, `__bold__`, `*italic*`, `_italic_` and backtick
    code spans; the delimiters are dropped and the matching tag is applied.
    Underscore emphasis is only recognised when the delimiters are not
    adjacent to a word character, so identifiers such as `snake_case_name`
    are inserted verbatim. Temporary marks are deleted after use.
    """
    import re

    pattern = re.compile(
        r"\*\*(?P<bold_star>.+?)\*\*"
        r"|(?<!\w)__(?P<bold_under>.+?)__(?!\w)"
        r"|\*(?P<italic_star>.+?)\*"
        r"|(?<!\w)_(?P<italic_under>.+?)_(?!\w)"
        r"|`(?P<code>[^`]+?)`"
    )
    tag_for_group = {
        "bold_star": "bold",
        "bold_under": "bold",
        "italic_star": "italic",
        "italic_under": "italic",
        "code": "code",
    }

    pos = 0
    for m in pattern.finditer(text):
        # Plain text before match
        if m.start() > pos:
            buf.insert(it, text[pos:m.start()])
        # Each alternative has exactly one named group, so lastgroup names
        # the alternative that matched.
        kind = m.lastgroup
        mark = buf.create_mark(None, it, True)
        buf.insert(it, m.group(kind))
        buf.apply_tag_by_name(tag_for_group[kind], buf.get_iter_at_mark(mark), it)
        buf.delete_mark(mark)
        pos = m.end()
    if pos < len(text):
        buf.insert(it, text[pos:])
def _insert_highlighted_code(self, buf, it, code: str, lang: str):
    """Insert syntax-highlighted code using Pygments token tags.

    Falls back to inserting `code` unformatted when Pygments is not
    installed, no lexer can be determined, or the "monokai" style cannot be
    loaded. One `pyg_<token type>` tag is created per token type on first
    use and looked up again afterwards.
    """
    try:
        from pygments.lexers import get_lexer_by_name, guess_lexer
        from pygments.styles import get_style_by_name
    except ImportError:
        buf.insert(it, code)
        return

    # Prefer the declared language; otherwise (or on failure) guess from content
    lexer = None
    if lang:
        try:
            lexer = get_lexer_by_name(lang, stripall=False)
        except Exception:
            lexer = None
    if lexer is None:
        try:
            lexer = guess_lexer(code)
        except Exception:
            buf.insert(it, code)
            return

    try:
        palette = get_style_by_name("monokai")
    except Exception:
        buf.insert(it, code)
        return

    known_tags = buf.get_tag_table()
    for token_type, chunk in lexer.get_tokens(code):
        name = f"pyg_{token_type}"
        tag = known_tags.lookup(name)
        if tag is None:
            attrs = palette.style_for_token(token_type)
            tag = buf.create_tag(name, family="monospace")
            if attrs.get("color"):
                tag.set_property("foreground", f"#{attrs['color']}")
            if attrs.get("bold"):
                tag.set_property("weight", 700)
            if attrs.get("italic"):
                tag.set_property("style", 2)
        anchor = buf.create_mark(None, it, True)
        buf.insert(it, chunk)
        buf.apply_tag(tag, buf.get_iter_at_mark(anchor), it)
        buf.delete_mark(anchor)
def _on_model_changed(self, dropdown, _pspec):
    """Dropdown handler: make the selected entry the agent's model.

    Positions outside `available_models` and empty entries are ignored.
    """
    position = dropdown.get_selected()
    choices = self._agent.available_models
    if position >= len(choices):
        return
    chosen = choices[position]
    if not chosen:
        return
    self._agent.model = chosen
    log.info("Model switched to %s", chosen)
def _populate_model_dropdown(self):
    """Fill the model dropdown from the agent and select its current model.

    Does nothing when the agent reports no available models.
    """
    models = self._agent.available_models
    if not models:
        return
    # Read the current model BEFORE replacing the list model: set_model()
    # can select the first row and emit notify::selected, which runs
    # _on_model_changed and overwrites agent.model. Reading it afterwards
    # would then always yield the first entry.
    current = self._agent.model
    self._model_dropdown.set_model(Gtk.StringList.new(models))
    # Select current model
    for position, name in enumerate(models):
        if name == current:
            self._model_dropdown.set_selected(position)
            break
def _check_agent_auth(self):
    """Start-up check of agent availability.

    With GROQ_API_KEY or OPENAI_API_KEY set, only the model dropdown is
    populated. Otherwise the Claude CLI check decides: its error text is
    shown, or a ready message is shown and the dropdown is populated.
    """
    import os

    has_api_key = any(
        os.environ.get(name) for name in ("GROQ_API_KEY", "OPENAI_API_KEY")
    )
    if has_api_key:
        self._populate_model_dropdown()
        return

    problem = check_claude_cli()
    if problem:
        self._append_agent_output(f"{problem}\n")
        return
    self._append_agent_output(f"Agent ready ({self._agent.provider_name})\n")
    self._populate_model_dropdown()
def _append_agent_output(self, text: str):
    """Append `text` to the agent output view and keep the end in view."""
    buf = self._agent_output_view.get_buffer()
    buf.insert(buf.get_end_iter(), text)

    # Auto-scroll to bottom. scroll_to_iter() depends on line heights that
    # may not be computed yet right after an insert; scroll_to_mark() is the
    # form GTK recommends for this case. One named mark is reused so that
    # marks do not pile up in the buffer.
    end_mark = buf.get_mark("agent-output-end")
    if end_mark is None:
        end_mark = buf.create_mark("agent-output-end", buf.get_end_iter(), False)
    else:
        buf.move_mark(end_mark, buf.get_end_iter())
    self._agent_output_view.scroll_to_mark(end_mark, 0.0, False, 0.0, 0.0)
# -- Frame thumbnails --
def _poll_frames(self):
    """Periodic callback: add thumbnails for frames newly listed in index.json.

    Returns False (ending the timer) when there is no session, True
    otherwise. A missing, unreadable or half-written index is skipped until
    the next tick. Malformed entries are skipped individually, so one bad
    entry cannot abort the whole pass.
    """
    mgr = self._stream_mgr
    if not mgr:
        return False
    index_path = mgr.frames_dir / "index.json"
    if not index_path.exists():
        return True
    try:
        index = json.loads(index_path.read_text())
    except (ValueError, OSError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError, both of
        # which a partially written file can cause.
        return True
    if not isinstance(index, list):
        return True

    for entry in index:
        try:
            fid = entry["id"]
            if fid in self._known_frames:
                continue
            fpath = Path(entry["path"])
            timestamp = entry.get("timestamp", 0)
        except (KeyError, TypeError, AttributeError):
            log.debug("Skipping malformed frame index entry: %r", entry)
            continue
        if not fpath.exists():
            continue
        self._known_frames.add(fid)
        try:
            pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True)
            self._add_frame_thumbnail(fid, pixbuf, timestamp)
        except Exception as e:
            log.warning("Thumbnail load failed for %s: %s", fid, e)
    return True
def _add_frame_thumbnail(self, frame_id, pixbuf, timestamp):
    """Append a clickable thumbnail tile for `frame_id` to the frame strip.

    The tile shows the picture above an "<id> [MM:SS]" caption, is
    registered in `self._frame_widgets`, and the strip is scrolled to its
    right end from an idle callback.
    """
    picture = Gtk.Picture.new_for_paintable(Gdk.Texture.new_for_pixbuf(pixbuf))
    picture.set_content_fit(Gtk.ContentFit.CONTAIN)
    picture.set_size_request(256, 144)
    picture.set_vexpand(False)

    minutes, seconds = divmod(int(timestamp), 60)
    caption = Gtk.Label(label=f"{frame_id} [{minutes:02d}:{seconds:02d}]")
    caption.add_css_class("caption")
    caption.set_ellipsize(Pango.EllipsizeMode.END)

    tile = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2)
    tile.set_size_request(256, -1)
    tile.append(picture)
    tile.append(caption)

    click = Gtk.GestureClick()
    click.connect(
        "released",
        lambda _gesture, _n_press, _x, _y, fid=frame_id: self._select_frame(fid),
    )
    tile.add_controller(click)

    self._frame_widgets[frame_id] = tile
    self._frames_strip.append(tile)

    # Auto-scroll to show the latest frame
    hadj = self._frames_scroll.get_hadjustment()

    def scroll_to_latest():
        hadj.set_value(hadj.get_upper())
        return False  # one-shot

    GLib.idle_add(scroll_to_latest)
    log.info("Thumbnail: %s at %.1fs", frame_id, timestamp)