Files
mitus/cht/ui/agent_output.py
2026-04-09 14:58:15 -03:00

278 lines
10 KiB
Python

"""Agent output panel — single TextView, colored message regions, copy-pastable."""
import logging
import gi
gi.require_version("Gtk", "4.0")
from gi.repository import Gtk, GLib, Pango
from cht.ui import markdown
from cht.agent.base import (
AssistantMessage,
ImageBlock,
TextBlock,
Thread,
ToolResult,
ToolUse,
TranscriptBlock,
UserMessage,
)
log = logging.getLogger(__name__)
class AgentOutputPanel(Gtk.Frame):
    """Scrollable text view showing the full conversation, copy-pastable.

    The whole conversation lives in a single read-only ``Gtk.TextView``.
    User and assistant messages are distinguished by paragraph-background
    tags; tool calls, tool results and status lines use inline tags.

    Streaming protocol for an assistant reply:

    1. ``begin_assistant_message(msg_id)`` drops a "thinking..." placeholder
       and remembers where the reply starts.
    2. ``append_to_assistant(msg_id, chunk)`` appends raw text chunks.
    3. ``finish_assistant(msg_id, full_text)`` replaces everything from the
       remembered start to the end of the buffer with the markdown-rendered
       ``full_text``.
    """

    def __init__(self, **kwargs):
        """Build the header (title, spinner, Clear button) and the text view."""
        super().__init__(**kwargs)
        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)

        # Header
        header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
        header.set_margin_start(8)
        header.set_margin_end(8)
        header.set_margin_top(8)
        header.set_margin_bottom(8)

        label = Gtk.Label(label="Agent Output")
        label.add_css_class("heading")
        label.set_hexpand(True)
        label.set_halign(Gtk.Align.START)
        header.append(label)

        # Spinner for "thinking" feedback
        self._spinner = Gtk.Spinner()
        self._spinner.set_visible(False)
        header.append(self._spinner)

        clear_btn = Gtk.Button(label="Clear")
        clear_btn.add_css_class("flat")
        clear_btn.connect("clicked", lambda _: self.clear())
        header.append(clear_btn)
        box.append(header)

        # Single text view for the whole conversation
        self._view = Gtk.TextView()
        self._view.set_editable(False)
        self._view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        self._view.set_cursor_visible(False)
        self._view.set_left_margin(8)
        self._view.set_right_margin(8)
        self._view.set_top_margin(4)
        self._view.set_bottom_margin(4)
        self._setup_tags()

        scroll = Gtk.ScrolledWindow()
        scroll.set_vexpand(True)
        scroll.set_child(self._view)
        box.append(scroll)
        self.set_child(box)

        # Streaming state, keyed by message id:
        #   _response_marks: buffer mark at the start of an in-flight reply
        #   _response_accum: text chunks received so far for that reply
        self._response_marks: dict[str, Gtk.TextMark] = {}
        self._response_accum: dict[str, list[str]] = {}

    def _setup_tags(self) -> None:
        """Register every text tag used by this panel on the view's buffer."""
        buf = self._view.get_buffer()
        # Tags owned by the project's markdown helper (cht.ui.markdown).
        markdown.setup_tags(buf)

        # Message region backgrounds — user indented right, assistant left
        buf.create_tag("user-bg",
                       paragraph_background="#1a2a3a",
                       left_margin=120, right_margin=12,
                       pixels_above_lines=6, pixels_below_lines=6)
        buf.create_tag("assistant-bg",
                       paragraph_background="#1a2a1a",
                       left_margin=12, right_margin=120,
                       pixels_above_lines=4, pixels_below_lines=4)

        # Inline styles
        buf.create_tag("user-prefix", weight=700, foreground="#7aafff")
        buf.create_tag("user-text", foreground="#c8ddf0")
        buf.create_tag("assistant-prefix", weight=700, foreground="#8bc78b")
        buf.create_tag("tool-prefix", weight=700, foreground="#d4a053")
        buf.create_tag("tool-output", foreground="#aaaaaa", left_margin=24)
        buf.create_tag("error", foreground="#ff6b6b")
        buf.create_tag("ref-chip", foreground="#5a9fd4", style=Pango.Style.ITALIC)
        buf.create_tag("status", foreground="#888888")
        buf.create_tag("thinking", foreground="#666666", style=Pango.Style.ITALIC)

    # -- helpers --

    def _insert_tagged(self, text: str, *tags: str) -> None:
        """Insert ``text`` at the end of the buffer and apply ``tags`` to it."""
        buf = self._view.get_buffer()
        # Left-gravity mark stays put while text is inserted after it, so it
        # still points at the start of the new text afterwards.
        mark = buf.create_mark(None, buf.get_end_iter(), True)
        buf.insert(buf.get_end_iter(), text)
        start = buf.get_iter_at_mark(mark)
        end = buf.get_end_iter()
        for tag in tags:
            buf.apply_tag_by_name(tag, start, end)
        buf.delete_mark(mark)

    @staticmethod
    def _ref_label(item, id_attr: str) -> str:
        """Return a display id for ``item``.

        Prefers ``item.<id_attr>``, then ``item.id``, then ``str(item)``.
        The result is always a string so it can be joined safely.
        """
        if hasattr(item, id_attr):
            return str(getattr(item, id_attr))
        if hasattr(item, "id"):
            return str(item.id)
        return str(item)

    def _render_assistant_region(self, text: str) -> None:
        """Render ``text`` as markdown at the end of the buffer.

        A trailing newline is added and the whole rendered span gets the
        ``assistant-bg`` tag.
        """
        buf = self._view.get_buffer()
        region_mark = buf.create_mark(None, buf.get_end_iter(), True)
        markdown.render(buf, buf.get_end_iter(), text)
        buf.insert(buf.get_end_iter(), "\n")
        buf.apply_tag_by_name("assistant-bg",
                              buf.get_iter_at_mark(region_mark),
                              buf.get_end_iter())
        buf.delete_mark(region_mark)

    # -- Public API --

    def append(self, text: str) -> None:
        """Append a status/info line."""
        self._insert_tagged(text, "status")
        self._scroll_to_bottom()

    def clear(self) -> None:
        """Empty the view, drop all streaming state and hide the spinner."""
        buf = self._view.get_buffer()
        # Marks survive set_text(""); remove them explicitly so a later reply
        # reusing the same msg_id can create its "resp_<id>" mark again.
        for mark in self._response_marks.values():
            if mark is not None and not mark.get_deleted():
                buf.delete_mark(mark)
        buf.set_text("")
        self._response_marks.clear()
        self._response_accum.clear()
        self._spinner.set_visible(False)
        self._spinner.stop()

    def add_user_message(self, text: str, frames: list | None = None,
                         transcripts: list | None = None) -> None:
        """Append a user message, optionally followed by reference chips.

        Args:
            text: The message text.
            frames: Optional frame references; each is labelled by its
                ``frame_id``, else ``id``, else ``str()``.
            transcripts: Optional transcript references; each is labelled by
                its ``transcript_id``, else ``id``, else ``str()``.
        """
        buf = self._view.get_buffer()
        # Region start mark
        region_mark = buf.create_mark(None, buf.get_end_iter(), True)

        self._insert_tagged("\n> ", "user-prefix")
        self._insert_tagged(text, "user-text")

        # Reference chips
        refs = [self._ref_label(f, "frame_id") for f in frames or ()]
        refs += [self._ref_label(t, "transcript_id") for t in transcripts or ()]
        if refs:
            self._insert_tagged(" [" + ", ".join(refs) + "]", "ref-chip")
        buf.insert(buf.get_end_iter(), "\n")

        # Apply background to entire user region
        buf.apply_tag_by_name("user-bg",
                              buf.get_iter_at_mark(region_mark),
                              buf.get_end_iter())
        buf.delete_mark(region_mark)
        self._scroll_to_bottom()

    def begin_assistant_message(self, msg_id: str) -> None:
        """Start a streamed assistant reply and show the thinking indicators."""
        buf = self._view.get_buffer()
        mark_name = f"resp_{msg_id}"
        # Mark names must be unique within a buffer: drop a leftover mark from
        # an earlier reply with the same id, otherwise create_mark would fail.
        stale = buf.get_mark(mark_name)
        if stale is not None:
            buf.delete_mark(stale)

        # Mark where this response starts (for markdown re-render on finish)
        self._response_marks[msg_id] = buf.create_mark(
            mark_name, buf.get_end_iter(), True)
        self._response_accum[msg_id] = []

        # Show inline thinking indicator + header spinner
        self._insert_tagged("thinking...\n", "thinking")
        self._spinner.set_visible(True)
        self._spinner.start()
        self._scroll_to_bottom()

    def append_to_assistant(self, msg_id: str, text: str) -> None:
        """Append a streamed chunk to the reply ``msg_id``.

        Chunks for an unknown (never begun, finished or cleared) ``msg_id``
        are ignored.
        """
        if msg_id not in self._response_accum:
            return
        buf = self._view.get_buffer()
        chunks = self._response_accum[msg_id]

        # On first chunk, clear the "thinking..." placeholder
        if not chunks:
            mark = self._response_marks.get(msg_id)
            if mark is not None:
                buf.delete(buf.get_iter_at_mark(mark), buf.get_end_iter())

        chunks.append(text)
        buf.insert(buf.get_end_iter(), text)
        self._scroll_to_bottom()

    def finish_assistant(self, msg_id: str, full_text: str) -> None:
        """Replace the streamed text of ``msg_id`` with rendered markdown.

        The spinner is always stopped. If ``msg_id`` has no recorded start
        mark, nothing else happens.
        """
        # Stop spinner
        self._spinner.set_visible(False)
        self._spinner.stop()

        mark = self._response_marks.pop(msg_id, None)
        self._response_accum.pop(msg_id, None)
        if mark is None:
            return

        buf = self._view.get_buffer()
        # Drop the raw streamed text (or the untouched placeholder); the start
        # mark now sits at the end of the buffer.
        buf.delete(buf.get_iter_at_mark(mark), buf.get_end_iter())
        buf.delete_mark(mark)

        # Re-render with markdown
        self._render_assistant_region(full_text)

    def add_tool_call(self, tool_use: ToolUse) -> None:
        """Append one line: tool name plus its input, truncated to 80 chars."""
        buf = self._view.get_buffer()
        self._insert_tagged(f"{tool_use.tool_name}", "tool-prefix")
        if tool_use.input:
            inp = str(tool_use.input)
            if len(inp) > 80:
                inp = inp[:77] + "..."
            self._insert_tagged(f" {inp}", "tool-output")
        buf.insert(buf.get_end_iter(), "\n")
        self._scroll_to_bottom()

    def update_tool_result(self, tool_use_id: str, result: ToolResult) -> None:
        """Append a tool result, indented line by line.

        ``result.error`` takes precedence over ``result.output`` and is shown
        with the ``error`` tag. Empty results are skipped. ``tool_use_id`` is
        accepted for interface compatibility and is not used here.
        """
        text = result.error or result.output or ""
        if not text:
            return
        indented = "\n".join(f" {line}" for line in text.split("\n"))
        tag = "error" if result.error else "tool-output"
        self._insert_tagged(indented + "\n", tag)
        self._scroll_to_bottom()

    def load_thread(self, thread: Thread) -> None:
        """Replay a thread to rebuild the conversation view."""
        self.clear()
        for msg in thread.messages:
            if isinstance(msg, UserMessage):
                text_parts = []
                frames = []
                transcripts = []
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        text_parts.append(block.text)
                    elif isinstance(block, ImageBlock):
                        frames.append(block)
                    elif isinstance(block, TranscriptBlock):
                        transcripts.append(block)
                self.add_user_message(" ".join(text_parts), frames, transcripts)
            elif isinstance(msg, AssistantMessage):
                text = " ".join(
                    b.text for b in msg.content if isinstance(b, TextBlock))
                self._render_assistant_region(text)
            elif isinstance(msg, ToolUse):
                self.add_tool_call(msg)
            elif isinstance(msg, ToolResult):
                self.update_tool_result(msg.tool_use_id, msg)

    def _scroll_to_bottom(self) -> None:
        """Scroll to the end of the conversation from an idle callback."""
        def _do():
            scroller = self._view.get_parent()
            # The view may have been unparented before the callback ran.
            if scroller is None:
                return GLib.SOURCE_REMOVE
            adj = scroller.get_vadjustment()
            adj.set_value(adj.get_upper() - adj.get_page_size())
            return GLib.SOURCE_REMOVE  # one-shot

        GLib.idle_add(_do)