303 lines
11 KiB
Python
303 lines
11 KiB
Python
"""ScrubBar: tall segmented block bar for frame-accurate scrubbing.
|
|
|
|
Replaces the thin timeline slider with a horizontal row of blocks,
|
|
one per recording segment, proportional in width to duration.
|
|
|
|
Click a block to activate it (trigger proxy generation).
|
|
Drag within a block to scrub frame-by-frame at mouse speed.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from pathlib import Path
|
|
|
|
import gi
|
|
gi.require_version("Gtk", "4.0")
|
|
gi.require_version("GdkPixbuf", "2.0")
|
|
from gi.repository import Gtk, Gdk, GLib, GObject, GdkPixbuf
|
|
|
|
import cairo
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
BAR_HEIGHT = 50
|
|
BAR_COLOR = (0.20, 0.20, 0.25)
|
|
CURSOR_COLOR = (0.9, 0.2, 0.2)
|
|
MARKER_COLOR = (0.9, 0.8, 0.2)
|
|
|
|
|
|
class ScrubBar(Gtk.DrawingArea):
|
|
"""Segmented block bar for scrubbing through recording segments."""
|
|
|
|
__gsignals__ = {
|
|
"segment-activated": (GObject.SignalFlags.RUN_FIRST, None, (int,)),
|
|
"scrub-position": (GObject.SignalFlags.RUN_FIRST, None, (float,)),
|
|
}
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.set_content_height(BAR_HEIGHT)
|
|
self.set_hexpand(True)
|
|
|
|
self._manifest = [] # list of {path, index, duration, global_offset}
|
|
self._total_duration = 0.0
|
|
self._cursor = 0.0 # global cursor position
|
|
self._active_index = -1 # currently active segment index
|
|
self._hover_index = -1 # segment under mouse
|
|
self._proxy_states = {} # segment_index → "generating" | "ready"
|
|
self._scene_markers = [] # global timestamps
|
|
self._scrubbing = False
|
|
self._frame_thumbs = [] # list of {timestamp, surface} — cairo surfaces
|
|
|
|
self.set_draw_func(self._draw)
|
|
|
|
# Mouse events
|
|
click = Gtk.GestureClick()
|
|
click.connect("pressed", self._on_pressed)
|
|
click.connect("released", self._on_released)
|
|
self.add_controller(click)
|
|
|
|
motion = Gtk.EventControllerMotion()
|
|
motion.connect("motion", self._on_motion)
|
|
motion.connect("leave", self._on_leave)
|
|
self.add_controller(motion)
|
|
|
|
drag = Gtk.GestureDrag()
|
|
drag.connect("drag-begin", self._on_drag_begin)
|
|
drag.connect("drag-update", self._on_drag_update)
|
|
drag.connect("drag-end", self._on_drag_end)
|
|
self.add_controller(drag)
|
|
|
|
# -- Public API --
|
|
|
|
def set_manifest(self, manifest: list[dict]) -> None:
|
|
"""Update the segment manifest. Triggers redraw."""
|
|
self._manifest = manifest
|
|
self._total_duration = sum(s["duration"] for s in manifest)
|
|
self.queue_draw()
|
|
|
|
def set_duration(self, duration: float) -> None:
|
|
"""Update total duration (from Timeline, overrides manifest sum if larger)."""
|
|
if duration > self._total_duration:
|
|
self._total_duration = duration
|
|
self.queue_draw()
|
|
|
|
def set_cursor(self, global_time: float) -> None:
|
|
"""Update the cursor position (from Timeline)."""
|
|
self._cursor = global_time
|
|
self.queue_draw()
|
|
|
|
def set_scene_markers(self, markers: list[float]) -> None:
|
|
"""Set scene change marker positions."""
|
|
self._scene_markers = markers
|
|
self.queue_draw()
|
|
|
|
def set_active_segment(self, index: int) -> None:
|
|
"""Set which segment is active (loaded for scrubbing)."""
|
|
self._active_index = index
|
|
self.queue_draw()
|
|
|
|
def set_proxy_state(self, segment_index: int, state: str) -> None:
|
|
"""Update proxy state for a segment ('generating', 'ready')."""
|
|
self._proxy_states[segment_index] = state
|
|
self.queue_draw()
|
|
|
|
def set_frames(self, frames: list[dict]) -> None:
|
|
"""Set frame thumbnails. Each dict: {timestamp, path}.
|
|
|
|
Loads thumbnails scaled to fit the bar height and caches as cairo surfaces.
|
|
"""
|
|
self._frame_thumbs = []
|
|
thumb_h = BAR_HEIGHT - 4 # 2px margin top/bottom
|
|
thumb_w = int(thumb_h * 16 / 9) # assume 16:9 aspect
|
|
for f in frames:
|
|
path = f.get("path")
|
|
ts = f.get("timestamp", 0)
|
|
if not path or not Path(path).exists():
|
|
continue
|
|
try:
|
|
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
|
|
str(path), thumb_w, thumb_h, True
|
|
)
|
|
surface = self._pixbuf_to_surface(pixbuf)
|
|
self._frame_thumbs.append({
|
|
"timestamp": ts,
|
|
"surface": surface,
|
|
"width": pixbuf.get_width(),
|
|
"height": pixbuf.get_height(),
|
|
})
|
|
except Exception as e:
|
|
log.debug("Thumb load failed for %s: %s", path, e)
|
|
self.queue_draw()
|
|
|
|
def add_frame(self, timestamp: float, path: str) -> None:
|
|
"""Add a single frame thumbnail from file path."""
|
|
if not Path(path).exists():
|
|
return
|
|
thumb_h = BAR_HEIGHT - 4
|
|
thumb_w = int(thumb_h * 16 / 9)
|
|
try:
|
|
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(path, thumb_w, thumb_h, True)
|
|
self.add_frame_from_pixbuf(timestamp, pixbuf)
|
|
except Exception as e:
|
|
log.debug("Thumb load failed for %s: %s", path, e)
|
|
|
|
def add_frame_from_pixbuf(self, timestamp: float, pixbuf) -> None:
|
|
"""Add a single frame thumbnail from an already-loaded pixbuf (shared with frames panel)."""
|
|
thumb_h = BAR_HEIGHT - 4
|
|
thumb_w = int(thumb_h * 16 / 9)
|
|
scaled = pixbuf.scale_simple(thumb_w, thumb_h, GdkPixbuf.InterpType.BILINEAR)
|
|
surface = self._pixbuf_to_surface(scaled)
|
|
self._frame_thumbs.append({
|
|
"timestamp": timestamp,
|
|
"surface": surface,
|
|
"width": scaled.get_width(),
|
|
"height": scaled.get_height(),
|
|
})
|
|
self.queue_draw()
|
|
|
|
def set_frames_from_pixbufs(self, frames: list[dict]) -> None:
|
|
"""Bulk set thumbnails from already-loaded pixbufs. Each dict: {timestamp, pixbuf}."""
|
|
self._frame_thumbs = []
|
|
for f in frames:
|
|
self.add_frame_from_pixbuf(f["timestamp"], f["pixbuf"])
|
|
# queue_draw already called per frame, but one more to be safe
|
|
self.queue_draw()
|
|
|
|
@staticmethod
|
|
def _pixbuf_to_surface(pixbuf):
|
|
"""Convert a GdkPixbuf to a cairo ImageSurface."""
|
|
w, h = pixbuf.get_width(), pixbuf.get_height()
|
|
has_alpha = pixbuf.get_has_alpha()
|
|
fmt = cairo.FORMAT_ARGB32 if has_alpha else cairo.FORMAT_RGB24
|
|
surface = cairo.ImageSurface(fmt, w, h)
|
|
cr = cairo.Context(surface)
|
|
Gdk.cairo_set_source_pixbuf(cr, pixbuf, 0, 0)
|
|
cr.paint()
|
|
surface.flush()
|
|
return surface
|
|
|
|
# -- Drawing --
|
|
|
|
def _draw(self, area, cr, width, height):
|
|
# Solid background — always full width
|
|
cr.set_source_rgb(*BAR_COLOR)
|
|
cr.rectangle(0, 0, width, height)
|
|
cr.fill()
|
|
|
|
if self._total_duration <= 0:
|
|
return
|
|
|
|
# Frame thumbnails at their timestamp positions
|
|
for thumb in self._frame_thumbs:
|
|
tx = self._global_to_x(thumb["timestamp"], width)
|
|
tw, th = thumb["width"], thumb["height"]
|
|
x0 = tx - tw / 2
|
|
y0 = (height - th) / 2
|
|
cr.save()
|
|
cr.set_source_rgba(0, 0, 0, 0.6)
|
|
cr.set_line_width(1.5)
|
|
cr.rectangle(x0 - 0.5, y0 - 0.5, tw + 1, th + 1)
|
|
cr.stroke()
|
|
cr.set_source_surface(thumb["surface"], x0, y0)
|
|
cr.rectangle(x0, y0, tw, th)
|
|
cr.fill()
|
|
cr.restore()
|
|
|
|
# Scene markers
|
|
cr.set_source_rgb(*MARKER_COLOR)
|
|
for ts in self._scene_markers:
|
|
mx = self._global_to_x(ts, width)
|
|
if 0 <= mx <= width:
|
|
cr.rectangle(mx, 0, 1, 5)
|
|
cr.fill()
|
|
|
|
# Cursor
|
|
cx = self._global_to_x(self._cursor, width)
|
|
if 0 <= cx <= width:
|
|
cr.set_source_rgb(*CURSOR_COLOR)
|
|
cr.rectangle(cx - 1, 0, 2, height)
|
|
cr.fill()
|
|
|
|
# -- Geometry helpers --
|
|
|
|
def _segment_rect(self, seg, total_width):
|
|
"""Return (x, width) for a segment's region."""
|
|
if self._total_duration <= 0:
|
|
return 0, 0
|
|
x = (seg["global_offset"] / self._total_duration) * total_width
|
|
w = (seg["duration"] / self._total_duration) * total_width
|
|
return max(0, x), max(1, w)
|
|
|
|
def _global_to_x(self, global_time, total_width):
|
|
"""Map global time to pixel x position."""
|
|
if self._total_duration <= 0:
|
|
return 0
|
|
return (global_time / self._total_duration) * total_width
|
|
|
|
def _x_to_global(self, x, total_width):
|
|
"""Map pixel x to global time."""
|
|
if total_width <= 0 or self._total_duration <= 0:
|
|
return 0.0
|
|
return (x / total_width) * self._total_duration
|
|
|
|
def _segment_at_x(self, x, total_width):
|
|
"""Return the segment index at pixel x, or -1."""
|
|
for seg in self._manifest:
|
|
sx, sw = self._segment_rect(seg, total_width)
|
|
if sx <= x <= sx + sw:
|
|
return seg["index"]
|
|
return -1
|
|
|
|
# -- Mouse handlers --
|
|
|
|
def _on_pressed(self, gesture, n_press, x, y):
|
|
width = self.get_width()
|
|
gt = self._x_to_global(x, width)
|
|
# Activate segment if different
|
|
idx = self._segment_at_x(x, width)
|
|
if idx >= 0 and idx != self._active_index:
|
|
self._active_index = idx
|
|
self.emit("segment-activated", idx)
|
|
# Always seek to click position
|
|
self.emit("scrub-position", gt)
|
|
self.queue_draw()
|
|
|
|
def _on_released(self, gesture, n_press, x, y):
|
|
self._scrubbing = False
|
|
|
|
def _on_motion(self, controller, x, y):
|
|
if self._scrubbing:
|
|
width = self.get_width()
|
|
gt = self._x_to_global(x, width)
|
|
gt = max(0, min(gt, self._total_duration))
|
|
# Switch segment if drag crosses boundary
|
|
idx = self._segment_at_x(x, width)
|
|
if idx >= 0 and idx != self._active_index:
|
|
self._active_index = idx
|
|
self.emit("segment-activated", idx)
|
|
self.emit("scrub-position", gt)
|
|
|
|
def _on_leave(self, controller):
|
|
pass
|
|
|
|
def _on_drag_begin(self, gesture, start_x, start_y):
|
|
self._scrubbing = True
|
|
|
|
def _on_drag_update(self, gesture, offset_x, offset_y):
|
|
if self._scrubbing:
|
|
ok, start_x, start_y = gesture.get_start_point()
|
|
if ok:
|
|
x = start_x + offset_x
|
|
width = self.get_width()
|
|
gt = self._x_to_global(x, width)
|
|
gt = max(0, min(gt, self._total_duration))
|
|
idx = self._segment_at_x(x, width)
|
|
if idx >= 0 and idx != self._active_index:
|
|
self._active_index = idx
|
|
self.emit("segment-activated", idx)
|
|
self.emit("scrub-position", gt)
|
|
|
|
def _on_drag_end(self, gesture, offset_x, offset_y):
|
|
self._scrubbing = False
|