100 lines
3.3 KiB
Python
100 lines
3.3 KiB
Python
"""Agent output panel — scrollable text view with markdown rendering."""
|
|
|
|
import gi
|
|
gi.require_version("Gtk", "4.0")
|
|
from gi.repository import Gtk
|
|
|
|
from cht.ui import markdown
|
|
|
|
|
|
class AgentOutputPanel(Gtk.Frame):
|
|
"""Scrollable text view that displays agent responses with markdown."""
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
|
|
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
|
|
|
|
header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
|
|
header.set_margin_start(8)
|
|
header.set_margin_end(8)
|
|
header.set_margin_top(8)
|
|
header.set_margin_bottom(8)
|
|
label = Gtk.Label(label="Agent Output")
|
|
label.add_css_class("heading")
|
|
label.set_hexpand(True)
|
|
label.set_halign(Gtk.Align.START)
|
|
header.append(label)
|
|
|
|
clear_btn = Gtk.Button(label="Clear")
|
|
clear_btn.add_css_class("flat")
|
|
clear_btn.connect("clicked", lambda _: self.clear())
|
|
header.append(clear_btn)
|
|
|
|
box.append(header)
|
|
|
|
self._view = Gtk.TextView()
|
|
self._view.set_editable(False)
|
|
self._view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
|
|
self._view.set_cursor_visible(False)
|
|
self._view.set_left_margin(8)
|
|
self._view.set_right_margin(8)
|
|
markdown.setup_tags(self._view.get_buffer())
|
|
|
|
scroll = Gtk.ScrolledWindow()
|
|
scroll.set_vexpand(True)
|
|
scroll.set_child(self._view)
|
|
box.append(scroll)
|
|
|
|
self.set_child(box)
|
|
|
|
# Streaming state
|
|
self._thinking_replaced = False
|
|
self._response_start_mark = None
|
|
self._response_accum = []
|
|
|
|
def append(self, text: str) -> None:
|
|
"""Append plain text and auto-scroll."""
|
|
buf = self._view.get_buffer()
|
|
buf.insert(buf.get_end_iter(), text)
|
|
self._view.scroll_to_iter(buf.get_end_iter(), 0, False, 0, 0)
|
|
|
|
def clear(self) -> None:
|
|
"""Clear all output."""
|
|
self._view.get_buffer().set_text("")
|
|
|
|
def begin_response(self) -> None:
|
|
"""Reset streaming state for a new response (call before sending)."""
|
|
self._thinking_replaced = False
|
|
self._response_start_mark = None
|
|
self._response_accum = []
|
|
|
|
def replace_thinking(self, chunk: str) -> None:
|
|
"""Replace the '...' placeholder with streamed chunks."""
|
|
buf = self._view.get_buffer()
|
|
if not self._thinking_replaced:
|
|
self._thinking_replaced = True
|
|
end = buf.get_end_iter()
|
|
start = end.copy()
|
|
start.backward_chars(2)
|
|
buf.delete(start, end)
|
|
self._response_start_mark = buf.create_mark(
|
|
None, buf.get_end_iter(), left_gravity=True
|
|
)
|
|
self._response_accum.append(chunk)
|
|
self.append(chunk)
|
|
|
|
def finish_response(self, err: str | None) -> None:
|
|
"""Finalize the response — render markdown or show error."""
|
|
if err:
|
|
self.append(f"[Error: {err}]\n")
|
|
return
|
|
if self._response_start_mark and self._response_accum:
|
|
buf = self._view.get_buffer()
|
|
start = buf.get_iter_at_mark(self._response_start_mark)
|
|
end = buf.get_end_iter()
|
|
buf.delete(start, end)
|
|
markdown.render(buf, start, "".join(self._response_accum))
|
|
buf.delete_mark(self._response_start_mark)
|
|
self.append("\n")
|