diff --git a/cht/app.py b/cht/app.py index 4197405..3807dd8 100644 --- a/cht/app.py +++ b/cht/app.py @@ -1,5 +1,7 @@ import logging +import os import sys +import threading import gi gi.require_version("Gtk", "4.0") @@ -25,14 +27,40 @@ class ChtApp(Adw.Application): win.present() -def _suppress_egl_warnings(domain, level, message, user_data): - if b"eglExportDMABUFImage" in message: - return - GLib.log_default_handler(domain, level, message, user_data) +_STDERR_SKIP = [b"eglExportDMABUFImage"] + + +def _filter_stderr(): + """Redirect fd 2 through a pipe; drop lines matching _STDERR_SKIP.""" + real_stderr_fd = os.dup(2) + real_stderr = os.fdopen(real_stderr_fd, "wb", buffering=0) + r_fd, w_fd = os.pipe() + os.dup2(w_fd, 2) + os.close(w_fd) + + def _pump(): + with os.fdopen(r_fd, "rb", buffering=0) as pipe: + buf = b"" + while True: + chunk = pipe.read(4096) + if not chunk: + break + buf += chunk + while b"\n" in buf: + line, buf = buf.split(b"\n", 1) + if line.strip() and not any(skip in line for skip in _STDERR_SKIP): + real_stderr.write(line + b"\n") + real_stderr.flush() + if buf: + real_stderr.write(buf) + real_stderr.flush() + + t = threading.Thread(target=_pump, daemon=True, name="stderr_filter") + t.start() def main(): - GLib.log_set_handler("Gdk", GLib.LogLevelFlags.LEVEL_WARNING, _suppress_egl_warnings, None) + _filter_stderr() logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index e913c31..ed05133 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -40,15 +40,22 @@ def receive_and_record(stream_url, output_path): def receive_record_and_relay(stream_url, output_path, relay_url): - """Receive TCP stream, write to MKV, and relay to UDP loopback for live display. + """Receive TCP stream, write to fragmented MP4, and relay to UDP loopback. - Uses ffmpeg tee via merge_outputs: one ffmpeg process handles both outputs - from the same decoded input, keeping them in sync with identical timestamps. + Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption: + each keyframe boundary closes a self-contained fragment, so the file is + always valid up to the last complete fragment (~1 keyframe interval ≈ 2s). + This allows the scene detector to use a 2s safety margin instead of 6s. + + Uses ffmpeg tee via merge_outputs: one process, identical timestamps. """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") file_out = ffmpeg.output( stream, str(output_path), - c="copy", f="matroska", flush_packets=1, + c="copy", f="mp4", + movflags="frag_keyframe+empty_moov+default_base_moof", + flush_packets=1, + **{"bsf:a": "aac_adtstoasc"}, ) relay_out = ffmpeg.output( stream, relay_url, diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 5090a5c..ac5fd1f 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -56,7 +56,7 @@ class StreamManager: @property def recording_path(self): - return self.stream_dir / "recording.mkv" + return self.stream_dir / "recording.mp4" # -- Recording -- @@ -81,9 +81,13 @@ class StreamManager: def _detect(): processed_time = 0.0 frame_count = 0 + idle_cycles = 0 # consecutive cycles with no new frames while "stop" not in self._stop_flags: - time.sleep(5) + # Adaptive sleep: 1s after finding frames, then 2→4→8→10s backoff + sleep_secs = 1 if idle_cycles == 0 else min(2, 2 ** idle_cycles) + time.sleep(sleep_secs) + if not self.recording_path.exists(): continue @@ -91,16 +95,14 @@ class StreamManager: if size < 100_000: continue - # Get current duration. Use a 6s safety margin — MKV tail can - # be corrupt for several seconds after the last flush, causing - # ffmpeg to crash even with a 3s margin. + # 2s safety margin — fragmented MP4 is valid up to last complete + # keyframe fragment (~1 keyframe interval); 2s covers worst case. safe_duration = self._estimate_safe_duration() - if safe_duration is None or safe_duration <= processed_time + 8: + if safe_duration is None: continue - # Process from last checkpoint to safe point - process_to = safe_duration - 6 # 6s safety margin for MKV tail - if process_to <= processed_time: + process_to = safe_duration - 2 + if process_to <= processed_time + 0.5: continue log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to) @@ -112,9 +114,12 @@ class StreamManager: if new_frames: frame_count += len(new_frames) + idle_cycles = 0 # reset — check again quickly log.info("Found %d new scene frames (total: %d)", len(new_frames), frame_count) if self._on_new_frames: self._on_new_frames(new_frames) + else: + idle_cycles += 1 # back off: 2s, 4s, 8s, 10s processed_time = process_to @@ -125,13 +130,23 @@ class StreamManager: self._threads["scene_detector"] = t def _estimate_safe_duration(self): - """Estimate recording duration. Uses ffprobe, falls back to file size.""" + """Estimate recording duration. Uses ffprobe, falls back to file size. + + For fragmented MP4 (empty_moov), format-level duration is 0 so we + check stream duration from the last video stream instead. + """ try: import ffmpeg as ffmpeg_lib info = ffmpeg_lib.probe(str(self.recording_path)) + # Format duration works for non-fragmented; 0 for empty_moov fMP4 dur = float(info.get("format", {}).get("duration", 0)) if dur > 0: return dur + # Fragmented MP4: check video stream duration + for stream in info.get("streams", []): + sdur = float(stream.get("duration", 0)) + if sdur > 0: + return sdur except Exception: pass diff --git a/cht/stream/tracker.py b/cht/stream/tracker.py index 6849547..d299bf6 100644 --- a/cht/stream/tracker.py +++ b/cht/stream/tracker.py @@ -1,15 +1,13 @@ """ -RecordingTracker: monitors the growing recording file and estimates duration. +RecordingTracker: monitors the growing recording file and reports duration. -Polls file size periodically. Uses ffprobe occasionally for accurate -duration calibration. Feeds duration updates to the Timeline. +Probes with ffprobe every cycle. No bitrate estimation — initial burst frames +make calibration unreliable. Falls back to file-size heuristic only when +ffprobe returns nothing (e.g. file too new). """ -import json import logging -import subprocess import time -from pathlib import Path from threading import Thread import ffmpeg as ffmpeg_lib @@ -18,13 +16,12 @@ log = logging.getLogger(__name__) class RecordingTracker: - """Tracks a growing recording file and estimates its duration.""" + """Tracks a growing recording file and reports its duration.""" def __init__(self, recording_path, on_duration_update=None): self._path = recording_path self._on_duration = on_duration_update self._duration = 0.0 - self._avg_bitrate = None # bytes per second, calibrated by ffprobe self._stop = False self._thread = None @@ -43,9 +40,6 @@ class RecordingTracker: log.info("RecordingTracker stopped") def _poll_loop(self): - probe_interval = 0 # probe on first data - cycles = 0 - while not self._stop: time.sleep(2) @@ -56,28 +50,31 @@ class RecordingTracker: if size < 10_000: continue - # Calibrate with ffprobe every ~30s or on first data - cycles += 1 - if self._avg_bitrate is None or cycles % 15 == 0: - probed = self._probe_duration() - if probed and probed > 0 and size > 0: - self._avg_bitrate = size / probed - self._duration = probed - log.info("Probed duration: %.1fs (bitrate: %.0f B/s)", - probed, self._avg_bitrate) - elif self._avg_bitrate: - # Estimate from file size between probes - self._duration = size / self._avg_bitrate - - if self._on_duration and self._duration > 0: - self._on_duration(self._duration) + duration = self._probe_duration() + if duration and duration > self._duration: + self._duration = duration + log.info("Duration: %.1fs", duration) + if self._on_duration: + self._on_duration(self._duration) def _probe_duration(self): - """Use ffprobe to get accurate duration of the recording.""" + """Probe recording duration via ffprobe.""" try: info = ffmpeg_lib.probe(str(self._path)) - duration = float(info.get("format", {}).get("duration", 0)) - return duration + # Format-level duration is 0 for fragmented MP4 (empty_moov) + dur = float(info.get("format", {}).get("duration", 0)) + if dur > 0: + return dur + # Fragmented MP4: check video stream duration + for stream in info.get("streams", []): + sdur = float(stream.get("duration", 0)) + if sdur > 0: + return sdur except Exception as e: - log.debug("ffprobe failed (file still growing): %s", e) + log.debug("ffprobe failed: %s", e) + + # Last resort: file size heuristic (~500kbps for this stream type) + try: + return self._path.stat().st_size / 65_000 + except Exception: return None diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py index de2a6ec..ea7b007 100644 --- a/cht/ui/monitor.py +++ b/cht/ui/monitor.py @@ -179,19 +179,19 @@ class MonitorWidget(Gtk.Box): else: # Scrub mode if current == "live": - # Transitioning from live: seek review player to live position + # Transitioning from live: load MKV at cursor position atomically pos = s.cursor # already set by toggle_live() if self._review_player and self._recording_path: - self._review_player.load(self._recording_path) - if s.paused: - self._review_player.show_frame_at(pos) - else: - self._review_player.seek(pos) + self._review_player.load_at(self._recording_path, pos, pause=s.paused) + if not s.paused: self._review_player.play() self._stack.set_visible_child_name("review") else: - # Already in review: just apply paused state + # Already in review: seek if cursor moved, then apply pause/play if self._review_player: + player_pos = self._review_player.time_pos or 0 + if abs(s.cursor - player_pos) > 1.0: + self._review_player.seek(s.cursor) if s.paused: self._review_player.pause() else: diff --git a/cht/ui/mpv.py b/cht/ui/mpv.py index a2bd257..005a904 100644 --- a/cht/ui/mpv.py +++ b/cht/ui/mpv.py @@ -92,6 +92,12 @@ class Player: log.info("mpv load: %s", path) self._player.loadfile(str(path), mode="replace") + def load_at(self, path, seconds, pause=True): + """Load a file and seek to position atomically. Avoids async seek race.""" + log.info("mpv load_at: %s at %.1fs pause=%s", path, seconds, pause) + self._player["pause"] = pause + self._player.loadfile(str(path), mode="replace", start=str(seconds)) + def load_live(self, url): """Load a live stream URL with low-latency options.""" self._player["cache"] = "no" diff --git a/cht/ui/timeline.py b/cht/ui/timeline.py index 0e7d21c..b80ae0c 100644 --- a/cht/ui/timeline.py +++ b/cht/ui/timeline.py @@ -10,7 +10,6 @@ consumers read timeline.state directly. """ import logging -import time from dataclasses import dataclass, field import gi @@ -88,7 +87,9 @@ class Timeline(GObject.Object): """Go to live mode at the recording end.""" self.state.live = True self.state.paused = False - self.state.cursor = self.state.duration + if self.state.duration > 0: + self.state.cursor = self.state.duration + # else: keep current cursor (tick_live will continue from here) self._emit() def toggle_live(self, live_player_pos=None): @@ -102,12 +103,17 @@ class Timeline(GObject.Object): self.state.live = False self.state.paused = True if live_player_pos is not None and live_player_pos > 0: - pos = max(0.0, min(live_player_pos, self.state.duration)) + pos = max(0.0, live_player_pos) + # Only clamp to duration if duration is known + if self.state.duration > 0: + pos = min(pos, self.state.duration) self.state.cursor = pos else: self.state.live = True self.state.paused = False - self.state.cursor = self.state.duration + if self.state.duration > 0: + self.state.cursor = self.state.duration + # else: keep current tick-based cursor, set_duration will snap later self._emit() def play(self): @@ -138,35 +144,28 @@ class Timeline(GObject.Object): class TimelineControls(Gtk.Box): - """Shared slider + play/pause/live controls. + """Slider + LIVE toggle. Scrub mode is always paused (seek-only, like a video editor). - Play/Pause and slider are insensitive in live mode. LIVE button is a toggle — active style when live=True. + Slider is insensitive in live mode. """ def __init__(self, timeline, **kwargs): super().__init__(orientation=Gtk.Orientation.HORIZONTAL, spacing=4, **kwargs) self._timeline = timeline - self._updating_slider = False self._dragging = False - self._wall_clock_start = None self.set_margin_start(4) self.set_margin_end(4) self.set_margin_top(2) self.set_margin_bottom(4) - # Play/Pause button - self._play_btn = Gtk.Button(label="Play") - self._play_btn.connect("clicked", self._on_play_clicked) - self.append(self._play_btn) - # Current time label self._time_label = Gtk.Label(label="00:00") self._time_label.set_width_chars(6) self.append(self._time_label) - # Slider + # Slider — disabled in live mode, scrub-seeks on release self._slider = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL) self._slider.set_hexpand(True) self._slider.set_range(0, 1) @@ -193,16 +192,8 @@ class TimelineControls(Gtk.Box): timeline.connect("changed", self._on_changed) GLib.timeout_add(1000, self._tick_total) - def _on_play_clicked(self, btn): - s = self._timeline.state - if s.paused: - self._timeline.play() - else: - self._timeline.pause() - def set_live_toggle_callback(self, cb): - """Override the LIVE button handler. cb() should return the live player - position (float or None) and call timeline.toggle_live() itself.""" + """Override the LIVE button handler.""" self._live_toggle_cb = cb def _on_live_clicked(self, btn): @@ -226,12 +217,6 @@ class TimelineControls(Gtk.Box): def _on_changed(self, timeline): s = timeline.state - # Start wall clock when first going live (not on duration, which arrives ~30s later) - if s.live and self._wall_clock_start is None: - self._wall_clock_start = time.monotonic() - - # Live mode: disable scrub controls - self._play_btn.set_sensitive(not s.live) self._slider.set_sensitive(not s.live) if s.live: @@ -239,15 +224,9 @@ class TimelineControls(Gtk.Box): else: self._live_btn.remove_css_class("suggested-action") - # Play button label (only relevant in scrub mode) - self._play_btn.set_label("Pause" if not s.paused else "Play") - - # Slider position if not self._dragging: - self._updating_slider = True self._slider.set_range(0, max(s.duration, 0.1)) self._slider.set_value(s.cursor) - self._updating_slider = False self._time_label.set_text(self._fmt_time(s.cursor)) self._update_duration_label() @@ -258,10 +237,8 @@ class TimelineControls(Gtk.Box): def _update_duration_label(self): s = self._timeline.state - loaded = s.duration - total = (time.monotonic() - self._wall_clock_start) if self._wall_clock_start else loaded self._duration_label.set_text( - f"{self._fmt_time(loaded)} / {self._fmt_time(total)}" + f"{self._fmt_time(s.cursor)} / {self._fmt_time(s.duration)}" ) @staticmethod diff --git a/cht/window.py b/cht/window.py index 7d9820f..b31df6e 100644 --- a/cht/window.py +++ b/cht/window.py @@ -25,6 +25,7 @@ class ChtWindow(Adw.ApplicationWindow): self.set_title(APP_NAME) self.set_default_size(1400, 900) self._streaming = False + self._gone_live = False self._stream_mgr = None self._tracker = None self._known_frames = set() @@ -72,6 +73,7 @@ class ChtWindow(Adw.ApplicationWindow): self._connect_btn.remove_css_class("suggested-action") self._connect_btn.add_css_class("destructive-action") self._streaming = True + self._gone_live = False # Create session self._stream_mgr = StreamManager() @@ -91,15 +93,11 @@ class ChtWindow(Adw.ApplicationWindow): ) self._tracker.start() - # Go LIVE after a short delay — ffmpeg needs time to establish TCP - # and begin writing both outputs. UDP relay starts immediately after. - GLib.timeout_add(4000, self._go_live_once) - # Start scene detection self._stream_mgr.start_scene_detector(on_new_frames=self._on_new_scene_frames) # Start polling for frame thumbnails - GLib.timeout_add(3000, self._poll_frames) + GLib.timeout_add(1000, self._poll_frames) # Tick the LIVE cursor every second GLib.timeout_add(1000, self._tick_live) @@ -123,6 +121,9 @@ class ChtWindow(Adw.ApplicationWindow): def _on_duration_update(self, duration): """Called from RecordingTracker thread.""" GLib.idle_add(self._timeline.set_duration, duration) + if not self._gone_live: + self._gone_live = True + GLib.idle_add(self._go_live_once) def _on_new_scene_frames(self, frames): """Called from scene detector thread when new frames are found.""" @@ -223,7 +224,8 @@ class ChtWindow(Adw.ApplicationWindow): # Horizontal scrolling strip — storyboard style self._frames_scroll = Gtk.ScrolledWindow() self._frames_scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.NEVER) - self._frames_scroll.set_min_content_height(180) # 144px thumb + label + padding + self._frames_scroll.set_min_content_height(168) + self._frames_scroll.set_size_request(-1, 168) # 144px thumb + label + padding self._frames_strip = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) self._frames_strip.set_margin_start(4) @@ -366,9 +368,10 @@ class ChtWindow(Adw.ApplicationWindow): label.set_ellipsize(Pango.EllipsizeMode.END) box.append(label) - # Click to seek into scrub mode + # Click to highlight — does NOT switch mode or seek + # (future: jump to timestamp in scrub bar without leaving live) gesture = Gtk.GestureClick() - gesture.connect("released", lambda g, n, x, y: self._timeline.seek(timestamp)) + gesture.connect("released", lambda g, n, x, y: log.debug("Frame clicked: %s at %.1fs", frame_id, timestamp)) box.add_controller(gesture) self._frames_strip.append(box) diff --git a/sender/stream_av.sh b/sender/stream_av.sh index b0d7551..b497c6d 100755 --- a/sender/stream_av.sh +++ b/sender/stream_av.sh @@ -30,12 +30,12 @@ if [ -n "$WEBCAM_MIC" ]; then exec ffmpeg \ -init_hw_device drm=drm:/dev/dri/card0 \ -init_hw_device vaapi=va@drm \ - -device /dev/dri/card0 -f kmsgrab -i - \ + -device /dev/dri/card0 -f kmsgrab -framerate 30 -i - \ -f pulse -i "$MONITOR" \ -f pulse -i "$WEBCAM_MIC" \ -filter_complex "[1:a][2:a]amix=inputs=2:duration=longest[aout]" \ -map 0:v -map "[aout]" \ - -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12' \ + -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12,fps=30' \ -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ -c:a aac -b:a 128k \ -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \ @@ -46,9 +46,9 @@ else exec ffmpeg \ -init_hw_device drm=drm:/dev/dri/card0 \ -init_hw_device vaapi=va@drm \ - -device /dev/dri/card0 -f kmsgrab -i - \ + -device /dev/dri/card0 -f kmsgrab -framerate 30 -i - \ -f pulse -i "$MONITOR" \ - -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12' \ + -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12,fps=30' \ -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ -c:a aac -b:a 128k \ -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \