"""Agent output panel — single TextView, colored message regions, copy-pastable.""" import logging import gi gi.require_version("Gtk", "4.0") from gi.repository import Gtk, GLib, Pango from cht.ui import markdown from cht.agent.base import ( AssistantMessage, ImageBlock, TextBlock, Thread, ToolResult, ToolUse, TranscriptBlock, UserMessage, ) log = logging.getLogger(__name__) class AgentOutputPanel(Gtk.Frame): """Scrollable text view showing the full conversation, copy-pastable.""" def __init__(self, **kwargs): super().__init__(**kwargs) box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) # Header header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) header.set_margin_start(8) header.set_margin_end(8) header.set_margin_top(8) header.set_margin_bottom(8) label = Gtk.Label(label="Agent Output") label.add_css_class("heading") label.set_hexpand(True) label.set_halign(Gtk.Align.START) header.append(label) # Spinner for "thinking" feedback self._spinner = Gtk.Spinner() self._spinner.set_visible(False) header.append(self._spinner) clear_btn = Gtk.Button(label="Clear") clear_btn.add_css_class("flat") clear_btn.connect("clicked", lambda _: self.clear()) header.append(clear_btn) box.append(header) # Single text view for the whole conversation self._view = Gtk.TextView() self._view.set_editable(False) self._view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) self._view.set_cursor_visible(False) self._view.set_left_margin(8) self._view.set_right_margin(8) self._view.set_top_margin(4) self._view.set_bottom_margin(4) self._setup_tags() scroll = Gtk.ScrolledWindow() scroll.set_vexpand(True) scroll.set_child(self._view) box.append(scroll) self.set_child(box) # Streaming state self._response_marks: dict[str, Gtk.TextMark] = {} self._response_accum: dict[str, list[str]] = {} def _setup_tags(self): buf = self._view.get_buffer() markdown.setup_tags(buf) # Message region backgrounds — user indented right, assistant left buf.create_tag("user-bg", paragraph_background="#1a2a3a", left_margin=120, right_margin=12, pixels_above_lines=6, pixels_below_lines=6) buf.create_tag("assistant-bg", paragraph_background="#1a2a1a", left_margin=12, right_margin=120, pixels_above_lines=4, pixels_below_lines=4) # Inline styles buf.create_tag("user-prefix", weight=700, foreground="#7aafff") buf.create_tag("user-text", foreground="#c8ddf0") buf.create_tag("assistant-prefix", weight=700, foreground="#8bc78b") buf.create_tag("tool-prefix", weight=700, foreground="#d4a053") buf.create_tag("tool-output", foreground="#aaaaaa", left_margin=24) buf.create_tag("error", foreground="#ff6b6b") buf.create_tag("ref-chip", foreground="#5a9fd4", style=Pango.Style.ITALIC) buf.create_tag("status", foreground="#888888") buf.create_tag("thinking", foreground="#666666", style=Pango.Style.ITALIC) # -- helpers -- def _insert_tagged(self, text: str, *tags: str): """Insert text at end and apply tags.""" buf = self._view.get_buffer() mark = buf.create_mark(None, buf.get_end_iter(), True) buf.insert(buf.get_end_iter(), text) start = buf.get_iter_at_mark(mark) end = buf.get_end_iter() for tag in tags: buf.apply_tag_by_name(tag, start, end) buf.delete_mark(mark) # -- Public API -- def append(self, text: str) -> None: """Append a status/info line.""" self._insert_tagged(text, "status") self._scroll_to_bottom() def clear(self) -> None: self._view.get_buffer().set_text("") self._response_marks.clear() self._response_accum.clear() self._spinner.set_visible(False) self._spinner.stop() def add_user_message(self, text: str, frames: list | None = None, transcripts: list | None = None) -> None: buf = self._view.get_buffer() # Region start mark region_mark = buf.create_mark(None, buf.get_end_iter(), True) # Prefix self._insert_tagged("\n> ", "user-prefix") # Text self._insert_tagged(text, "user-text") # Reference chips refs = [] if frames: for f in frames: fid = f.frame_id if hasattr(f, "frame_id") else (f.id if hasattr(f, "id") else str(f)) refs.append(fid) if transcripts: for t in transcripts: tid = t.transcript_id if hasattr(t, "transcript_id") else (t.id if hasattr(t, "id") else str(t)) refs.append(tid) if refs: self._insert_tagged(" [" + ", ".join(refs) + "]", "ref-chip") buf.insert(buf.get_end_iter(), "\n") # Apply background to entire user region buf.apply_tag_by_name("user-bg", buf.get_iter_at_mark(region_mark), buf.get_end_iter()) buf.delete_mark(region_mark) self._scroll_to_bottom() def begin_assistant_message(self, msg_id: str) -> None: buf = self._view.get_buffer() end = buf.get_end_iter() # Mark where this response starts (for markdown re-render on finish) self._response_marks[msg_id] = buf.create_mark(f"resp_{msg_id}", end, True) self._response_accum[msg_id] = [] # Show inline thinking indicator + header spinner self._insert_tagged("thinking...\n", "thinking") self._spinner.set_visible(True) self._spinner.start() self._scroll_to_bottom() def append_to_assistant(self, msg_id: str, text: str) -> None: if msg_id not in self._response_accum: return buf = self._view.get_buffer() # On first chunk, clear the "thinking..." placeholder if not self._response_accum[msg_id]: mark = self._response_marks.get(msg_id) if mark: start = buf.get_iter_at_mark(mark) buf.delete(start, buf.get_end_iter()) self._response_accum[msg_id].append(text) buf.insert(buf.get_end_iter(), text) self._scroll_to_bottom() def finish_assistant(self, msg_id: str, full_text: str) -> None: # Stop spinner self._spinner.set_visible(False) self._spinner.stop() mark = self._response_marks.pop(msg_id, None) self._response_accum.pop(msg_id, None) if not mark: return buf = self._view.get_buffer() start = buf.get_iter_at_mark(mark) end = buf.get_end_iter() buf.delete(start, end) # Re-render with markdown region_start = buf.create_mark(None, buf.get_iter_at_mark(mark), True) it = buf.get_iter_at_mark(mark) markdown.render(buf, it, full_text) buf.insert(buf.get_end_iter(), "\n") # Apply assistant background to rendered region buf.apply_tag_by_name("assistant-bg", buf.get_iter_at_mark(region_start), buf.get_end_iter()) buf.delete_mark(region_start) buf.delete_mark(mark) def add_tool_call(self, tool_use: ToolUse) -> None: self._insert_tagged(f" ▶ {tool_use.tool_name}", "tool-prefix") if tool_use.input: inp = str(tool_use.input) if len(inp) > 80: inp = inp[:77] + "..." self._insert_tagged(f" {inp}", "tool-output") self._view.get_buffer().insert(self._view.get_buffer().get_end_iter(), "\n") self._scroll_to_bottom() def update_tool_result(self, tool_use_id: str, result: ToolResult) -> None: text = result.error or result.output or "" if not text: return indented = "\n".join(f" {line}" for line in text.split("\n")) tag = "error" if result.error else "tool-output" self._insert_tagged(indented + "\n", tag) self._scroll_to_bottom() def load_thread(self, thread: Thread) -> None: """Replay a thread to rebuild the conversation view.""" self.clear() for msg in thread.messages: if isinstance(msg, UserMessage): text_parts = [] frames = [] transcripts = [] for b in msg.content: if isinstance(b, TextBlock): text_parts.append(b.text) elif isinstance(b, ImageBlock): frames.append(b) elif isinstance(b, TranscriptBlock): transcripts.append(b) self.add_user_message(" ".join(text_parts), frames, transcripts) elif isinstance(msg, AssistantMessage): text = " ".join(b.text for b in msg.content if isinstance(b, TextBlock)) buf = self._view.get_buffer() region_mark = buf.create_mark(None, buf.get_end_iter(), True) it = buf.get_end_iter() markdown.render(buf, it, text) buf.insert(buf.get_end_iter(), "\n") buf.apply_tag_by_name("assistant-bg", buf.get_iter_at_mark(region_mark), buf.get_end_iter()) buf.delete_mark(region_mark) elif isinstance(msg, ToolUse): self.add_tool_call(msg) elif isinstance(msg, ToolResult): self.update_tool_result(msg.tool_use_id, msg) def _scroll_to_bottom(self): def _do(): adj = self._view.get_parent().get_vadjustment() adj.set_value(adj.get_upper() - adj.get_page_size()) return False GLib.idle_add(_do)