From 68802db15c30ed771f9e1e30af3f0f8a708e49f5 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Wed, 1 Apr 2026 16:26:25 -0300 Subject: [PATCH] some changes --- .gitignore | 1 + cht/config.py | 5 +- cht/stream/ffmpeg.py | 98 +++------------ cht/stream/manager.py | 287 +++++++++++++++++++----------------------- cht/ui/monitor.py | 158 ++++++++++++++++++----- cht/ui/mpv.py | 91 ++++++++------ cht/window.py | 97 +++++++++++--- pyproject.toml | 2 + tests/test_ffmpeg.py | 101 ++++----------- tests/test_manager.py | 227 ++++++++++----------------------- 10 files changed, 500 insertions(+), 567 deletions(-) diff --git a/.gitignore b/.gitignore index c7b2807..2ae2849 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ def +data/ *.egg-info/ __pycache__/ .venv/ diff --git a/cht/config.py b/cht/config.py index 1df1ab9..9548725 100644 --- a/cht/config.py +++ b/cht/config.py @@ -3,8 +3,9 @@ from pathlib import Path APP_ID = "com.cht.StreamAgent" APP_NAME = "CHT" -# Default session data location -DATA_DIR = Path.home() / ".local" / "share" / "cht" +# Default session data location — in project dir for easy clearing +PROJECT_DIR = Path(__file__).resolve().parent.parent +DATA_DIR = PROJECT_DIR / "data" SESSIONS_DIR = DATA_DIR / "sessions" # Stream defaults diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index f632d43..b8cca53 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -1,17 +1,13 @@ """ Thin wrapper around ffmpeg-python for building and running ffmpeg pipelines. -All ffmpeg command construction goes through this module so manager.py -and other consumers never build raw CLI arg lists. - +All ffmpeg command construction goes through this module. Uses ffmpeg-python's own run/run_async for subprocess management. """ import logging -import os import signal import subprocess -from pathlib import Path import ffmpeg @@ -20,84 +16,37 @@ log = logging.getLogger(__name__) GLOBAL_ARGS = ("-hide_banner", "-loglevel", "warning") -def receive_to_pipe(stream_url, segment_dir=None, segment_duration=60): - """Receive mpegts stream and pipe to stdout for mpv. +def receive_and_record(stream_url, output_path): + """Receive mpegts stream and write to a single growing file. - If segment_dir is provided, also saves segments to disk. - Uses pipe (not fifo) so OS kernel buffers prevent blocking. + mpv reads this file for DVR-style playback. + ffmpeg scene detection runs on this file for frame extraction. + Audio is preserved in the recording (muxed mpegts). """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") - - out_pipe = ffmpeg.output(stream, "pipe:", c="copy", f="mpegts") - - if segment_dir: - out_segments = ffmpeg.output( - stream, - str(segment_dir / "segment_%04d.ts"), - c="copy", - f="segment", - segment_time=segment_duration, - reset_timestamps=1, - ) - return ffmpeg.merge_outputs(out_pipe, out_segments).global_args(*GLOBAL_ARGS) - - return out_pipe.global_args(*GLOBAL_ARGS) - - -def receive_and_segment(stream_url, segment_dir, segment_duration=60): - """Receive mpegts stream and save as segmented .ts files.""" - stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") return ( - ffmpeg.output( - stream, - str(segment_dir / "segment_%04d.ts"), - c="copy", - f="segment", - segment_time=segment_duration, - reset_timestamps=1, - ) + ffmpeg.output(stream, str(output_path), c="copy", f="mpegts") .global_args(*GLOBAL_ARGS) ) -def receive_and_segment_with_monitor(stream_url, segment_dir, fifo_path, segment_duration=60): - """Receive stream, save segments AND tee to a named pipe for monitoring.""" - if not fifo_path.exists(): - os.mkfifo(str(fifo_path)) - - stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") - - out_segments = ffmpeg.output( - stream, - str(segment_dir / "segment_%04d.ts"), - c="copy", - f="segment", - segment_time=segment_duration, - reset_timestamps=1, - ) - - out_monitor = ffmpeg.output( - stream, - str(fifo_path), - c="copy", - f="mpegts", - ) - - return ffmpeg.merge_outputs(out_segments, out_monitor).global_args(*GLOBAL_ARGS) - - def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, - max_interval=30, start_number=1): + max_interval=30, start_number=1, start_time=0.0): """Extract frames from a file on scene change. Uses ffmpeg select filter with scene detection and a max-interval fallback. - Returns (stdout bytes, stderr bytes) for timestamp parsing. + start_time: skip to this position before processing (avoids re-scanning). + Returns (stdout, stderr) as decoded strings for timestamp parsing. """ select_expr = ( f"gt(scene,{scene_threshold})" f"+gte(t-prev_selected_t,{max_interval})" ) - stream = ffmpeg.input(str(input_path)) + input_opts = {} + if start_time > 0: + input_opts["ss"] = str(start_time) + + stream = ffmpeg.input(str(input_path), **input_opts) stream = stream.filter("select", select_expr).filter("showinfo") output = ( @@ -116,23 +65,6 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") -def extract_audio_pcm(input_path): - """Extract audio as 16kHz mono PCM wav, returning an output node for piping.""" - stream = ffmpeg.input(str(input_path)) - return ( - ffmpeg.output( - stream.audio, - "pipe:", - vn=None, - acodec="pcm_s16le", - ar=16000, - ac=1, - f="wav", - ) - .global_args(*GLOBAL_ARGS) - ) - - def run_async(output_node, pipe_stdout=False, pipe_stderr=False): """Start an ffmpeg pipeline asynchronously via ffmpeg-python's run_async.""" log.info("run_async: %s", " ".join(output_node.compile())) diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 6a0d719..fb9c851 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -1,14 +1,15 @@ """ StreamManager: orchestrates ffmpeg pipelines for receiving, recording, -frame extraction, and audio extraction from a muxed mpegts/TCP stream. +and frame extraction from a muxed mpegts/TCP stream. -All data goes to disk. UI reads from disk. -All ffmpeg commands go through cht.stream.ffmpeg module. +Architecture: + sender → TCP:4444 → ffmpeg (writes growing recording.ts) + └→ mpv plays recording.ts (DVR: live edge + scrub) + └→ ffmpeg scene detection (periodic on recording) """ import json import logging -import os import re import time from pathlib import Path @@ -17,7 +18,6 @@ from threading import Thread from cht.config import ( STREAM_HOST, STREAM_PORT, - SEGMENT_DURATION, SCENE_THRESHOLD, MAX_FRAME_INTERVAL, SESSIONS_DIR, @@ -52,10 +52,129 @@ class StreamManager: def stream_url(self): return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" - def start_all(self): - self.setup_dirs() - self.start_recorder() - self.start_frame_extractor() + @property + def recording_path(self): + return self.stream_dir / "recording.ts" + + # -- Recording -- + + def start_recorder(self): + """Start ffmpeg to receive TCP stream and write to recording.ts.""" + node = ff.receive_and_record(self.stream_url, self.recording_path) + proc = ff.run_async(node, pipe_stderr=True) + self._procs["recorder"] = proc + log.info("Recorder started: pid=%s url=%s → %s", proc.pid, self.stream_url, self.recording_path) + self._start_stderr_reader("recorder", proc) + + # -- Scene detection -- + + def start_scene_detector(self): + """Periodically run ffmpeg scene detection on the growing recording. + + Tracks how far we've processed to avoid re-scanning from the start. + """ + log.info("Starting scene detector (threshold=%.2f, interval=%ds)", + SCENE_THRESHOLD, MAX_FRAME_INTERVAL) + + def _detect(): + last_processed_size = 0 + processed_duration = 0.0 # seconds already processed + frame_count = 0 + + while "stop" not in self._stop_flags: + time.sleep(10) + if not self.recording_path.exists(): + continue + size = self.recording_path.stat().st_size + if size <= last_processed_size or size < 100_000: + continue + + log.info("Recording grew: %d → %d bytes, scanning from %.1fs", + last_processed_size, size, processed_duration) + last_processed_size = size + + try: + new_count, new_duration = self._extract_new_frames( + self.recording_path, + start_time=processed_duration, + start_number=frame_count + 1, + ) + if new_count > 0: + frame_count += new_count + log.info("Found %d new frames (total: %d)", new_count, frame_count) + if new_duration > processed_duration: + processed_duration = new_duration + except Exception as e: + log.error("Scene detection failed: %s", e) + + log.info("Scene detector stopped") + + t = Thread(target=_detect, daemon=True, name="scene_detector") + t.start() + self._threads["scene_detector"] = t + + def _extract_new_frames(self, path, start_time=0.0, start_number=1): + """Extract scene-change frames starting from a given timestamp. + + Returns (new_frame_count, max_timestamp_seen). + """ + existing_before = set(f.name for f in self.frames_dir.glob("F*.jpg")) + + try: + _stdout, stderr = ff.extract_scene_frames( + path, + self.frames_dir, + scene_threshold=SCENE_THRESHOLD, + max_interval=MAX_FRAME_INTERVAL, + start_number=start_number, + start_time=start_time, + ) + except Exception as e: + log.error("ffmpeg scene extraction error: %s", e) + return 0, start_time + + if stderr: + for line in stderr.splitlines()[:5]: + log.debug("[scene_detect:stderr] %s", line) + + # Parse timestamps and update index + max_ts = start_time + new_count = 0 + index_path = self.frames_dir / "index.json" + if index_path.exists(): + with open(index_path) as f: + index = json.load(f) + else: + index = [] + + frame_num = start_number + for line in stderr.splitlines(): + if "showinfo" not in line: + continue + pts_match = re.search(r"pts_time:\s*([\d.]+)", line) + if pts_match: + pts_time = float(pts_match.group(1)) + frame_id = f"F{frame_num:04d}" + frame_path = self.frames_dir / f"{frame_id}.jpg" + if frame_path.exists() and frame_path.name not in existing_before: + index.append({ + "id": frame_id, + "timestamp": pts_time, + "path": str(frame_path), + "sent_to_agent": False, + }) + log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time) + new_count += 1 + if pts_time > max_ts: + max_ts = pts_time + frame_num += 1 + + with open(index_path, "w") as f: + json.dump(index, f, indent=2) + + return new_count, max_ts + + # -- Lifecycle -- def stop_all(self): log.info("Stopping all processes...") @@ -66,44 +185,7 @@ class StreamManager: self._procs.clear() log.info("All processes stopped") - def start_recorder(self): - node = ff.receive_and_segment( - self.stream_url, self.stream_dir, SEGMENT_DURATION, - ) - proc = ff.run_async(node, pipe_stderr=True) - self._procs["recorder"] = proc - log.info("Recorder started: pid=%s url=%s", proc.pid, self.stream_url) - self._start_stderr_reader("recorder", proc) - - def start_receiver_pipe(self): - """Receive stream, pipe stdout to mpv, save segments to disk.""" - self.setup_dirs() - node = ff.receive_to_pipe( - self.stream_url, - segment_dir=self.stream_dir, - segment_duration=SEGMENT_DURATION, - ) - proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True) - self._procs["receiver"] = proc - log.info("Receiver started: pid=%s url=%s (pipe + segments)", proc.pid, self.stream_url) - self._start_stderr_reader("receiver", proc) - return proc - - def start_recorder_with_monitor(self): - self.setup_dirs() - fifo_path = self.session_dir / "monitor.pipe" - - node = ff.receive_and_segment_with_monitor( - self.stream_url, self.stream_dir, fifo_path, SEGMENT_DURATION, - ) - proc = ff.run_async(node, pipe_stderr=True) - self._procs["recorder"] = proc - log.info("Recorder+monitor started: pid=%s url=%s fifo=%s", proc.pid, self.stream_url, fifo_path) - self._start_stderr_reader("recorder", proc) - return fifo_path - def _start_stderr_reader(self, name, proc): - """Read stderr from a process in a thread and log it.""" def _read(): try: for line in proc.stderr: @@ -117,116 +199,3 @@ class StreamManager: t = Thread(target=_read, daemon=True, name=f"{name}_stderr") t.start() - - def start_frame_extractor_on_recording(self, recording_path): - """Extract frames periodically from a growing recording file.""" - log.info("Starting frame extractor on recording: %s", recording_path) - self._recording_path = recording_path - self._start_recording_frame_watcher() - - def _start_recording_frame_watcher(self): - def _watch(): - last_size = 0 - log.info("Recording frame watcher running, watching %s", self._recording_path) - while "stop" not in self._stop_flags: - if self._recording_path.exists(): - size = self._recording_path.stat().st_size - if size > last_size and size > 100_000: # wait for some data - log.info("Recording grew: %d -> %d bytes, extracting frames", last_size, size) - last_size = size - self._extract_frames_from_file(self._recording_path) - time.sleep(10) # check every 10s - log.info("Recording frame watcher stopped") - - t = Thread(target=_watch, daemon=True, name="recording_frame_watcher") - t.start() - self._threads["recording_frame_watcher"] = t - - def start_frame_extractor(self): - log.info("Starting frame watcher...") - self._start_frame_watcher() - - def _start_frame_watcher(self): - def _watch(): - seen = set() - log.info("Frame watcher running, watching %s", self.stream_dir) - while "stop" not in self._stop_flags: - segments = sorted(self.stream_dir.glob("segment_*.ts")) - for seg in segments: - if seg.name not in seen and seg.stat().st_size > 0: - seen.add(seg.name) - log.info("New segment found: %s (%d bytes)", seg.name, seg.stat().st_size) - self._extract_frames_from_file(seg) - time.sleep(2) - log.info("Frame watcher stopped") - - t = Thread(target=_watch, daemon=True, name="frame_watcher") - t.start() - self._threads["frame_watcher"] = t - - def _extract_frames_from_file(self, segment_path): - existing = list(self.frames_dir.glob("F*.jpg")) - start_num = len(existing) + 1 - log.info("Extracting frames from %s (start_num=%d)", segment_path.name, start_num) - - try: - _stdout, stderr = ff.extract_scene_frames( - segment_path, - self.frames_dir, - scene_threshold=SCENE_THRESHOLD, - max_interval=MAX_FRAME_INTERVAL, - start_number=start_num, - ) - if stderr: - for line in stderr.splitlines()[:10]: - log.debug("[frame_extract:stderr] %s", line) - self._parse_frame_timestamps(stderr, start_num) - new_frames = list(self.frames_dir.glob("F*.jpg")) - log.info("Frame extraction done: %d new frames", len(new_frames) - len(existing)) - except Exception as e: - log.error("Frame extraction failed for %s: %s", segment_path.name, e) - - def _parse_frame_timestamps(self, stderr_output, start_num): - index_path = self.frames_dir / "index.json" - if index_path.exists(): - with open(index_path) as f: - index = json.load(f) - else: - index = [] - - frame_num = start_num - for line in stderr_output.splitlines(): - if "showinfo" not in line: - continue - pts_match = re.search(r"pts_time:\s*([\d.]+)", line) - if pts_match: - pts_time = float(pts_match.group(1)) - frame_id = f"F{frame_num:04d}" - frame_path = self.frames_dir / f"{frame_id}.jpg" - if frame_path.exists(): - index.append({ - "id": frame_id, - "timestamp": pts_time, - "path": str(frame_path), - "sent_to_agent": False, - }) - log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time) - frame_num += 1 - - with open(index_path, "w") as f: - json.dump(index, f, indent=2) - - def start_audio_extractor(self): - """Will be implemented in Phase 3.""" - pass - - def get_ffplay_cmd(self): - fifo_path = self.session_dir / "monitor.pipe" - return [ - "ffplay", - "-hwaccel", "cuda", - "-fflags", "nobuffer", - "-flags", "low_delay", - "-framedrop", - "-i", str(fifo_path), - ], fifo_path diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py index e93c358..1a188e8 100644 --- a/cht/ui/monitor.py +++ b/cht/ui/monitor.py @@ -1,97 +1,193 @@ """ -MonitorWidget: mpv-based live stream monitor embedded in GTK4. +MonitorWidget: mpv-based stream monitor embedded in GTK4 via OpenGL. -Uses libmpv's OpenGL render API + Gtk.GLArea for proper embedding. -No X11 wid hacks — renders directly to a GL texture in the GTK layout. -Works on both X11 and Wayland. +Supports DVR-style playback of a growing recording file: + - Follows live edge by default + - Slider scrubs video + audio together + - Can capture frame at current cursor position """ +import ctypes import logging import gi gi.require_version("Gtk", "4.0") -from gi.repository import Gtk, GLib, Gdk +from gi.repository import Gtk, GLib from cht.ui.mpv import Player log = logging.getLogger(__name__) +# Cache libGL reference +_libGL = ctypes.cdll.LoadLibrary("libGL.so.1") +GL_DRAW_FRAMEBUFFER_BINDING = 0x8CA6 + class MonitorWidget(Gtk.Box): - """Widget that embeds mpv video via OpenGL into the GTK4 layout.""" + """Embedded mpv video player with DVR controls.""" def __init__(self, **kwargs): super().__init__(orientation=Gtk.Orientation.VERTICAL, **kwargs) self._player = None + self._following_live = True + self._slider_updating = False + # GL area for video self._gl_area = Gtk.GLArea() self._gl_area.set_hexpand(True) self._gl_area.set_vexpand(True) self._gl_area.set_auto_render(False) self._gl_area.set_has_depth_buffer(False) self._gl_area.set_has_stencil_buffer(False) - self._gl_area.connect("realize", self._on_realize) self._gl_area.connect("unrealize", self._on_unrealize) self._gl_area.connect("render", self._on_render) - self.append(self._gl_area) - log.info("MonitorWidget initialized (GLArea)") + + # Slider for scrubbing (shared timeline for video + audio) + slider_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + slider_box.set_margin_start(4) + slider_box.set_margin_end(4) + slider_box.set_margin_bottom(2) + + self._time_label = Gtk.Label(label="00:00") + self._time_label.set_width_chars(6) + slider_box.append(self._time_label) + + self._slider = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL) + self._slider.set_hexpand(True) + self._slider.set_range(0, 1) + self._slider.set_draw_value(False) + self._slider.connect("value-changed", self._on_slider_changed) + slider_box.append(self._slider) + + self._duration_label = Gtk.Label(label="00:00") + self._duration_label.set_width_chars(6) + slider_box.append(self._duration_label) + + self._live_btn = Gtk.Button(label="LIVE") + self._live_btn.add_css_class("suggested-action") + self._live_btn.connect("clicked", self._on_live_clicked) + slider_box.append(self._live_btn) + + self.append(slider_box) + + # Update slider position periodically + GLib.timeout_add(500, self._update_slider) + + log.info("MonitorWidget initialized (GLArea + slider)") + + # -- GL callbacks -- def _on_realize(self, gl_area): - """GL context is ready — initialize mpv's render context.""" log.info("GLArea realized") gl_area.make_current() if gl_area.get_error(): log.error("GLArea error: %s", gl_area.get_error()) - return def _on_unrealize(self, gl_area): - """Clean up mpv render context.""" log.info("GLArea unrealized") - if self._player: - self._player.terminate() - self._player = None + self.stop() def _on_render(self, gl_area, gl_context): - """Render mpv's current frame to the GLArea.""" if not self._player: return True - # Get the default FBO that GTK4 GLArea renders to - fbo = gl_area.get_buffer() if hasattr(gl_area, 'get_buffer') else 0 width = gl_area.get_width() height = gl_area.get_height() - # GTK4 GLArea uses its own FBO, get it from GL state - import ctypes fbo_id = ctypes.c_int(0) - gl = ctypes.cdll.LoadLibrary("libGL.so.1") - gl.glGetIntegerv(0x8CA6, ctypes.byref(fbo_id)) # GL_DRAW_FRAMEBUFFER_BINDING + _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo_id)) self._player.render(fbo_id.value, width, height) return True def _on_mpv_update(self): - """Called by mpv when a new frame is ready. Triggers re-render.""" GLib.idle_add(self._gl_area.queue_render) - def start_stream(self, source, record_path=None): - """Start playing from a URL and optionally record to disk. + # -- Slider -- + + def _on_slider_changed(self, slider): + if self._slider_updating or not self._player: + return + pos = slider.get_value() + self._player.seek(pos) + self._following_live = False + self._live_btn.remove_css_class("suggested-action") + + def _on_live_clicked(self, button): + if self._player and self._player.duration: + self._player.seek(self._player.duration - 0.5) + self._following_live = True + self._live_btn.add_css_class("suggested-action") + + def _update_slider(self): + if not self._player: + return True + + pos = self._player.time_pos + dur = self._player.duration + if pos is not None and dur is not None and dur > 0: + self._slider_updating = True + self._slider.set_range(0, dur) + self._slider.set_value(pos) + self._slider_updating = False + + self._time_label.set_text(self._fmt_time(pos)) + self._duration_label.set_text(self._fmt_time(dur)) + + # Auto-follow live edge: if at EOF or falling behind, reload + if self._following_live: + if self._player.idle or dur - pos > 3: + self._reload_live() + + return True # keep timer running + + def _reload_live(self): + """Reload the growing file and seek to near-end (live edge).""" + if not self._player or not self._recording_path: + return + self._player.play(str(self._recording_path)) + # Small delay then seek to end + GLib.timeout_add(500, self._seek_to_end_once) + + @staticmethod + def _fmt_time(seconds): + m, s = divmod(int(seconds), 60) + h, m = divmod(m, 60) + if h: + return f"{h}:{m:02d}:{s:02d}" + return f"{m:02d}:{s:02d}" + + # -- Public API -- + + def _seek_to_end_once(self): + if self._player and self._player.duration: + self._player.seek(self._player.duration - 0.5) + return False # don't repeat + + def start_recording(self, recording_path): + """Start DVR-style playback of a growing recording file. Args: - source: TCP URL (tcp://...), file path, etc. - record_path: if set, mpv dumps the raw stream to this file + recording_path: path to the .ts file being written by ffmpeg """ + self._recording_path = recording_path self._gl_area.make_current() - self._player = Player(record_path=record_path) + self._player = Player() self._player.init_gl(update_callback=self._on_mpv_update) - self._player.play(source) - log.info("Monitor streaming from: %s (record=%s)", source, record_path) + self._player.play_file(recording_path) + self._following_live = True + self._live_btn.add_css_class("suggested-action") + log.info("Monitor playing recording: %s", recording_path) + + def screenshot(self, path): + """Capture frame at current cursor position.""" + if self._player: + self._player.screenshot(path) def stop(self): - """Stop playback and release mpv.""" if self._player: log.info("Stopping monitor") self._player.terminate() diff --git a/cht/ui/mpv.py b/cht/ui/mpv.py index 777d833..2064627 100644 --- a/cht/ui/mpv.py +++ b/cht/ui/mpv.py @@ -2,7 +2,10 @@ MPV wrapper using python-mpv (libmpv bindings) with OpenGL render API. Renders video frames to an OpenGL context provided by GTK4's GLArea. -No subprocess calls, no X11 wid hacks. +Supports DVR-style playback of a growing recording file: + - Follow live edge (default) + - Scrub back to any point + - Audio + video synced via single slider """ import ctypes @@ -30,40 +33,50 @@ _get_proc_address = _make_get_proc_address() class Player: - """Wraps a libmpv player with OpenGL render context for GTK4 embedding.""" + """Wraps a libmpv player with OpenGL render context for GTK4 embedding. - def __init__(self, record_path=None): + Designed for DVR-style playback of a growing file: + - play_file() opens the recording and seeks to end (live edge) + - seek() scrubs to any position (audio + video move together) + - time_pos / duration track playback state for the slider + """ + + def __init__(self): opts = { "input_default_bindings": False, "input_vo_keyboard": False, "osc": False, - "profile": "low-latency", - "cache": "no", - "untimed": True, - "demuxer_thread": "no", - "demuxer_lavf_o": "fflags=+nobuffer", - "video_sync": "display-desync", "vo": "libmpv", "hwdec": "auto", + "video_sync": "display-desync", + # DVR: keep alive at EOF, wait for more data + "keep_open": "yes", + "demuxer_max_bytes": "500MiB", + "demuxer_readahead_secs": "5", + # Allow re-reading growing file + "demuxer_cache_wait": True, } - if record_path: - opts["stream_record"] = str(record_path) - log.info("mpv will record stream to: %s", record_path) - log.info("Creating mpv player (OpenGL render mode)") - self._player = libmpv.MPV(**opts) + log.info("Creating mpv player (OpenGL render, DVR mode)") + self._player = libmpv.MPV(log_handler=self._mpv_log, loglevel="v", **opts) self._ctx = None self._update_callback = None log.info("mpv player created") + @staticmethod + def _mpv_log(loglevel, component, message): + msg = f"[mpv/{component}] {message.strip()}" + if loglevel in ("fatal", "error"): + log.error(msg) + elif loglevel == "warn": + log.warning(msg) + else: + log.debug(msg) + def init_gl(self, update_callback): """Initialize the OpenGL render context. - Must be called from a thread with an active GL context (e.g. GLArea realize). - - Args: - update_callback: called by mpv when a new frame is ready to render. - Should trigger a GLArea queue_render(). + Must be called with an active GL context. """ self._update_callback = update_callback self._ctx = libmpv.MpvRenderContext( @@ -73,41 +86,40 @@ class Player: "get_proc_address": _get_proc_address, }, ) - # Keep reference to prevent GC of the ctypes callback self._get_proc_address_ref = _get_proc_address self._ctx.update_cb = self._on_mpv_update log.info("mpv OpenGL render context initialized") def _on_mpv_update(self): - """Called by mpv from any thread when a new frame is available.""" if self._update_callback: self._update_callback() def render(self, fbo, width, height): - """Render the current frame to the given OpenGL FBO. - - Call from the GLArea render signal handler. - """ + """Render current frame to the given OpenGL FBO.""" if self._ctx: self._ctx.render( flip_y=True, - opengl_fbo={ - "fbo": fbo, - "w": width, - "h": height, - }, + opengl_fbo={"fbo": fbo, "w": width, "h": height}, ) def play(self, source): - """Play from a file path or URL.""" + """Play from any source (URL, file path).""" log.info("mpv play: %s", source) self._player.play(str(source)) - def play_fd(self, fd): - """Play from a raw file descriptor.""" - source = f"fd://{fd}" - log.info("mpv play from fd: %s", source) - self._player.play(source) + def play_file(self, path): + """Play a recording file, seeking to the end (live edge).""" + log.info("mpv play_file (DVR): %s", path) + self._player.play(str(path)) + # Seek to end once playback starts + self._player.observe_property("duration", self._seek_to_live_once) + + def _seek_to_live_once(self, name, value): + """Seek to live edge once duration is known, then stop observing.""" + if value and value > 1: + log.info("Seeking to live edge: %.1fs", value) + self._player.seek(value - 0.5, reference="absolute") + self._player.unobserve_property("duration", self._seek_to_live_once) def pause(self): self._player.pause = True @@ -120,13 +132,18 @@ class Player: return self._player.pause def seek(self, seconds): - """Seek to absolute position in seconds.""" + """Seek to absolute position. Audio + video move together.""" self._player.seek(seconds, reference="absolute") def seek_relative(self, seconds): """Seek relative to current position.""" self._player.seek(seconds, reference="relative") + def screenshot(self, path): + """Save current frame as an image file.""" + self._player.screenshot_to_file(str(path), includes="video") + log.debug("Screenshot saved: %s", path) + def stop(self): log.info("mpv stop") self._player.stop() diff --git a/cht/window.py b/cht/window.py index c933b11..055b825 100644 --- a/cht/window.py +++ b/cht/window.py @@ -1,9 +1,12 @@ +import json import logging +from pathlib import Path import gi gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") -from gi.repository import Gtk, Adw, GLib, Pango +gi.require_version("GdkPixbuf", "2.0") +from gi.repository import Gtk, Adw, GLib, Pango, GdkPixbuf from cht.config import APP_NAME from cht.ui.monitor import MonitorWidget @@ -72,16 +75,32 @@ class ChtWindow(Adw.ApplicationWindow): log.info("Session: %s", self._stream_mgr.session_id) self._stream_mgr.setup_dirs() - # mpv listens on TCP directly (lowest latency, like def scripts) - # and records the raw stream to disk for frame extraction - stream_url = self._stream_mgr.stream_url - record_path = self._stream_mgr.stream_dir / "recording.ts" - log.info("Starting mpv on %s, recording to %s", stream_url, record_path) - self._monitor.start_stream(stream_url, record_path=record_path) - log.info("Monitor started, waiting for sender...") + # 1. ffmpeg receives TCP and writes growing recording.ts + self._stream_mgr.start_recorder() + log.info("Recorder started, waiting for sender...") - self._stream_mgr.start_frame_extractor_on_recording(record_path) - log.info("Frame extractor started") + # 2. mpv plays the recording file (DVR: live edge + scrub) + # Small delay to let ffmpeg create the file + GLib.timeout_add(2000, self._start_playback) + + # 3. ffmpeg scene detection runs periodically on the recording + self._stream_mgr.start_scene_detector() + log.info("Scene detector started") + + # 4. Poll for new frames and show thumbnails + self._known_frames = set() + GLib.timeout_add(3000, self._poll_frames) + + def _start_playback(self): + """Start mpv playback once recording file exists.""" + if self._stream_mgr and self._stream_mgr.recording_path.exists(): + size = self._stream_mgr.recording_path.stat().st_size + if size > 10_000: + self._monitor.start_recording(self._stream_mgr.recording_path) + log.info("Playback started") + return False # stop timer + log.info("Waiting for recording data...") + return True # retry def _stop_stream(self): log.info("Stopping stream...") @@ -279,13 +298,55 @@ class ChtWindow(Adw.ApplicationWindow): end_iter = buf.get_end_iter() buf.insert(end_iter, f"[{entry_id}] {text}\n") - def add_frame_thumbnail(self, frame_id, pixbuf): + def _poll_frames(self): + """Check for new extracted frames and add thumbnails.""" + if not self._stream_mgr: + return False + + index_path = self._stream_mgr.frames_dir / "index.json" + if not index_path.exists(): + return True + + try: + with open(index_path) as f: + index = json.load(f) + except (json.JSONDecodeError, IOError): + return True + + for entry in index: + fid = entry["id"] + if fid in self._known_frames: + continue + fpath = Path(entry["path"]) + if not fpath.exists(): + continue + + self._known_frames.add(fid) + try: + pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale( + str(fpath), 160, 90, True + ) + self._add_frame_thumbnail(fid, pixbuf, entry.get("timestamp")) + except Exception as e: + log.warning("Failed to load thumbnail for %s: %s", fid, e) + + return True # keep polling + + def _add_frame_thumbnail(self, frame_id, pixbuf, timestamp=None): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) + img = Gtk.Image.new_from_pixbuf(pixbuf) - overlay = Gtk.Overlay() - overlay.set_child(img) - label = Gtk.Label(label=frame_id) - label.set_halign(Gtk.Align.START) - label.set_valign(Gtk.Align.END) + box.append(img) + + label_text = frame_id + if timestamp is not None: + m, s = divmod(int(timestamp), 60) + label_text = f"{frame_id} [{m:02d}:{s:02d}]" + + label = Gtk.Label(label=label_text) label.add_css_class("caption") - overlay.add_overlay(label) - self._frames_flow.append(overlay) + label.set_ellipsize(Pango.EllipsizeMode.END) + box.append(label) + + self._frames_flow.append(box) + log.info("Added thumbnail: %s", frame_id) diff --git a/pyproject.toml b/pyproject.toml index 5807385..641c3ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,8 @@ requires-python = ">=3.13" dependencies = [ "ffmpeg-python", "python-mpv", + "Pillow", + "numpy", ] [tool.setuptools.packages.find] diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index d87824f..93084d8 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -1,9 +1,7 @@ """Tests for cht.stream.ffmpeg — pipeline construction and execution.""" -import os import signal import subprocess -from pathlib import Path from unittest.mock import patch, MagicMock import pytest @@ -12,76 +10,43 @@ import ffmpeg as ffmpeg_lib from cht.stream import ffmpeg as ff -class TestReceiveAndSegment: +class TestReceiveAndRecord: def test_compiles_to_valid_cmd(self, tmp_path): - node = ff.receive_and_segment( - "tcp://0.0.0.0:4444?listen", tmp_path, segment_duration=30 + node = ff.receive_and_record( + "tcp://0.0.0.0:4444?listen", tmp_path / "recording.ts" ) cmd = node.compile() cmd_str = " ".join(str(c) for c in cmd) - assert cmd[0] == "ffmpeg" assert "tcp://0.0.0.0:4444?listen" in cmd_str - assert "segment" in cmd_str - assert str(tmp_path / "segment_%04d.ts") in cmd_str + assert "recording.ts" in cmd_str def test_input_has_low_latency_flags(self, tmp_path): - node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) + node = ff.receive_and_record( + "tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts" + ) cmd_str = " ".join(str(c) for c in node.compile()) assert "nobuffer" in cmd_str assert "low_delay" in cmd_str def test_copy_codec(self, tmp_path): - node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) + node = ff.receive_and_record( + "tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts" + ) assert "copy" in node.compile() def test_has_global_args(self, tmp_path): - node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) - cmd = node.compile() - assert "-hide_banner" in cmd - - -class TestReceiveAndSegmentWithMonitor: - def test_creates_fifo(self, tmp_path): - fifo = tmp_path / "monitor.pipe" - ff.receive_and_segment_with_monitor( - "tcp://0.0.0.0:4444?listen", tmp_path, fifo - ) - assert fifo.exists() - - def test_does_not_recreate_existing_fifo(self, tmp_path): - fifo = tmp_path / "monitor.pipe" - os.mkfifo(str(fifo)) - inode_before = fifo.stat().st_ino - ff.receive_and_segment_with_monitor( - "tcp://0.0.0.0:4444?listen", tmp_path, fifo - ) - assert fifo.stat().st_ino == inode_before - - def test_has_two_outputs(self, tmp_path): - fifo = tmp_path / "monitor.pipe" - node = ff.receive_and_segment_with_monitor( - "tcp://0.0.0.0:4444?listen", tmp_path, fifo - ) - cmd_str = " ".join(str(c) for c in node.compile()) - assert "segment_%04d.ts" in cmd_str - assert "monitor.pipe" in cmd_str - - def test_both_outputs_use_copy(self, tmp_path): - fifo = tmp_path / "monitor.pipe" - node = ff.receive_and_segment_with_monitor( - "tcp://0.0.0.0:4444?listen", tmp_path, fifo - ) - cmd = node.compile() - assert cmd.count("copy") >= 2 - - def test_has_global_args(self, tmp_path): - fifo = tmp_path / "monitor.pipe" - node = ff.receive_and_segment_with_monitor( - "tcp://0.0.0.0:4444?listen", tmp_path, fifo + node = ff.receive_and_record( + "tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts" ) assert "-hide_banner" in node.compile() + def test_mpegts_format(self, tmp_path): + node = ff.receive_and_record( + "tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts" + ) + assert "mpegts" in node.compile() + class TestExtractSceneFrames: def test_compiles_select_filter(self, tmp_path): @@ -113,32 +78,12 @@ class TestExtractSceneFrames: assert stderr == "err" -class TestExtractAudioPcm: - def test_compiles_audio_extraction(self, tmp_path): - node = ff.extract_audio_pcm(tmp_path / "test.ts") - cmd_str = " ".join(str(c) for c in node.compile()) - assert "pcm_s16le" in cmd_str - assert "16000" in cmd_str - assert "pipe:" in cmd_str - assert "wav" in cmd_str - - def test_has_global_args(self, tmp_path): - node = ff.extract_audio_pcm(tmp_path / "test.ts") - assert "-hide_banner" in node.compile() - - class TestRunAsync: - @patch("ffmpeg.run_async") - def test_delegates_to_ffmpeg_python(self, mock_run_async, tmp_path): - node = ffmpeg_lib.input("test").output("out") - ff.run_async(node) - # ffmpeg-python's run_async is called on the node - # Our function calls node.run_async() which is the bound method - - @patch("ffmpeg.run_async") - def test_passes_pipe_flags(self, mock_run_async, tmp_path): - node = ffmpeg_lib.input("test").output("out") - ff.run_async(node, pipe_stdout=True, pipe_stderr=True) + def test_compiles_valid_command(self, tmp_path): + node = ff.receive_and_record("tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts") + cmd = node.compile() + assert cmd[0] == "ffmpeg" + assert "tcp://0.0.0.0:4444?listen" in " ".join(cmd) class TestStopProc: diff --git a/tests/test_manager.py b/tests/test_manager.py index 9622864..b708337 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,10 +1,8 @@ """Tests for cht.stream.manager — StreamManager orchestration.""" import json -import os import time -from pathlib import Path -from unittest.mock import patch, MagicMock, call +from unittest.mock import patch, MagicMock import pytest @@ -13,7 +11,6 @@ from cht.stream.manager import StreamManager @pytest.fixture def manager(tmp_path): - """StreamManager with session dir in tmp_path.""" with patch("cht.stream.manager.SESSIONS_DIR", tmp_path): mgr = StreamManager(session_id="test_session") yield mgr @@ -24,7 +21,7 @@ class TestInit: def test_session_id_default(self, tmp_path): with patch("cht.stream.manager.SESSIONS_DIR", tmp_path): mgr = StreamManager() - assert mgr.session_id # should be a timestamp string + assert mgr.session_id def test_session_id_custom(self, manager): assert manager.session_id == "test_session" @@ -54,44 +51,29 @@ class TestStreamUrl: assert "listen" in manager.stream_url +class TestRecordingPath: + def test_is_in_stream_dir(self, manager): + assert manager.recording_path.parent == manager.stream_dir + assert manager.recording_path.name == "recording.ts" + + class TestStartRecorder: @patch("cht.stream.manager.ff.run_async") - @patch("cht.stream.manager.ff.receive_and_segment") - def test_calls_ffmpeg_module(self, mock_segment, mock_async, manager): + @patch("cht.stream.manager.ff.receive_and_record") + def test_calls_ffmpeg_module(self, mock_record, mock_async, manager): manager.setup_dirs() mock_node = MagicMock() - mock_segment.return_value = mock_node + mock_record.return_value = mock_node manager.start_recorder() - mock_segment.assert_called_once_with( - manager.stream_url, manager.stream_dir, 60, + mock_record.assert_called_once_with( + manager.stream_url, manager.recording_path, ) mock_async.assert_called_once_with(mock_node, pipe_stderr=True) assert "recorder" in manager._procs -class TestStartRecorderWithMonitor: - @patch("cht.stream.manager.ff.run_async") - @patch("cht.stream.manager.ff.receive_and_segment_with_monitor") - def test_creates_fifo_and_starts(self, mock_monitor, mock_async, manager): - mock_node = MagicMock() - mock_monitor.return_value = mock_node - - fifo = manager.start_recorder_with_monitor() - - assert fifo == manager.session_dir / "monitor.pipe" - mock_monitor.assert_called_once() - mock_async.assert_called_once_with(mock_node, pipe_stderr=True) - - @patch("cht.stream.manager.ff.run_async") - @patch("cht.stream.manager.ff.receive_and_segment_with_monitor") - def test_setup_dirs_called(self, mock_monitor, mock_async, manager): - mock_monitor.return_value = MagicMock() - manager.start_recorder_with_monitor() - assert manager.stream_dir.is_dir() - - class TestStopAll: @patch("cht.stream.manager.ff.stop_proc") def test_stops_all_procs(self, mock_stop, manager): @@ -109,162 +91,89 @@ class TestStopAll: assert "stop" in manager._stop_flags -class TestParseFrameTimestamps: - def test_parses_showinfo_output(self, manager): - manager.setup_dirs() - # Create fake frame files - for i in range(1, 4): - (manager.frames_dir / f"F{i:04d}.jpg").touch() - - stderr = ( - "[Parsed_showinfo_1 @ 0x1234] n:0 pts:1000 pts_time:10.5 other stuff\n" - "some other line\n" - "[Parsed_showinfo_1 @ 0x1234] n:1 pts:2000 pts_time:20.0 other stuff\n" - "[Parsed_showinfo_1 @ 0x1234] n:2 pts:3000 pts_time:35.7 other stuff\n" - ) - - manager._parse_frame_timestamps(stderr, start_num=1) - - index_path = manager.frames_dir / "index.json" - assert index_path.exists() - - with open(index_path) as f: - index = json.load(f) - - assert len(index) == 3 - assert index[0]["id"] == "F0001" - assert index[0]["timestamp"] == 10.5 - assert index[0]["sent_to_agent"] is False - assert index[1]["id"] == "F0002" - assert index[1]["timestamp"] == 20.0 - assert index[2]["id"] == "F0003" - assert index[2]["timestamp"] == 35.7 - - def test_appends_to_existing_index(self, manager): - manager.setup_dirs() - index_path = manager.frames_dir / "index.json" - - # Pre-existing index - existing = [{"id": "F0001", "timestamp": 5.0, "path": "/old", "sent_to_agent": True}] - with open(index_path, "w") as f: - json.dump(existing, f) - - # New frame - (manager.frames_dir / "F0002.jpg").touch() - stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:15.0 stuff\n" - manager._parse_frame_timestamps(stderr, start_num=2) - - with open(index_path) as f: - index = json.load(f) - - assert len(index) == 2 - assert index[0]["id"] == "F0001" # preserved - assert index[1]["id"] == "F0002" # new - - def test_skips_missing_frame_files(self, manager): - manager.setup_dirs() - # Don't create the frame file - stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:10.0 stuff\n" - manager._parse_frame_timestamps(stderr, start_num=1) - - index_path = manager.frames_dir / "index.json" - with open(index_path) as f: - index = json.load(f) - assert len(index) == 0 - - def test_ignores_non_showinfo_lines(self, manager): - manager.setup_dirs() - (manager.frames_dir / "F0001.jpg").touch() - - stderr = ( - "frame= 100 fps=30 q=28.0 size= 1024kB\n" - "video:500kB audio:200kB subtitle:0kB other streams:0kB\n" - ) - manager._parse_frame_timestamps(stderr, start_num=1) - - index_path = manager.frames_dir / "index.json" - with open(index_path) as f: - index = json.load(f) - assert len(index) == 0 - - -class TestExtractFramesFromFile: +class TestExtractNewFrames: @patch("cht.stream.manager.ff.extract_scene_frames") - def test_calls_ffmpeg_with_correct_args(self, mock_extract, manager): + def test_calls_ffmpeg_with_start_time(self, mock_extract, manager): manager.setup_dirs() - seg = manager.stream_dir / "segment_0001.ts" - seg.touch() + rec = manager.recording_path + rec.touch() mock_extract.return_value = ("", "") - manager._extract_frames_from_file(seg) + manager._extract_new_frames(rec, start_time=10.0, start_number=5) mock_extract.assert_called_once_with( - seg, + rec, manager.frames_dir, scene_threshold=0.3, max_interval=30, - start_number=1, + start_number=5, + start_time=10.0, ) @patch("cht.stream.manager.ff.extract_scene_frames") - def test_continues_numbering(self, mock_extract, manager): + def test_indexes_new_frames(self, mock_extract, manager): manager.setup_dirs() - # Pre-existing frames - (manager.frames_dir / "F0001.jpg").touch() - (manager.frames_dir / "F0002.jpg").touch() + rec = manager.recording_path + rec.touch() - seg = manager.stream_dir / "segment_0002.ts" - seg.touch() + # Simulate ffmpeg creating a frame file during extraction + def create_frame_and_return(*args, **kwargs): + (manager.frames_dir / "F0001.jpg").touch() + return ("", "[Parsed_showinfo_1 @ 0x1] n:0 pts:1000 pts_time:10.5 stuff\n") - mock_extract.return_value = ("", "") - manager._extract_frames_from_file(seg) + mock_extract.side_effect = create_frame_and_return - assert mock_extract.call_args.kwargs["start_number"] == 3 + count, max_ts = manager._extract_new_frames(rec, start_number=1) + assert count == 1 + assert max_ts == 10.5 + + index_path = manager.frames_dir / "index.json" + with open(index_path) as f: + index = json.load(f) + assert len(index) == 1 + assert index[0]["id"] == "F0001" + assert index[0]["timestamp"] == 10.5 @patch("cht.stream.manager.ff.extract_scene_frames") def test_handles_ffmpeg_failure(self, mock_extract, manager): manager.setup_dirs() - seg = manager.stream_dir / "segment_0001.ts" - seg.touch() + rec = manager.recording_path + rec.touch() mock_extract.side_effect = RuntimeError("ffmpeg died") - # Should not raise - manager._extract_frames_from_file(seg) + count, max_ts = manager._extract_new_frames(rec) + assert count == 0 - -class TestFrameWatcher: @patch("cht.stream.manager.ff.extract_scene_frames") - def test_detects_new_segments(self, mock_extract, manager): + def test_skips_preexisting_frames(self, mock_extract, manager): + manager.setup_dirs() + rec = manager.recording_path + rec.touch() + + # Pre-existing frame + (manager.frames_dir / "F0001.jpg").touch() + + # ffmpeg "creates" no new files, just returns showinfo for existing + stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:5.0 stuff\n" + mock_extract.return_value = ("", stderr) + + count, _ = manager._extract_new_frames(rec, start_number=1) + # F0001 already existed before extraction, should not be counted + assert count == 0 + + +class TestSceneDetector: + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_detects_growing_file(self, mock_extract, manager): manager.setup_dirs() mock_extract.return_value = ("", "") - manager.start_frame_extractor() + # Create recording with some data + rec = manager.recording_path + rec.write_bytes(b"\x00" * 200_000) - # Create a segment file - seg = manager.stream_dir / "segment_0001.ts" - seg.write_bytes(b"\x00" * 100) - - # Wait for watcher to pick it up - time.sleep(3) + manager.start_scene_detector() + time.sleep(12) # wait for one cycle manager.stop_all() mock_extract.assert_called() - - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_skips_already_seen(self, mock_extract, manager): - manager.setup_dirs() - mock_extract.return_value = ("", "") - - # Pre-create segment - seg = manager.stream_dir / "segment_0001.ts" - seg.write_bytes(b"\x00" * 100) - - manager.start_frame_extractor() - time.sleep(3) - call_count = mock_extract.call_count - - # Wait another cycle — should not re-process - time.sleep(3) - manager.stop_all() - - assert mock_extract.call_count == call_count