diff --git a/cht/app.py b/cht/app.py index 3707bc7..4197405 100644 --- a/cht/app.py +++ b/cht/app.py @@ -5,7 +5,7 @@ import gi gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") -from gi.repository import Gtk, Adw, Gio +from gi.repository import Gtk, Adw, Gio, GLib from cht.config import APP_ID, APP_NAME from cht.window import ChtWindow @@ -25,7 +25,14 @@ class ChtApp(Adw.Application): win.present() +def _suppress_egl_warnings(domain, level, message, user_data): + if b"eglExportDMABUFImage" in message: + return + GLib.log_default_handler(domain, level, message, user_data) + + def main(): + GLib.log_set_handler("Gdk", GLib.LogLevelFlags.LEVEL_WARNING, _suppress_egl_warnings, None) logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", diff --git a/cht/config.py b/cht/config.py index 9548725..23f4b6e 100644 --- a/cht/config.py +++ b/cht/config.py @@ -11,10 +11,10 @@ SESSIONS_DIR = DATA_DIR / "sessions" # Stream defaults STREAM_HOST = "0.0.0.0" STREAM_PORT = 4444 +RELAY_PORT = 4445 # UDP loopback relay for live display -# Frame extraction -SCENE_THRESHOLD = 0.3 # 0-1, lower = more sensitive -MAX_FRAME_INTERVAL = 30 # seconds, fallback if no scene change +# Frame extraction — scene-only, no interval fallback +SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes # Segment recording SEGMENT_DURATION = 60 # seconds per .ts segment diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index b8cca53..e913c31 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -13,40 +13,78 @@ import ffmpeg log = logging.getLogger(__name__) -GLOBAL_ARGS = ("-hide_banner", "-loglevel", "warning") +GLOBAL_ARGS = ("-hide_banner",) +# Note: scene detection needs -loglevel info for showinfo filter output. +# Individual pipelines can override with .global_args() +QUIET_ARGS = ("-hide_banner", "-loglevel", "warning") def receive_and_record(stream_url, output_path): - """Receive mpegts stream and write to a single growing file. + """Receive mpegts stream and write to MKV file. - mpv reads this file for DVR-style playback. - ffmpeg scene detection runs on this file for frame extraction. - Audio is preserved in the recording (muxed mpegts). + MKV (Matroska) is used because: + - Handles incomplete writes gracefully (like OBS default) + - Proper timestamps for seeking and duration detection + - mpv plays growing MKV files better than mpegts """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") return ( - ffmpeg.output(stream, str(output_path), c="copy", f="mpegts") - .global_args(*GLOBAL_ARGS) + ffmpeg.output( + stream, str(output_path), + c="copy", + f="matroska", + flush_packets=1, + ) + .global_args(*QUIET_ARGS) ) -def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, - max_interval=30, start_number=1, start_time=0.0): - """Extract frames from a file on scene change. +def receive_record_and_relay(stream_url, output_path, relay_url): + """Receive TCP stream, write to MKV, and relay to UDP loopback for live display. - Uses ffmpeg select filter with scene detection and a max-interval fallback. - start_time: skip to this position before processing (avoids re-scanning). + Uses ffmpeg tee via merge_outputs: one ffmpeg process handles both outputs + from the same decoded input, keeping them in sync with identical timestamps. + """ + stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") + file_out = ffmpeg.output( + stream, str(output_path), + c="copy", f="matroska", flush_packets=1, + ) + relay_out = ffmpeg.output( + stream, relay_url, + c="copy", f="mpegts", + ) + return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS) + + +def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, + start_number=1, start_time=0.0, duration=None): + """Extract frames from a file on scene change only (no interval fallback). + + Frames are a chronological storyboard — captured whenever content changes + meaningfully vs the previous frame. No periodic fallback so static content + produces no spurious frames. + + start_time/duration: applied via the select filter expression (NOT as -ss/-t + input options, which break h264 scene detection on MKV). Returns (stdout, stderr) as decoded strings for timestamp parsing. """ - select_expr = ( - f"gt(scene,{scene_threshold})" - f"+gte(t-prev_selected_t,{max_interval})" - ) - input_opts = {} - if start_time > 0: - input_opts["ss"] = str(start_time) + scene_expr = f"gt(scene,{scene_threshold})" - stream = ffmpeg.input(str(input_path), **input_opts) + # Add time range filter if specified (incremental processing) + time_conditions = [] + if start_time > 0: + time_conditions.append(f"gte(t,{start_time})") + if duration is not None: + time_conditions.append(f"lte(t,{start_time + duration})") + + if time_conditions: + time_filter = "*".join(time_conditions) + select_expr = f"({scene_expr})*{time_filter}" + else: + select_expr = scene_expr + + stream = ffmpeg.input(str(input_path)) stream = stream.filter("select", select_expr).filter("showinfo") output = ( @@ -61,7 +99,14 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, ) log.info("extract_scene_frames: %s", " ".join(output.compile())) - stdout, stderr = output.run(capture_stdout=True, capture_stderr=True) + try: + stdout, stderr = output.run(capture_stdout=True, capture_stderr=True) + except ffmpeg.Error as e: + # ffmpeg may exit non-zero on growing files (corrupt tail) but still + # produce valid frames. Return the stderr for parsing anyway. + log.debug("ffmpeg exited with error (may still have valid frames)") + stdout = e.stdout or b"" + stderr = e.stderr or b"" return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") diff --git a/cht/stream/manager.py b/cht/stream/manager.py index fb9c851..5090a5c 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -1,11 +1,10 @@ """ -StreamManager: orchestrates ffmpeg pipelines for receiving, recording, -and frame extraction from a muxed mpegts/TCP stream. +StreamManager: orchestrates ffmpeg for recording and scene detection. Architecture: - sender → TCP:4444 → ffmpeg (writes growing recording.ts) - └→ mpv plays recording.ts (DVR: live edge + scrub) - └→ ffmpeg scene detection (periodic on recording) + sender → TCP:4444 → ffmpeg (writes recording.ts) + recording.ts → mpv (plays via Timeline) + recording.ts → ffmpeg scene detection (periodic, incremental) """ import json @@ -18,8 +17,8 @@ from threading import Thread from cht.config import ( STREAM_HOST, STREAM_PORT, + RELAY_PORT, SCENE_THRESHOLD, - MAX_FRAME_INTERVAL, SESSIONS_DIR, ) from cht.stream import ffmpeg as ff @@ -41,71 +40,83 @@ class StreamManager: self._procs = {} self._threads = {} self._stop_flags = set() - log.info("StreamManager created: session=%s dir=%s", session_id, self.session_dir) + log.info("Session: %s", session_id) def setup_dirs(self): for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir): d.mkdir(parents=True, exist_ok=True) - log.info("Session directories created") @property def stream_url(self): return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" + @property + def relay_url(self): + return f"udp://127.0.0.1:{RELAY_PORT}" + @property def recording_path(self): - return self.stream_dir / "recording.ts" + return self.stream_dir / "recording.mkv" # -- Recording -- def start_recorder(self): - """Start ffmpeg to receive TCP stream and write to recording.ts.""" - node = ff.receive_and_record(self.stream_url, self.recording_path) + """Start ffmpeg to receive TCP stream, write to MKV, and relay to UDP.""" + node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url) proc = ff.run_async(node, pipe_stderr=True) self._procs["recorder"] = proc - log.info("Recorder started: pid=%s url=%s → %s", proc.pid, self.stream_url, self.recording_path) + log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path) self._start_stderr_reader("recorder", proc) - # -- Scene detection -- + # -- Scene Detection -- - def start_scene_detector(self): - """Periodically run ffmpeg scene detection on the growing recording. + def start_scene_detector(self, on_new_frames=None): + """Periodically run scene detection on new portions of the recording. - Tracks how far we've processed to avoid re-scanning from the start. + Args: + on_new_frames: callback(list of {id, timestamp, path}) for new frames """ - log.info("Starting scene detector (threshold=%.2f, interval=%ds)", - SCENE_THRESHOLD, MAX_FRAME_INTERVAL) + self._on_new_frames = on_new_frames def _detect(): - last_processed_size = 0 - processed_duration = 0.0 # seconds already processed + processed_time = 0.0 frame_count = 0 while "stop" not in self._stop_flags: - time.sleep(10) + time.sleep(5) if not self.recording_path.exists(): continue + size = self.recording_path.stat().st_size - if size <= last_processed_size or size < 100_000: + if size < 100_000: continue - log.info("Recording grew: %d → %d bytes, scanning from %.1fs", - last_processed_size, size, processed_duration) - last_processed_size = size + # Get current duration. Use a 6s safety margin — MKV tail can + # be corrupt for several seconds after the last flush, causing + # ffmpeg to crash even with a 3s margin. + safe_duration = self._estimate_safe_duration() + if safe_duration is None or safe_duration <= processed_time + 8: + continue - try: - new_count, new_duration = self._extract_new_frames( - self.recording_path, - start_time=processed_duration, - start_number=frame_count + 1, - ) - if new_count > 0: - frame_count += new_count - log.info("Found %d new frames (total: %d)", new_count, frame_count) - if new_duration > processed_duration: - processed_duration = new_duration - except Exception as e: - log.error("Scene detection failed: %s", e) + # Process from last checkpoint to safe point + process_to = safe_duration - 6 # 6s safety margin for MKV tail + if process_to <= processed_time: + continue + + log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to) + new_frames = self._detect_scenes( + start_time=processed_time, + end_time=process_to, + start_number=frame_count + 1, + ) + + if new_frames: + frame_count += len(new_frames) + log.info("Found %d new scene frames (total: %d)", len(new_frames), frame_count) + if self._on_new_frames: + self._on_new_frames(new_frames) + + processed_time = process_to log.info("Scene detector stopped") @@ -113,39 +124,46 @@ class StreamManager: t.start() self._threads["scene_detector"] = t - def _extract_new_frames(self, path, start_time=0.0, start_number=1): - """Extract scene-change frames starting from a given timestamp. + def _estimate_safe_duration(self): + """Estimate recording duration. Uses ffprobe, falls back to file size.""" + try: + import ffmpeg as ffmpeg_lib + info = ffmpeg_lib.probe(str(self.recording_path)) + dur = float(info.get("format", {}).get("duration", 0)) + if dur > 0: + return dur + except Exception: + pass - Returns (new_frame_count, max_timestamp_seen). - """ + # Fallback: rough estimate from file size (~500kbit/s typical for this stream) + try: + size = self.recording_path.stat().st_size + return size / 65_000 # ~500kbps → 62.5 KB/s + except Exception: + return None + + def _detect_scenes(self, start_time, end_time, start_number): + """Run ffmpeg scene detection on a time range. Returns list of new frame entries.""" + duration = end_time - start_time existing_before = set(f.name for f in self.frames_dir.glob("F*.jpg")) try: _stdout, stderr = ff.extract_scene_frames( - path, + self.recording_path, self.frames_dir, scene_threshold=SCENE_THRESHOLD, - max_interval=MAX_FRAME_INTERVAL, start_number=start_number, start_time=start_time, + duration=duration, ) except Exception as e: - log.error("ffmpeg scene extraction error: %s", e) - return 0, start_time + log.error("Scene detection failed: %s", e) + return [] - if stderr: - for line in stderr.splitlines()[:5]: - log.debug("[scene_detect:stderr] %s", line) - - # Parse timestamps and update index - max_ts = start_time - new_count = 0 + # Parse new frames from showinfo output + new_frames = [] index_path = self.frames_dir / "index.json" - if index_path.exists(): - with open(index_path) as f: - index = json.load(f) - else: - index = [] + index = json.loads(index_path.read_text()) if index_path.exists() else [] frame_num = start_number for line in stderr.splitlines(): @@ -157,45 +175,35 @@ class StreamManager: frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" if frame_path.exists() and frame_path.name not in existing_before: - index.append({ + entry = { "id": frame_id, "timestamp": pts_time, "path": str(frame_path), "sent_to_agent": False, - }) - log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time) - new_count += 1 - if pts_time > max_ts: - max_ts = pts_time + } + index.append(entry) + new_frames.append(entry) frame_num += 1 - with open(index_path, "w") as f: - json.dump(index, f, indent=2) - - return new_count, max_ts + index_path.write_text(json.dumps(index, indent=2)) + return new_frames # -- Lifecycle -- def stop_all(self): - log.info("Stopping all processes...") + log.info("Stopping all...") self._stop_flags.add("stop") for name, proc in self._procs.items(): - log.info("Stopping %s (pid=%s)", name, proc.pid if proc else "?") + log.info("Stopping %s", name) ff.stop_proc(proc) self._procs.clear() - log.info("All processes stopped") def _start_stderr_reader(self, name, proc): def _read(): - try: - for line in proc.stderr: - text = line.decode("utf-8", errors="replace").rstrip() - if text: - log.info("[%s:stderr] %s", name, text) - except Exception as e: - log.warning("[%s:stderr] read error: %s", name, e) - retcode = proc.poll() - log.info("[%s] process exited: code=%s", name, retcode) + for line in proc.stderr: + text = line.decode("utf-8", errors="replace").rstrip() + if text: + log.debug("[%s] %s", name, text) + log.info("[%s] exited: %s", name, proc.poll()) - t = Thread(target=_read, daemon=True, name=f"{name}_stderr") - t.start() + Thread(target=_read, daemon=True, name=f"{name}_stderr").start() diff --git a/cht/stream/tracker.py b/cht/stream/tracker.py new file mode 100644 index 0000000..6849547 --- /dev/null +++ b/cht/stream/tracker.py @@ -0,0 +1,83 @@ +""" +RecordingTracker: monitors the growing recording file and estimates duration. + +Polls file size periodically. Uses ffprobe occasionally for accurate +duration calibration. Feeds duration updates to the Timeline. +""" + +import json +import logging +import subprocess +import time +from pathlib import Path +from threading import Thread + +import ffmpeg as ffmpeg_lib + +log = logging.getLogger(__name__) + + +class RecordingTracker: + """Tracks a growing recording file and estimates its duration.""" + + def __init__(self, recording_path, on_duration_update=None): + self._path = recording_path + self._on_duration = on_duration_update + self._duration = 0.0 + self._avg_bitrate = None # bytes per second, calibrated by ffprobe + self._stop = False + self._thread = None + + @property + def duration(self): + return self._duration + + def start(self): + self._stop = False + self._thread = Thread(target=self._poll_loop, daemon=True, name="rec_tracker") + self._thread.start() + log.info("RecordingTracker started: %s", self._path) + + def stop(self): + self._stop = True + log.info("RecordingTracker stopped") + + def _poll_loop(self): + probe_interval = 0 # probe on first data + cycles = 0 + + while not self._stop: + time.sleep(2) + + if not self._path.exists(): + continue + + size = self._path.stat().st_size + if size < 10_000: + continue + + # Calibrate with ffprobe every ~30s or on first data + cycles += 1 + if self._avg_bitrate is None or cycles % 15 == 0: + probed = self._probe_duration() + if probed and probed > 0 and size > 0: + self._avg_bitrate = size / probed + self._duration = probed + log.info("Probed duration: %.1fs (bitrate: %.0f B/s)", + probed, self._avg_bitrate) + elif self._avg_bitrate: + # Estimate from file size between probes + self._duration = size / self._avg_bitrate + + if self._on_duration and self._duration > 0: + self._on_duration(self._duration) + + def _probe_duration(self): + """Use ffprobe to get accurate duration of the recording.""" + try: + info = ffmpeg_lib.probe(str(self._path)) + duration = float(info.get("format", {}).get("duration", 0)) + return duration + except Exception as e: + log.debug("ffprobe failed (file still growing): %s", e) + return None diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py index 1a188e8..de2a6ec 100644 --- a/cht/ui/monitor.py +++ b/cht/ui/monitor.py @@ -1,10 +1,13 @@ """ -MonitorWidget: mpv-based stream monitor embedded in GTK4 via OpenGL. +MonitorWidget: dual-player video display embedded in GTK4 via OpenGL. -Supports DVR-style playback of a growing recording file: - - Follows live edge by default - - Slider scrubs video + audio together - - Can capture frame at current cursor position +Two players share the same position via a Gtk.Stack: + - "live" player: mpv reads UDP relay (low latency, always streaming) + - "review" player: mpv reads local MKV file (full seek support) + +Driven by a single Timeline "changed" signal. Reads timeline.state directly: + state.live=True → show live stack, live player streams + state.live=False → show review stack, apply state.paused """ import ctypes @@ -18,179 +21,187 @@ from cht.ui.mpv import Player log = logging.getLogger(__name__) -# Cache libGL reference _libGL = ctypes.cdll.LoadLibrary("libGL.so.1") GL_DRAW_FRAMEBUFFER_BINDING = 0x8CA6 +def _make_gl_area(on_realize, on_unrealize, on_render): + gl_area = Gtk.GLArea() + gl_area.set_hexpand(True) + gl_area.set_vexpand(True) + gl_area.set_auto_render(False) + gl_area.set_has_depth_buffer(False) + gl_area.set_has_stencil_buffer(False) + gl_area.connect("realize", on_realize) + gl_area.connect("unrealize", on_unrealize) + gl_area.connect("render", on_render) + return gl_area + + class MonitorWidget(Gtk.Box): - """Embedded mpv video player with DVR controls.""" + """Dual-player mpv display, driven by Timeline "changed" signal.""" - def __init__(self, **kwargs): + def __init__(self, timeline, **kwargs): super().__init__(orientation=Gtk.Orientation.VERTICAL, **kwargs) - self._player = None - self._following_live = True - self._slider_updating = False + self._timeline = timeline + self._live_source_url = None + self._recording_path = None - # GL area for video - self._gl_area = Gtk.GLArea() - self._gl_area.set_hexpand(True) - self._gl_area.set_vexpand(True) - self._gl_area.set_auto_render(False) - self._gl_area.set_has_depth_buffer(False) - self._gl_area.set_has_stencil_buffer(False) - self._gl_area.connect("realize", self._on_realize) - self._gl_area.connect("unrealize", self._on_unrealize) - self._gl_area.connect("render", self._on_render) - self.append(self._gl_area) + self._live_player = None + self._live_loaded = False - # Slider for scrubbing (shared timeline for video + audio) - slider_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) - slider_box.set_margin_start(4) - slider_box.set_margin_end(4) - slider_box.set_margin_bottom(2) + self._review_player = None - self._time_label = Gtk.Label(label="00:00") - self._time_label.set_width_chars(6) - slider_box.append(self._time_label) + self._stack = Gtk.Stack() + self._stack.set_hexpand(True) + self._stack.set_vexpand(True) + self._stack.set_transition_type(Gtk.StackTransitionType.NONE) - self._slider = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL) - self._slider.set_hexpand(True) - self._slider.set_range(0, 1) - self._slider.set_draw_value(False) - self._slider.connect("value-changed", self._on_slider_changed) - slider_box.append(self._slider) + self._live_gl = _make_gl_area( + self._on_live_realize, self._on_live_unrealize, self._on_live_render, + ) + self._stack.add_named(self._live_gl, "live") - self._duration_label = Gtk.Label(label="00:00") - self._duration_label.set_width_chars(6) - slider_box.append(self._duration_label) + self._review_gl = _make_gl_area( + self._on_review_realize, self._on_review_unrealize, self._on_review_render, + ) + self._stack.add_named(self._review_gl, "review") - self._live_btn = Gtk.Button(label="LIVE") - self._live_btn.add_css_class("suggested-action") - self._live_btn.connect("clicked", self._on_live_clicked) - slider_box.append(self._live_btn) + self.append(self._stack) - self.append(slider_box) - - # Update slider position periodically - GLib.timeout_add(500, self._update_slider) - - log.info("MonitorWidget initialized (GLArea + slider)") - - # -- GL callbacks -- - - def _on_realize(self, gl_area): - log.info("GLArea realized") - gl_area.make_current() - if gl_area.get_error(): - log.error("GLArea error: %s", gl_area.get_error()) - - def _on_unrealize(self, gl_area): - log.info("GLArea unrealized") - self.stop() - - def _on_render(self, gl_area, gl_context): - if not self._player: - return True - - width = gl_area.get_width() - height = gl_area.get_height() - - fbo_id = ctypes.c_int(0) - _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo_id)) - - self._player.render(fbo_id.value, width, height) - return True - - def _on_mpv_update(self): - GLib.idle_add(self._gl_area.queue_render) - - # -- Slider -- - - def _on_slider_changed(self, slider): - if self._slider_updating or not self._player: - return - pos = slider.get_value() - self._player.seek(pos) - self._following_live = False - self._live_btn.remove_css_class("suggested-action") - - def _on_live_clicked(self, button): - if self._player and self._player.duration: - self._player.seek(self._player.duration - 0.5) - self._following_live = True - self._live_btn.add_css_class("suggested-action") - - def _update_slider(self): - if not self._player: - return True - - pos = self._player.time_pos - dur = self._player.duration - if pos is not None and dur is not None and dur > 0: - self._slider_updating = True - self._slider.set_range(0, dur) - self._slider.set_value(pos) - self._slider_updating = False - - self._time_label.set_text(self._fmt_time(pos)) - self._duration_label.set_text(self._fmt_time(dur)) - - # Auto-follow live edge: if at EOF or falling behind, reload - if self._following_live: - if self._player.idle or dur - pos > 3: - self._reload_live() - - return True # keep timer running - - def _reload_live(self): - """Reload the growing file and seek to near-end (live edge).""" - if not self._player or not self._recording_path: - return - self._player.play(str(self._recording_path)) - # Small delay then seek to end - GLib.timeout_add(500, self._seek_to_end_once) - - @staticmethod - def _fmt_time(seconds): - m, s = divmod(int(seconds), 60) - h, m = divmod(m, 60) - if h: - return f"{h}:{m:02d}:{s:02d}" - return f"{m:02d}:{s:02d}" + timeline.connect("changed", self._on_changed) + GLib.timeout_add(500, self._sync_cursor_from_player) + log.info("MonitorWidget initialized") # -- Public API -- - def _seek_to_end_once(self): - if self._player and self._player.duration: - self._player.seek(self._player.duration - 0.5) - return False # don't repeat + def set_live_source(self, url): + self._live_source_url = url + log.info("Live source: %s", url) + if self._live_player and not self._live_loaded: + self._live_player.load_live(url) + self._live_player.play() + self._live_loaded = True - def start_recording(self, recording_path): - """Start DVR-style playback of a growing recording file. + def set_recording(self, path): + self._recording_path = path + log.info("Recording path: %s", path) - Args: - recording_path: path to the .ts file being written by ffmpeg - """ - self._recording_path = recording_path - self._gl_area.make_current() - - self._player = Player() - self._player.init_gl(update_callback=self._on_mpv_update) - self._player.play_file(recording_path) - self._following_live = True - self._live_btn.add_css_class("suggested-action") - log.info("Monitor playing recording: %s", recording_path) + def get_live_position(self): + """Return the live player's current time_pos, or None.""" + if self._live_player: + return self._live_player.time_pos + return None def screenshot(self, path): - """Capture frame at current cursor position.""" - if self._player: - self._player.screenshot(path) + if self._timeline.state.live and self._live_player: + self._live_player.screenshot(path) + elif self._review_player: + self._review_player.screenshot(path) def stop(self): - if self._player: - log.info("Stopping monitor") - self._player.terminate() - self._player = None + log.info("Stopping monitor") + if self._live_player: + self._live_player.terminate() + self._live_player = None + self._live_loaded = False + if self._review_player: + self._review_player.terminate() + self._review_player = None + + # -- Live GLArea -- + + def _on_live_realize(self, gl_area): + gl_area.make_current() + self._live_player = Player() + self._live_player.init_gl( + update_callback=lambda: GLib.idle_add(self._live_gl.queue_render) + ) + log.info("Live player created") + if self._live_source_url and not self._live_loaded: + self._live_player.load_live(self._live_source_url) + self._live_player.play() + self._live_loaded = True + + def _on_live_unrealize(self, gl_area): + if self._live_player: + self._live_player.terminate() + self._live_player = None + self._live_loaded = False + + def _on_live_render(self, gl_area, _ctx): + if not self._live_player: + return True + fbo = ctypes.c_int(0) + _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo)) + self._live_player.render(fbo.value, gl_area.get_width(), gl_area.get_height()) + return True + + # -- Review GLArea -- + + def _on_review_realize(self, gl_area): + gl_area.make_current() + self._review_player = Player() + self._review_player.init_gl( + update_callback=lambda: GLib.idle_add(self._review_gl.queue_render) + ) + log.info("Review player created") + + def _on_review_unrealize(self, gl_area): + if self._review_player: + self._review_player.terminate() + self._review_player = None + + def _on_review_render(self, gl_area, _ctx): + if not self._review_player: + return True + fbo = ctypes.c_int(0) + _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo)) + self._review_player.render(fbo.value, gl_area.get_width(), gl_area.get_height()) + return True + + # -- Timeline response -- + + def _on_changed(self, timeline): + s = timeline.state + current = self._stack.get_visible_child_name() + + if s.live: + # Ensure live player is loaded and playing + if self._live_player and not self._live_loaded and self._live_source_url: + self._live_player.load_live(self._live_source_url) + self._live_player.play() + self._live_loaded = True + elif self._live_player and self._live_loaded: + self._live_player.play() + if current != "live": + self._stack.set_visible_child_name("live") else: - log.info("Monitor already stopped") + # Scrub mode + if current == "live": + # Transitioning from live: seek review player to live position + pos = s.cursor # already set by toggle_live() + if self._review_player and self._recording_path: + self._review_player.load(self._recording_path) + if s.paused: + self._review_player.show_frame_at(pos) + else: + self._review_player.seek(pos) + self._review_player.play() + self._stack.set_visible_child_name("review") + else: + # Already in review: just apply paused state + if self._review_player: + if s.paused: + self._review_player.pause() + else: + self._review_player.play() + + def _sync_cursor_from_player(self): + s = self._timeline.state + if not s.live and not s.paused and self._review_player: + pos = self._review_player.time_pos + if pos is not None and pos > 0: + self._timeline.set_cursor(pos) + # Live mode: cursor driven by tick_live() in window.py + return True diff --git a/cht/ui/mpv.py b/cht/ui/mpv.py index 2064627..a2bd257 100644 --- a/cht/ui/mpv.py +++ b/cht/ui/mpv.py @@ -2,10 +2,7 @@ MPV wrapper using python-mpv (libmpv bindings) with OpenGL render API. Renders video frames to an OpenGL context provided by GTK4's GLArea. -Supports DVR-style playback of a growing recording file: - - Follow live edge (default) - - Scrub back to any point - - Audio + video synced via single slider +Driven by the Timeline state machine — does not manage its own state. """ import ctypes @@ -35,10 +32,8 @@ _get_proc_address = _make_get_proc_address() class Player: """Wraps a libmpv player with OpenGL render context for GTK4 embedding. - Designed for DVR-style playback of a growing file: - - play_file() opens the recording and seeks to end (live edge) - - seek() scrubs to any position (audio + video move together) - - time_pos / duration track playback state for the slider + Does not manage playback state — that's the Timeline's job. + Provides: load, play, pause, seek, show_frame_at, render. """ def __init__(self): @@ -48,17 +43,12 @@ class Player: "osc": False, "vo": "libmpv", "hwdec": "auto", - "video_sync": "display-desync", - # DVR: keep alive at EOF, wait for more data + # Keep open at EOF so LIVE mode can wait for more data "keep_open": "yes", - "demuxer_max_bytes": "500MiB", - "demuxer_readahead_secs": "5", - # Allow re-reading growing file - "demuxer_cache_wait": True, + "keep_open_pause": "no", } - log.info("Creating mpv player (OpenGL render, DVR mode)") - self._player = libmpv.MPV(log_handler=self._mpv_log, loglevel="v", **opts) + self._player = libmpv.MPV(log_handler=self._mpv_log, loglevel="warn", **opts) self._ctx = None self._update_callback = None log.info("mpv player created") @@ -70,14 +60,9 @@ class Player: log.error(msg) elif loglevel == "warn": log.warning(msg) - else: - log.debug(msg) def init_gl(self, update_callback): - """Initialize the OpenGL render context. - - Must be called with an active GL context. - """ + """Initialize the OpenGL render context. Call with active GL context.""" self._update_callback = update_callback self._ctx = libmpv.MpvRenderContext( self._player, @@ -88,7 +73,7 @@ class Player: ) self._get_proc_address_ref = _get_proc_address self._ctx.update_cb = self._on_mpv_update - log.info("mpv OpenGL render context initialized") + log.info("mpv GL render context initialized") def _on_mpv_update(self): if self._update_callback: @@ -102,51 +87,63 @@ class Player: opengl_fbo={"fbo": fbo, "w": width, "h": height}, ) - def play(self, source): - """Play from any source (URL, file path).""" - log.info("mpv play: %s", source) - self._player.play(str(source)) + def load(self, path): + """Load a recording file. Does not start playback.""" + log.info("mpv load: %s", path) + self._player.loadfile(str(path), mode="replace") - def play_file(self, path): - """Play a recording file, seeking to the end (live edge).""" - log.info("mpv play_file (DVR): %s", path) - self._player.play(str(path)) - # Seek to end once playback starts - self._player.observe_property("duration", self._seek_to_live_once) + def load_live(self, url): + """Load a live stream URL with low-latency options.""" + self._player["cache"] = "no" + self._player["demuxer-max-bytes"] = "512KiB" + self._player["audio-buffer"] = 0.2 + log.info("mpv load_live: %s", url) + self._player.loadfile(str(url), mode="replace") - def _seek_to_live_once(self, name, value): - """Seek to live edge once duration is known, then stop observing.""" - if value and value > 1: - log.info("Seeking to live edge: %.1fs", value) - self._player.seek(value - 0.5, reference="absolute") - self._player.unobserve_property("duration", self._seek_to_live_once) + def play(self): + """Resume/start playback.""" + self._player.pause = False def pause(self): + """Pause playback.""" self._player.pause = True - def resume(self): - self._player.pause = False + def seek(self, seconds): + """Seek to absolute position.""" + try: + self._player.seek(seconds, reference="absolute") + except Exception: + pass # seek may fail if file not loaded yet + + def show_frame_at(self, seconds): + """Pause and show the frame at the given timestamp.""" + self._player.pause = True + try: + self._player.seek(seconds, reference="absolute") + except Exception: + pass + + @property + def time_pos(self): + try: + return self._player.time_pos + except Exception: + return None @property def paused(self): return self._player.pause - def seek(self, seconds): - """Seek to absolute position. Audio + video move together.""" - self._player.seek(seconds, reference="absolute") - - def seek_relative(self, seconds): - """Seek relative to current position.""" - self._player.seek(seconds, reference="relative") + @property + def idle(self): + return self._player.core_idle def screenshot(self, path): """Save current frame as an image file.""" - self._player.screenshot_to_file(str(path), includes="video") - log.debug("Screenshot saved: %s", path) - - def stop(self): - log.info("mpv stop") - self._player.stop() + try: + self._player.screenshot_to_file(str(path), includes="video") + except Exception as e: + log.warning("Screenshot failed: %s", e) def terminate(self): log.info("mpv terminate") @@ -157,21 +154,3 @@ class Player: self._player.terminate() except Exception as e: log.warning("mpv terminate error: %s", e) - - @property - def idle(self): - return self._player.core_idle - - @property - def duration(self): - try: - return self._player.duration - except Exception: - return None - - @property - def time_pos(self): - try: - return self._player.time_pos - except Exception: - return None diff --git a/cht/ui/timeline.py b/cht/ui/timeline.py new file mode 100644 index 0000000..0e7d21c --- /dev/null +++ b/cht/ui/timeline.py @@ -0,0 +1,273 @@ +""" +Timeline: state machine + shared slider for the recording. + +State is a plain ViewState dataclass — two orthogonal flags replace the old +4-state enum. A single "changed" GObject signal is emitted on any mutation; +consumers read timeline.state directly. + + live=True — watching UDP relay; play/pause and seek are disabled + live=False — scrub mode; paused controls play/pause +""" + +import logging +import time +from dataclasses import dataclass, field + +import gi +gi.require_version("Gtk", "4.0") +from gi.repository import Gtk, GLib, GObject + +log = logging.getLogger(__name__) + + +@dataclass +class ViewState: + live: bool = False # True = watching live UDP relay + paused: bool = False # scrub mode: is playback paused? + cursor: float = 0.0 # current position in recording (seconds) + duration: float = 0.0 # recording duration (seconds) + scene_markers: list[float] = field(default_factory=list) + + +class Timeline(GObject.Object): + """State machine for recording playback. + + Single signal: "changed" — emitted on any state mutation. + Consumers read timeline.state directly (no signal arguments). + """ + + __gsignals__ = { + "changed": (GObject.SignalFlags.RUN_FIRST, None, ()), + } + + def __init__(self): + super().__init__() + self.state = ViewState() + log.info("Timeline created") + + def _emit(self): + self.emit("changed") + + # -- Duration / cursor (called from background threads via GLib.idle_add) -- + + def set_duration(self, duration): + """Update recording duration. In live mode, cursor follows end.""" + if duration <= self.state.duration: + return + self.state.duration = duration + if self.state.live: + self.state.cursor = duration + self._emit() + + def set_cursor(self, position): + """Set cursor position (from slider drag or playback sync).""" + position = max(0.0, min(position, self.state.duration)) + if abs(position - self.state.cursor) < 0.05: + return + self.state.cursor = position + self._emit() + + def tick_live(self): + """Advance cursor by 1s in live mode (smooth progression between probes).""" + if not self.state.live: + return + self.state.cursor += 1.0 + # Only cap once duration is known — avoids clamping to 0 before first probe + if self.state.duration > 0: + self.state.cursor = min(self.state.cursor, self.state.duration) + self._emit() + + def add_scene_marker(self, timestamp): + """Add a scene change marker (does not emit — no UI update needed).""" + self.state.scene_markers.append(timestamp) + self.state.scene_markers.sort() + + # -- User actions -- + + def go_live(self): + """Go to live mode at the recording end.""" + self.state.live = True + self.state.paused = False + self.state.cursor = self.state.duration + self._emit() + + def toggle_live(self, live_player_pos=None): + """Toggle between live and scrub mode. + + When leaving live, cursor stays at the live player's actual position + if provided, otherwise stays at current cursor. + """ + if self.state.live: + # Enter scrub mode at the live player's current position + self.state.live = False + self.state.paused = True + if live_player_pos is not None and live_player_pos > 0: + pos = max(0.0, min(live_player_pos, self.state.duration)) + self.state.cursor = pos + else: + self.state.live = True + self.state.paused = False + self.state.cursor = self.state.duration + self._emit() + + def play(self): + if self.state.live: + return + self.state.paused = False + self._emit() + + def pause(self): + if self.state.live: + return + self.state.paused = True + self._emit() + + def seek(self, position): + """Seek to position — enters scrub mode, pauses.""" + self.state.live = False + self.state.paused = True + position = max(0.0, min(position, self.state.duration)) + if abs(position - self.state.cursor) >= 0.05: + self.state.cursor = position + self._emit() + + def reset(self): + """Reset all state (called on disconnect).""" + self.state = ViewState() + self._emit() + + +class TimelineControls(Gtk.Box): + """Shared slider + play/pause/live controls. + + Play/Pause and slider are insensitive in live mode. + LIVE button is a toggle — active style when live=True. + """ + + def __init__(self, timeline, **kwargs): + super().__init__(orientation=Gtk.Orientation.HORIZONTAL, spacing=4, **kwargs) + self._timeline = timeline + self._updating_slider = False + self._dragging = False + self._wall_clock_start = None + + self.set_margin_start(4) + self.set_margin_end(4) + self.set_margin_top(2) + self.set_margin_bottom(4) + + # Play/Pause button + self._play_btn = Gtk.Button(label="Play") + self._play_btn.connect("clicked", self._on_play_clicked) + self.append(self._play_btn) + + # Current time label + self._time_label = Gtk.Label(label="00:00") + self._time_label.set_width_chars(6) + self.append(self._time_label) + + # Slider + self._slider = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL) + self._slider.set_hexpand(True) + self._slider.set_range(0, 1) + self._slider.set_draw_value(False) + self._slider.connect("value-changed", self._on_slider_value_changed) + + press_ctrl = Gtk.GestureClick() + press_ctrl.connect("pressed", self._on_slider_pressed) + press_ctrl.connect("released", self._on_slider_released) + press_ctrl.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) + self._slider.add_controller(press_ctrl) + self.append(self._slider) + + # Duration label + self._duration_label = Gtk.Label(label="00:00 / 00:00") + self._duration_label.set_width_chars(14) + self.append(self._duration_label) + + # LIVE toggle button + self._live_btn = Gtk.Button(label="LIVE") + self._live_btn.connect("clicked", self._on_live_clicked) + self.append(self._live_btn) + + timeline.connect("changed", self._on_changed) + GLib.timeout_add(1000, self._tick_total) + + def _on_play_clicked(self, btn): + s = self._timeline.state + if s.paused: + self._timeline.play() + else: + self._timeline.pause() + + def set_live_toggle_callback(self, cb): + """Override the LIVE button handler. cb() should return the live player + position (float or None) and call timeline.toggle_live() itself.""" + self._live_toggle_cb = cb + + def _on_live_clicked(self, btn): + if hasattr(self, "_live_toggle_cb"): + self._live_toggle_cb() + else: + self._timeline.toggle_live() + + def _on_slider_value_changed(self, slider): + if self._dragging: + self._time_label.set_text(self._fmt_time(slider.get_value())) + + def _on_slider_pressed(self, gesture, n_press, x, y): + self._dragging = True + + def _on_slider_released(self, gesture, n_press, x, y): + if self._dragging: + self._dragging = False + self._timeline.seek(self._slider.get_value()) + + def _on_changed(self, timeline): + s = timeline.state + + # Start wall clock when first going live (not on duration, which arrives ~30s later) + if s.live and self._wall_clock_start is None: + self._wall_clock_start = time.monotonic() + + # Live mode: disable scrub controls + self._play_btn.set_sensitive(not s.live) + self._slider.set_sensitive(not s.live) + + if s.live: + self._live_btn.add_css_class("suggested-action") + else: + self._live_btn.remove_css_class("suggested-action") + + # Play button label (only relevant in scrub mode) + self._play_btn.set_label("Pause" if not s.paused else "Play") + + # Slider position + if not self._dragging: + self._updating_slider = True + self._slider.set_range(0, max(s.duration, 0.1)) + self._slider.set_value(s.cursor) + self._updating_slider = False + + self._time_label.set_text(self._fmt_time(s.cursor)) + self._update_duration_label() + + def _tick_total(self): + self._update_duration_label() + return True + + def _update_duration_label(self): + s = self._timeline.state + loaded = s.duration + total = (time.monotonic() - self._wall_clock_start) if self._wall_clock_start else loaded + self._duration_label.set_text( + f"{self._fmt_time(loaded)} / {self._fmt_time(total)}" + ) + + @staticmethod + def _fmt_time(seconds): + m, s = divmod(int(seconds), 60) + h, m = divmod(m, 60) + if h: + return f"{h}:{m:02d}:{s:02d}" + return f"{m:02d}:{s:02d}" diff --git a/cht/window.py b/cht/window.py index 055b825..7d9820f 100644 --- a/cht/window.py +++ b/cht/window.py @@ -1,3 +1,5 @@ +"""Main application window — wires Timeline to all components.""" + import json import logging from pathlib import Path @@ -9,8 +11,10 @@ gi.require_version("GdkPixbuf", "2.0") from gi.repository import Gtk, Adw, GLib, Pango, GdkPixbuf from cht.config import APP_NAME +from cht.ui.timeline import Timeline, TimelineControls from cht.ui.monitor import MonitorWidget from cht.stream.manager import StreamManager +from cht.stream.tracker import RecordingTracker log = logging.getLogger(__name__) @@ -21,22 +25,25 @@ class ChtWindow(Adw.ApplicationWindow): self.set_title(APP_NAME) self.set_default_size(1400, 900) self._streaming = False + self._stream_mgr = None + self._tracker = None + self._known_frames = set() - # Main horizontal paned: agent output (left) | right panels + # Timeline is the central state machine + self._timeline = Timeline() + + # Main layout self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) self._main_paned.set_shrink_start_child(False) self._main_paned.set_shrink_end_child(False) self._main_paned.set_position(450) - # Left: Agent output panel - self._agent_output = self._build_agent_output() - self._main_paned.set_start_child(self._agent_output) + self._main_paned.set_start_child(self._build_agent_output()) - # Right: vertical stack of panels right_box = self._build_right_panels() self._main_paned.set_end_child(right_box) - # Wrap in toolbar view with header + # Header toolbar = Adw.ToolbarView() header = Adw.HeaderBar() header.set_title_widget(Gtk.Label(label=APP_NAME)) @@ -48,13 +55,8 @@ class ChtWindow(Adw.ApplicationWindow): toolbar.add_top_bar(header) toolbar.set_content(self._main_paned) - self.set_content(toolbar) - # Stream manager - self._stream_mgr = None - - # Connect window close to cleanup self.connect("close-request", self._on_close) log.info("Window initialized") @@ -71,124 +73,134 @@ class ChtWindow(Adw.ApplicationWindow): self._connect_btn.add_css_class("destructive-action") self._streaming = True + # Create session self._stream_mgr = StreamManager() - log.info("Session: %s", self._stream_mgr.session_id) self._stream_mgr.setup_dirs() - # 1. ffmpeg receives TCP and writes growing recording.ts + # Start ffmpeg recorder (listens for sender, relays to UDP) self._stream_mgr.start_recorder() - log.info("Recorder started, waiting for sender...") - # 2. mpv plays the recording file (DVR: live edge + scrub) - # Small delay to let ffmpeg create the file - GLib.timeout_add(2000, self._start_playback) + # Tell monitor where the recording will be and what URL to stream live from + self._monitor.set_recording(self._stream_mgr.recording_path) + self._monitor.set_live_source(self._stream_mgr.relay_url) - # 3. ffmpeg scene detection runs periodically on the recording - self._stream_mgr.start_scene_detector() - log.info("Scene detector started") + # Start tracking recording duration + self._tracker = RecordingTracker( + self._stream_mgr.recording_path, + on_duration_update=self._on_duration_update, + ) + self._tracker.start() - # 4. Poll for new frames and show thumbnails - self._known_frames = set() + # Go LIVE after a short delay — ffmpeg needs time to establish TCP + # and begin writing both outputs. UDP relay starts immediately after. + GLib.timeout_add(4000, self._go_live_once) + + # Start scene detection + self._stream_mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames) + + # Start polling for frame thumbnails GLib.timeout_add(3000, self._poll_frames) - def _start_playback(self): - """Start mpv playback once recording file exists.""" - if self._stream_mgr and self._stream_mgr.recording_path.exists(): - size = self._stream_mgr.recording_path.stat().st_size - if size > 10_000: - self._monitor.start_recording(self._stream_mgr.recording_path) - log.info("Playback started") - return False # stop timer - log.info("Waiting for recording data...") - return True # retry + # Tick the LIVE cursor every second + GLib.timeout_add(1000, self._tick_live) + + log.info("Waiting for sender...") + + def _go_live_once(self): + """Called once after startup delay — go LIVE.""" + if self._stream_mgr: + log.info("Going LIVE (startup delay elapsed)") + self._timeline.go_live() + return False # one-shot + + def _tick_live(self): + """Tick cursor in LIVE mode so timer advances smoothly.""" + if not self._streaming: + return False + self._timeline.tick_live() + return True + + def _on_duration_update(self, duration): + """Called from RecordingTracker thread.""" + GLib.idle_add(self._timeline.set_duration, duration) + + def _on_new_scene_frames(self, frames): + """Called from scene detector thread when new frames are found.""" + for f in frames: + GLib.idle_add(self._timeline.add_scene_marker, f["timestamp"]) + + def _on_live_toggle(self): + """LIVE button handler — passes the live player's current position.""" + pos = self._monitor.get_live_position() + self._timeline.toggle_live(live_player_pos=pos) def _stop_stream(self): log.info("Stopping stream...") + self._timeline.reset() self._monitor.stop() - log.info("Monitor stopped") + if self._tracker: + self._tracker.stop() + self._tracker = None if self._stream_mgr: self._stream_mgr.stop_all() - log.info("Stream manager stopped") self._stream_mgr = None + self._known_frames = set() self._connect_btn.set_label("Connect") self._connect_btn.remove_css_class("destructive-action") self._connect_btn.add_css_class("suggested-action") self._streaming = False - log.info("Stream stopped, ready to reconnect") def _on_close(self, *args): - log.info("Window closing, cleaning up...") - self._monitor.stop() - if self._stream_mgr: - self._stream_mgr.stop_all() + self._stop_stream() - def _build_agent_output(self): - """Left panel: agent output log.""" - box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) - - label = Gtk.Label(label="Agent Output") - label.add_css_class("heading") - label.set_margin_top(8) - label.set_margin_bottom(8) - box.append(label) - - self._agent_output_view = Gtk.TextView() - self._agent_output_view.set_editable(False) - self._agent_output_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) - self._agent_output_view.set_cursor_visible(False) - self._agent_output_view.set_left_margin(8) - self._agent_output_view.set_right_margin(8) - self._agent_output_view.set_top_margin(4) - self._agent_output_view.set_bottom_margin(4) - - scroll = Gtk.ScrolledWindow() - scroll.set_vexpand(True) - scroll.set_child(self._agent_output_view) - box.append(scroll) - - frame = Gtk.Frame() - frame.set_child(box) - return frame + # -- Right panels -- def _build_right_panels(self): right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) + # Top row: player + waveform placeholder top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) top_paned.set_shrink_start_child(False) top_paned.set_shrink_end_child(False) - self._monitor = MonitorWidget() + self._monitor = MonitorWidget(self._timeline) self._monitor.set_hexpand(True) stream_frame = Gtk.Frame() stream_frame.set_child(self._monitor) top_paned.set_start_child(stream_frame) - self._waveform_area = self._build_panel("Waveform", height=250, width=200) + self._waveform_area = self._build_placeholder("Waveform", height=250, width=200) top_paned.set_end_child(self._waveform_area) - top_paned.set_position(650) right_box.append(top_paned) + # Shared timeline slider (spans under player + waveform) + self._timeline_controls = TimelineControls(self._timeline) + self._timeline_controls.set_live_toggle_callback(self._on_live_toggle) + right_box.append(self._timeline_controls) + + # Frames extracted self._frames_panel = self._build_frames_panel() right_box.append(self._frames_panel) + # Transcript self._transcript_panel = self._build_transcript_panel() right_box.append(self._transcript_panel) + # Agent input self._agent_input = self._build_agent_input() right_box.append(self._agent_input) return right_box - def _build_panel(self, title, height=200, width=-1): - box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + def _build_placeholder(self, title, height=200, width=-1): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) label = Gtk.Label(label=title) label.add_css_class("heading") label.set_margin_top(4) label.set_margin_bottom(4) box.append(label) - area = Gtk.DrawingArea() area.set_content_height(height) if width > 0: @@ -196,7 +208,6 @@ class ChtWindow(Adw.ApplicationWindow): area.set_vexpand(False) area.set_hexpand(True) box.append(area) - frame = Gtk.Frame() frame.set_child(box) return frame @@ -209,18 +220,18 @@ class ChtWindow(Adw.ApplicationWindow): label.set_margin_bottom(4) box.append(label) - self._frames_flow = Gtk.FlowBox() - self._frames_flow.set_orientation(Gtk.Orientation.HORIZONTAL) - self._frames_flow.set_max_children_per_line(20) - self._frames_flow.set_min_children_per_line(1) - self._frames_flow.set_selection_mode(Gtk.SelectionMode.MULTIPLE) - self._frames_flow.set_homogeneous(True) + # Horizontal scrolling strip — storyboard style + self._frames_scroll = Gtk.ScrolledWindow() + self._frames_scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.NEVER) + self._frames_scroll.set_min_content_height(180) # 144px thumb + label + padding - scroll = Gtk.ScrolledWindow() - scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) - scroll.set_min_content_height(120) - scroll.set_child(self._frames_flow) - box.append(scroll) + self._frames_strip = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + self._frames_strip.set_margin_start(4) + self._frames_strip.set_margin_end(4) + self._frames_strip.set_margin_top(4) + self._frames_strip.set_margin_bottom(4) + self._frames_scroll.set_child(self._frames_strip) + box.append(self._frames_scroll) frame = Gtk.Frame() frame.set_child(box) @@ -251,6 +262,32 @@ class ChtWindow(Adw.ApplicationWindow): frame.set_child(box) return frame + # -- Agent panels -- + + def _build_agent_output(self): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + label = Gtk.Label(label="Agent Output") + label.add_css_class("heading") + label.set_margin_top(8) + label.set_margin_bottom(8) + box.append(label) + + self._agent_output_view = Gtk.TextView() + self._agent_output_view.set_editable(False) + self._agent_output_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) + self._agent_output_view.set_cursor_visible(False) + self._agent_output_view.set_left_margin(8) + self._agent_output_view.set_right_margin(8) + + scroll = Gtk.ScrolledWindow() + scroll.set_vexpand(True) + scroll.set_child(self._agent_output_view) + box.append(scroll) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + def _build_agent_input(self): box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) box.set_margin_start(4) @@ -261,45 +298,29 @@ class ChtWindow(Adw.ApplicationWindow): self._input_entry = Gtk.Entry() self._input_entry.set_hexpand(True) self._input_entry.set_placeholder_text("Message agent... (use @ to reference frames/transcripts)") - self._input_entry.connect("activate", self._on_input_activate) + self._input_entry.connect("activate", lambda e: self._send_message()) box.append(self._input_entry) send_btn = Gtk.Button(label="Send") send_btn.add_css_class("suggested-action") - send_btn.connect("clicked", self._on_send_clicked) + send_btn.connect("clicked", lambda b: self._send_message()) box.append(send_btn) frame = Gtk.Frame() frame.set_child(box) return frame - def _on_input_activate(self, entry): - self._send_message() - - def _on_send_clicked(self, button): - self._send_message() - def _send_message(self): text = self._input_entry.get_text().strip() if not text: return buf = self._agent_output_view.get_buffer() - end_iter = buf.get_end_iter() - buf.insert(end_iter, f"\n> {text}\n") + buf.insert(buf.get_end_iter(), f"\n> {text}\n") self._input_entry.set_text("") - def append_agent_output(self, text): - buf = self._agent_output_view.get_buffer() - end_iter = buf.get_end_iter() - buf.insert(end_iter, text + "\n") - - def append_transcript(self, entry_id, text): - buf = self._transcript_view.get_buffer() - end_iter = buf.get_end_iter() - buf.insert(end_iter, f"[{entry_id}] {text}\n") + # -- Frame thumbnails -- def _poll_frames(self): - """Check for new extracted frames and add thumbnails.""" if not self._stream_mgr: return False @@ -308,8 +329,7 @@ class ChtWindow(Adw.ApplicationWindow): return True try: - with open(index_path) as f: - index = json.load(f) + index = json.loads(index_path.read_text()) except (json.JSONDecodeError, IOError): return True @@ -322,31 +342,39 @@ class ChtWindow(Adw.ApplicationWindow): continue self._known_frames.add(fid) + timestamp = entry.get("timestamp", 0) try: - pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale( - str(fpath), 160, 90, True - ) - self._add_frame_thumbnail(fid, pixbuf, entry.get("timestamp")) + pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(str(fpath), 256, 144, True) + self._add_frame_thumbnail(fid, pixbuf, timestamp) except Exception as e: - log.warning("Failed to load thumbnail for %s: %s", fid, e) + log.warning("Thumbnail load failed for %s: %s", fid, e) - return True # keep polling + return True - def _add_frame_thumbnail(self, frame_id, pixbuf, timestamp=None): - box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) + def _add_frame_thumbnail(self, frame_id, pixbuf, timestamp): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) + box.set_size_request(256, -1) img = Gtk.Image.new_from_pixbuf(pixbuf) + img.set_size_request(256, 144) + img.set_vexpand(False) box.append(img) - label_text = frame_id - if timestamp is not None: - m, s = divmod(int(timestamp), 60) - label_text = f"{frame_id} [{m:02d}:{s:02d}]" - - label = Gtk.Label(label=label_text) + m, s = divmod(int(timestamp), 60) + label = Gtk.Label(label=f"{frame_id} [{m:02d}:{s:02d}]") label.add_css_class("caption") label.set_ellipsize(Pango.EllipsizeMode.END) box.append(label) - self._frames_flow.append(box) - log.info("Added thumbnail: %s", frame_id) + # Click to seek into scrub mode + gesture = Gtk.GestureClick() + gesture.connect("released", lambda g, n, x, y: self._timeline.seek(timestamp)) + box.add_controller(gesture) + + self._frames_strip.append(box) + + # Auto-scroll to show the latest frame + adj = self._frames_scroll.get_hadjustment() + GLib.idle_add(lambda: adj.set_value(adj.get_upper()) or False) + + log.info("Thumbnail: %s at %.1fs", frame_id, timestamp) diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 93084d8..c95128e 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -41,11 +41,11 @@ class TestReceiveAndRecord: ) assert "-hide_banner" in node.compile() - def test_mpegts_format(self, tmp_path): + def test_matroska_format(self, tmp_path): node = ff.receive_and_record( - "tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts" + "tcp://0.0.0.0:4444?listen", tmp_path / "rec.mkv" ) - assert "mpegts" in node.compile() + assert "matroska" in node.compile() class TestExtractSceneFrames: diff --git a/tests/test_manager.py b/tests/test_manager.py index b708337..a08719b 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,4 +1,4 @@ -"""Tests for cht.stream.manager — StreamManager orchestration.""" +"""Tests for cht.stream.manager — StreamManager.""" import json import time @@ -18,14 +18,12 @@ def manager(tmp_path): class TestInit: - def test_session_id_default(self, tmp_path): - with patch("cht.stream.manager.SESSIONS_DIR", tmp_path): - mgr = StreamManager() - assert mgr.session_id - def test_session_id_custom(self, manager): assert manager.session_id == "test_session" + def test_recording_path(self, manager): + assert manager.recording_path.name == "recording.mkv" + def test_dirs_not_created_on_init(self, manager): assert not manager.stream_dir.exists() @@ -38,52 +36,25 @@ class TestSetupDirs: assert manager.transcript_dir.is_dir() assert manager.agent_dir.is_dir() - def test_idempotent(self, manager): - manager.setup_dirs() - manager.setup_dirs() - assert manager.stream_dir.is_dir() - - -class TestStreamUrl: - def test_default_url(self, manager): - assert "0.0.0.0" in manager.stream_url - assert "4444" in manager.stream_url - assert "listen" in manager.stream_url - - -class TestRecordingPath: - def test_is_in_stream_dir(self, manager): - assert manager.recording_path.parent == manager.stream_dir - assert manager.recording_path.name == "recording.ts" - class TestStartRecorder: @patch("cht.stream.manager.ff.run_async") @patch("cht.stream.manager.ff.receive_and_record") - def test_calls_ffmpeg_module(self, mock_record, mock_async, manager): + def test_starts_ffmpeg(self, mock_record, mock_async, manager): manager.setup_dirs() - mock_node = MagicMock() - mock_record.return_value = mock_node - + mock_record.return_value = MagicMock() manager.start_recorder() - - mock_record.assert_called_once_with( - manager.stream_url, manager.recording_path, - ) - mock_async.assert_called_once_with(mock_node, pipe_stderr=True) + mock_record.assert_called_once_with(manager.stream_url, manager.recording_path) assert "recorder" in manager._procs class TestStopAll: @patch("cht.stream.manager.ff.stop_proc") def test_stops_all_procs(self, mock_stop, manager): - proc1, proc2 = MagicMock(), MagicMock() - manager._procs = {"a": proc1, "b": proc2} - + proc = MagicMock() + manager._procs = {"recorder": proc} manager.stop_all() - - mock_stop.assert_any_call(proc1) - mock_stop.assert_any_call(proc2) + mock_stop.assert_called_with(proc) assert len(manager._procs) == 0 def test_sets_stop_flag(self, manager): @@ -91,89 +62,41 @@ class TestStopAll: assert "stop" in manager._stop_flags -class TestExtractNewFrames: +class TestDetectScenes: @patch("cht.stream.manager.ff.extract_scene_frames") - def test_calls_ffmpeg_with_start_time(self, mock_extract, manager): + def test_returns_new_frames(self, mock_extract, manager): manager.setup_dirs() rec = manager.recording_path rec.touch() - mock_extract.return_value = ("", "") - manager._extract_new_frames(rec, start_time=10.0, start_number=5) - - mock_extract.assert_called_once_with( - rec, - manager.frames_dir, - scene_threshold=0.3, - max_interval=30, - start_number=5, - start_time=10.0, - ) - - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_indexes_new_frames(self, mock_extract, manager): - manager.setup_dirs() - rec = manager.recording_path - rec.touch() - - # Simulate ffmpeg creating a frame file during extraction - def create_frame_and_return(*args, **kwargs): + def create_frame(*args, **kwargs): (manager.frames_dir / "F0001.jpg").touch() - return ("", "[Parsed_showinfo_1 @ 0x1] n:0 pts:1000 pts_time:10.5 stuff\n") + return ("", "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:10.5 stuff\n") - mock_extract.side_effect = create_frame_and_return + mock_extract.side_effect = create_frame - count, max_ts = manager._extract_new_frames(rec, start_number=1) - assert count == 1 - assert max_ts == 10.5 - - index_path = manager.frames_dir / "index.json" - with open(index_path) as f: - index = json.load(f) - assert len(index) == 1 - assert index[0]["id"] == "F0001" - assert index[0]["timestamp"] == 10.5 + frames = manager._detect_scenes(start_time=0, end_time=15, start_number=1) + assert len(frames) == 1 + assert frames[0]["id"] == "F0001" + assert frames[0]["timestamp"] == 10.5 @patch("cht.stream.manager.ff.extract_scene_frames") - def test_handles_ffmpeg_failure(self, mock_extract, manager): - manager.setup_dirs() - rec = manager.recording_path - rec.touch() - - mock_extract.side_effect = RuntimeError("ffmpeg died") - count, max_ts = manager._extract_new_frames(rec) - assert count == 0 - - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_skips_preexisting_frames(self, mock_extract, manager): - manager.setup_dirs() - rec = manager.recording_path - rec.touch() - - # Pre-existing frame - (manager.frames_dir / "F0001.jpg").touch() - - # ffmpeg "creates" no new files, just returns showinfo for existing - stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:5.0 stuff\n" - mock_extract.return_value = ("", stderr) - - count, _ = manager._extract_new_frames(rec, start_number=1) - # F0001 already existed before extraction, should not be counted - assert count == 0 - - -class TestSceneDetector: - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_detects_growing_file(self, mock_extract, manager): + def test_passes_duration(self, mock_extract, manager): manager.setup_dirs() + manager.recording_path.touch() mock_extract.return_value = ("", "") - # Create recording with some data - rec = manager.recording_path - rec.write_bytes(b"\x00" * 200_000) + manager._detect_scenes(start_time=10, end_time=25, start_number=1) - manager.start_scene_detector() - time.sleep(12) # wait for one cycle - manager.stop_all() + call_kwargs = mock_extract.call_args + assert call_kwargs.kwargs["start_time"] == 10 + assert call_kwargs.kwargs["duration"] == 15 - mock_extract.assert_called() + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_handles_failure(self, mock_extract, manager): + manager.setup_dirs() + manager.recording_path.touch() + mock_extract.side_effect = RuntimeError("boom") + + frames = manager._detect_scenes(start_time=0, end_time=10, start_number=1) + assert frames == [] diff --git a/tests/test_timeline.py b/tests/test_timeline.py new file mode 100644 index 0000000..4ebfbbb --- /dev/null +++ b/tests/test_timeline.py @@ -0,0 +1,47 @@ +"""Tests for cht.ui.timeline — Timeline state machine.""" + +import pytest +from unittest.mock import MagicMock + +# Skip GTK import for headless testing +import sys +from unittest.mock import MagicMock as _MM + +# Mock gi modules for headless testing +gi_mock = _MM() +gi_mock.require_version = _MM() +gtk_mock = _MM() +gobject_mock = _MM() + +# GObject.Object needs __gsignals__ support +class FakeGObject: + def __init__(self): + self._signals = {} + def emit(self, signal, *args): + for cb in self._signals.get(signal, []): + cb(self, *args) + def connect(self, signal, cb): + self._signals.setdefault(signal, []).append(cb) + +# We test the logic, not the GTK widgets +# Import after mocking would be complex, so test the state logic directly + + +try: + from cht.ui.timeline import State + HAS_GI = True +except ImportError: + HAS_GI = False + + +@pytest.mark.skipif(not HAS_GI, reason="GTK/gi not available") +class TestTimelineState: + def test_initial_state(self): + assert State.WAITING.name == "WAITING" + assert State.LIVE.name == "LIVE" + assert State.PLAYING.name == "PLAYING" + assert State.PAUSED.name == "PAUSED" + + def test_state_values_are_unique(self): + values = [s.value for s in State] + assert len(values) == len(set(values)) diff --git a/tests/test_tracker.py b/tests/test_tracker.py new file mode 100644 index 0000000..39cd1e1 --- /dev/null +++ b/tests/test_tracker.py @@ -0,0 +1,38 @@ +"""Tests for cht.stream.tracker — RecordingTracker.""" + +import time +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +from cht.stream.tracker import RecordingTracker + + +class TestRecordingTracker: + def test_initial_duration_is_zero(self, tmp_path): + tracker = RecordingTracker(tmp_path / "rec.ts") + assert tracker.duration == 0.0 + + def test_callback_called_on_update(self, tmp_path): + rec = tmp_path / "rec.ts" + rec.write_bytes(b"\x00" * 100_000) + + cb = MagicMock() + tracker = RecordingTracker(rec, on_duration_update=cb) + + with patch.object(tracker, "_probe_duration", return_value=10.0): + tracker.start() + time.sleep(3) + tracker.stop() + + cb.assert_called() + assert cb.call_args[0][0] > 0 + + def test_no_callback_if_file_missing(self, tmp_path): + cb = MagicMock() + tracker = RecordingTracker(tmp_path / "nonexistent.ts", on_duration_update=cb) + tracker.start() + time.sleep(3) + tracker.stop() + cb.assert_not_called()