"""ScrubBar: tall segmented block bar for frame-accurate scrubbing. Replaces the thin timeline slider with a horizontal row of blocks, one per recording segment, proportional in width to duration. Click a block to activate it (trigger proxy generation). Drag within a block to scrub frame-by-frame at mouse speed. """ import logging from pathlib import Path import gi gi.require_version("Gtk", "4.0") gi.require_version("GdkPixbuf", "2.0") from gi.repository import Gtk, Gdk, GLib, GObject, GdkPixbuf import cairo log = logging.getLogger(__name__) BAR_HEIGHT = 50 BAR_COLOR = (0.20, 0.20, 0.25) CURSOR_COLOR = (0.9, 0.2, 0.2) MARKER_COLOR = (0.9, 0.8, 0.2) class ScrubBar(Gtk.DrawingArea): """Segmented block bar for scrubbing through recording segments.""" __gsignals__ = { "segment-activated": (GObject.SignalFlags.RUN_FIRST, None, (int,)), "scrub-position": (GObject.SignalFlags.RUN_FIRST, None, (float,)), } def __init__(self, **kwargs): super().__init__(**kwargs) self.set_content_height(BAR_HEIGHT) self.set_hexpand(True) self._manifest = [] # list of {path, index, duration, global_offset} self._total_duration = 0.0 self._cursor = 0.0 # global cursor position self._active_index = -1 # currently active segment index self._hover_index = -1 # segment under mouse self._proxy_states = {} # segment_index → "generating" | "ready" self._scene_markers = [] # global timestamps self._scrubbing = False self._frame_thumbs = [] # list of {timestamp, surface} — cairo surfaces self.set_draw_func(self._draw) # Mouse events click = Gtk.GestureClick() click.connect("pressed", self._on_pressed) click.connect("released", self._on_released) self.add_controller(click) motion = Gtk.EventControllerMotion() motion.connect("motion", self._on_motion) motion.connect("leave", self._on_leave) self.add_controller(motion) drag = Gtk.GestureDrag() drag.connect("drag-begin", self._on_drag_begin) drag.connect("drag-update", self._on_drag_update) drag.connect("drag-end", self._on_drag_end) self.add_controller(drag) # -- Public API -- def set_manifest(self, manifest: list[dict]) -> None: """Update the segment manifest. Triggers redraw.""" self._manifest = manifest self._total_duration = sum(s["duration"] for s in manifest) self.queue_draw() def set_duration(self, duration: float) -> None: """Update total duration (from Timeline, overrides manifest sum if larger).""" if duration > self._total_duration: self._total_duration = duration self.queue_draw() def set_cursor(self, global_time: float) -> None: """Update the cursor position (from Timeline).""" self._cursor = global_time self.queue_draw() def set_scene_markers(self, markers: list[float]) -> None: """Set scene change marker positions.""" self._scene_markers = markers self.queue_draw() def set_active_segment(self, index: int) -> None: """Set which segment is active (loaded for scrubbing).""" self._active_index = index self.queue_draw() def set_proxy_state(self, segment_index: int, state: str) -> None: """Update proxy state for a segment ('generating', 'ready').""" self._proxy_states[segment_index] = state self.queue_draw() def set_frames(self, frames: list[dict]) -> None: """Set frame thumbnails. Each dict: {timestamp, path}. Loads thumbnails scaled to fit the bar height and caches as cairo surfaces. """ self._frame_thumbs = [] thumb_h = BAR_HEIGHT - 4 # 2px margin top/bottom thumb_w = int(thumb_h * 16 / 9) # assume 16:9 aspect for f in frames: path = f.get("path") ts = f.get("timestamp", 0) if not path or not Path(path).exists(): continue try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale( str(path), thumb_w, thumb_h, True ) surface = self._pixbuf_to_surface(pixbuf) self._frame_thumbs.append({ "timestamp": ts, "surface": surface, "width": pixbuf.get_width(), "height": pixbuf.get_height(), }) except Exception as e: log.debug("Thumb load failed for %s: %s", path, e) self.queue_draw() def add_frame(self, timestamp: float, path: str) -> None: """Add a single frame thumbnail incrementally.""" if not Path(path).exists(): return thumb_h = BAR_HEIGHT - 4 thumb_w = int(thumb_h * 16 / 9) try: pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(path, thumb_w, thumb_h, True) surface = self._pixbuf_to_surface(pixbuf) self._frame_thumbs.append({ "timestamp": timestamp, "surface": surface, "width": pixbuf.get_width(), "height": pixbuf.get_height(), }) self.queue_draw() except Exception as e: log.debug("Thumb load failed for %s: %s", path, e) @staticmethod def _pixbuf_to_surface(pixbuf): """Convert a GdkPixbuf to a cairo ImageSurface.""" w, h = pixbuf.get_width(), pixbuf.get_height() has_alpha = pixbuf.get_has_alpha() fmt = cairo.FORMAT_ARGB32 if has_alpha else cairo.FORMAT_RGB24 surface = cairo.ImageSurface(fmt, w, h) cr = cairo.Context(surface) Gdk.cairo_set_source_pixbuf(cr, pixbuf, 0, 0) cr.paint() surface.flush() return surface # -- Drawing -- def _draw(self, area, cr, width, height): # Solid background — always full width cr.set_source_rgb(*BAR_COLOR) cr.rectangle(0, 0, width, height) cr.fill() if self._total_duration <= 0: return # Frame thumbnails at their timestamp positions for thumb in self._frame_thumbs: tx = self._global_to_x(thumb["timestamp"], width) tw, th = thumb["width"], thumb["height"] x0 = tx - tw / 2 y0 = (height - th) / 2 cr.save() cr.set_source_rgba(0, 0, 0, 0.6) cr.set_line_width(1.5) cr.rectangle(x0 - 0.5, y0 - 0.5, tw + 1, th + 1) cr.stroke() cr.set_source_surface(thumb["surface"], x0, y0) cr.rectangle(x0, y0, tw, th) cr.fill() cr.restore() # Scene markers cr.set_source_rgb(*MARKER_COLOR) for ts in self._scene_markers: mx = self._global_to_x(ts, width) if 0 <= mx <= width: cr.rectangle(mx, 0, 1, 5) cr.fill() # Cursor cx = self._global_to_x(self._cursor, width) if 0 <= cx <= width: cr.set_source_rgb(*CURSOR_COLOR) cr.rectangle(cx - 1, 0, 2, height) cr.fill() # -- Geometry helpers -- def _segment_rect(self, seg, total_width): """Return (x, width) for a segment's region.""" if self._total_duration <= 0: return 0, 0 x = (seg["global_offset"] / self._total_duration) * total_width w = (seg["duration"] / self._total_duration) * total_width return max(0, x), max(1, w) def _global_to_x(self, global_time, total_width): """Map global time to pixel x position.""" if self._total_duration <= 0: return 0 return (global_time / self._total_duration) * total_width def _x_to_global(self, x, total_width): """Map pixel x to global time.""" if total_width <= 0 or self._total_duration <= 0: return 0.0 return (x / total_width) * self._total_duration def _segment_at_x(self, x, total_width): """Return the segment index at pixel x, or -1.""" for seg in self._manifest: sx, sw = self._segment_rect(seg, total_width) if sx <= x <= sx + sw: return seg["index"] return -1 # -- Mouse handlers -- def _on_pressed(self, gesture, n_press, x, y): width = self.get_width() gt = self._x_to_global(x, width) # Activate segment if different idx = self._segment_at_x(x, width) if idx >= 0 and idx != self._active_index: self._active_index = idx self.emit("segment-activated", idx) # Always seek to click position self.emit("scrub-position", gt) self.queue_draw() def _on_released(self, gesture, n_press, x, y): self._scrubbing = False def _on_motion(self, controller, x, y): if self._scrubbing: width = self.get_width() gt = self._x_to_global(x, width) gt = max(0, min(gt, self._total_duration)) # Switch segment if drag crosses boundary idx = self._segment_at_x(x, width) if idx >= 0 and idx != self._active_index: self._active_index = idx self.emit("segment-activated", idx) self.emit("scrub-position", gt) def _on_leave(self, controller): pass def _on_drag_begin(self, gesture, start_x, start_y): self._scrubbing = True def _on_drag_update(self, gesture, offset_x, offset_y): if self._scrubbing: ok, start_x, start_y = gesture.get_start_point() if ok: x = start_x + offset_x width = self.get_width() gt = self._x_to_global(x, width) gt = max(0, min(gt, self._total_duration)) idx = self._segment_at_x(x, width) if idx >= 0 and idx != self._active_index: self._active_index = idx self.emit("segment-activated", idx) self.emit("scrub-position", gt) def _on_drag_end(self, gesture, offset_x, offset_y): self._scrubbing = False