diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index ed05133..7a14a34 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -117,6 +117,18 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") +def extract_frame_at(input_path, output_path, timestamp): + """Extract a single frame at the given timestamp.""" + output = ( + ffmpeg.input(str(input_path), ss=timestamp) + .output(str(output_path), vframes=1, **{"q:v": "2"}) + .overwrite_output() + .global_args(*QUIET_ARGS) + ) + log.info("extract_frame_at: %s", " ".join(output.compile())) + output.run(capture_stdout=True, capture_stderr=True) + + def run_async(output_node, pipe_stdout=False, pipe_stderr=False): """Start an ffmpeg pipeline asynchronously via ffmpeg-python's run_async.""" log.info("run_async: %s", " ".join(output_node.compile())) diff --git a/cht/stream/manager.py b/cht/stream/manager.py index ac5fd1f..1a034c5 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -40,6 +40,8 @@ class StreamManager: self._procs = {} self._threads = {} self._stop_flags = set() + self._segment = 0 + self.scene_threshold = SCENE_THRESHOLD log.info("Session: %s", session_id) def setup_dirs(self): @@ -56,12 +58,36 @@ class StreamManager: @property def recording_path(self): - return self.stream_dir / "recording.mp4" + """Current recording segment path.""" + return self.stream_dir / f"recording_{self._segment:03d}.mp4" + + @property + def recording_segments(self): + """All recording segments in order.""" + return sorted(self.stream_dir.glob("recording_*.mp4")) # -- Recording -- def start_recorder(self): - """Start ffmpeg to receive TCP stream, write to MKV, and relay to UDP.""" + """Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP.""" + self._segment = 0 + self._launch_recorder() + + def restart_recorder(self): + """Restart recorder into a new segment. Session stays alive.""" + old = self._procs.pop("recorder", None) + if old: + ff.stop_proc(old) + self._segment += 1 + log.info("Restarting recorder → segment %d", self._segment) + self._launch_recorder() + + def recorder_alive(self): + """Check if the recorder process is still running.""" + proc = self._procs.get("recorder") + return proc is not None and proc.poll() is None + + def _launch_recorder(self): node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url) proc = ff.run_async(node, pipe_stderr=True) self._procs["recorder"] = proc @@ -80,27 +106,37 @@ class StreamManager: def _detect(): processed_time = 0.0 - frame_count = 0 - idle_cycles = 0 # consecutive cycles with no new frames + idle_cycles = 0 + current_segment = None while "stop" not in self._stop_flags: - # Adaptive sleep: 1s after finding frames, then 2→4→8→10s backoff - sleep_secs = 1 if idle_cycles == 0 else min(2, 2 ** idle_cycles) + # Adaptive sleep: faster at lower thresholds (more sensitive) + # threshold 0.01→1s base, 0.10→1s, 0.50→2s + base = max(1.0, min(2.0, self.scene_threshold * 10)) + sleep_secs = base if idle_cycles == 0 else min(base * 2, base * (2 ** idle_cycles)) time.sleep(sleep_secs) - if not self.recording_path.exists(): + seg = self.recording_path + if not seg.exists(): continue - size = self.recording_path.stat().st_size + # New segment started — reset per-segment progress + if seg != current_segment: + current_segment = seg + processed_time = 0.0 + idle_cycles = 0 + log.info("Scene detector: switched to %s", seg.name) + + size = seg.stat().st_size if size < 100_000: continue - # 2s safety margin — fragmented MP4 is valid up to last complete - # keyframe fragment (~1 keyframe interval); 2s covers worst case. + # Probe current segment duration directly (not total across segments) safe_duration = self._estimate_safe_duration() - if safe_duration is None: + if safe_duration is None or safe_duration <= 0: continue + # 2s safety margin for incomplete tail fragments process_to = safe_duration - 2 if process_to <= processed_time + 0.5: continue @@ -109,17 +145,16 @@ class StreamManager: new_frames = self._detect_scenes( start_time=processed_time, end_time=process_to, - start_number=frame_count + 1, ) if new_frames: - frame_count += len(new_frames) - idle_cycles = 0 # reset — check again quickly - log.info("Found %d new scene frames (total: %d)", len(new_frames), frame_count) + idle_cycles = 0 + log.info("Found %d new scene frames (total: %d)", + len(new_frames), self._next_frame_number() - 1) if self._on_new_frames: self._on_new_frames(new_frames) else: - idle_cycles += 1 # back off: 2s, 4s, 8s, 10s + idle_cycles += 1 processed_time = process_to @@ -157,16 +192,24 @@ class StreamManager: except Exception: return None - def _detect_scenes(self, start_time, end_time, start_number): + def _next_frame_number(self): + """Determine next frame number from the index (source of truth).""" + index_path = self.frames_dir / "index.json" + if index_path.exists(): + index = json.loads(index_path.read_text()) + return len(index) + 1 + return 1 + + def _detect_scenes(self, start_time, end_time): """Run ffmpeg scene detection on a time range. Returns list of new frame entries.""" duration = end_time - start_time - existing_before = set(f.name for f in self.frames_dir.glob("F*.jpg")) + start_number = self._next_frame_number() try: _stdout, stderr = ff.extract_scene_frames( self.recording_path, self.frames_dir, - scene_threshold=SCENE_THRESHOLD, + scene_threshold=self.scene_threshold, start_number=start_number, start_time=start_time, duration=duration, @@ -175,7 +218,8 @@ class StreamManager: log.error("Scene detection failed: %s", e) return [] - # Parse new frames from showinfo output + # Parse new frames from showinfo output — match each showinfo line + # to the corresponding file ffmpeg wrote (sequential from start_number) new_frames = [] index_path = self.frames_dir / "index.json" index = json.loads(index_path.read_text()) if index_path.exists() else [] @@ -189,7 +233,7 @@ class StreamManager: pts_time = float(pts_match.group(1)) frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" - if frame_path.exists() and frame_path.name not in existing_before: + if frame_path.exists(): entry = { "id": frame_id, "timestamp": pts_time, @@ -198,11 +242,55 @@ class StreamManager: } index.append(entry) new_frames.append(entry) - frame_num += 1 + frame_num += 1 index_path.write_text(json.dumps(index, indent=2)) return new_frames + def capture_now(self, on_new_frames=None): + """Capture a single frame from the current recording position. + + Grabs the latest available frame (safe_duration - 1s) and adds it + to the index. Runs in a thread to avoid blocking the UI. + """ + def _capture(): + safe_duration = self._estimate_safe_duration() + if not safe_duration or safe_duration < 1: + log.warning("capture_now: recording too short") + return + + timestamp = safe_duration - 1 + index_path = self.frames_dir / "index.json" + index = json.loads(index_path.read_text()) if index_path.exists() else [] + frame_num = len(index) + 1 + frame_id = f"F{frame_num:04d}" + frame_path = self.frames_dir / f"{frame_id}.jpg" + + try: + ff.extract_frame_at(self.recording_path, frame_path, timestamp) + except Exception as e: + log.error("capture_now failed: %s", e) + return + + if not frame_path.exists(): + log.warning("capture_now: frame not written") + return + + entry = { + "id": frame_id, + "timestamp": timestamp, + "path": str(frame_path), + "sent_to_agent": False, + } + index.append(entry) + index_path.write_text(json.dumps(index, indent=2)) + log.info("Manual capture: %s at %.1fs", frame_id, timestamp) + + if on_new_frames: + on_new_frames([entry]) + + Thread(target=_capture, daemon=True, name="capture_now").start() + # -- Lifecycle -- def stop_all(self): diff --git a/cht/stream/tracker.py b/cht/stream/tracker.py index d299bf6..56662f5 100644 --- a/cht/stream/tracker.py +++ b/cht/stream/tracker.py @@ -4,10 +4,14 @@ RecordingTracker: monitors the growing recording file and reports duration. Probes with ffprobe every cycle. No bitrate estimation — initial burst frames make calibration unreliable. Falls back to file-size heuristic only when ffprobe returns nothing (e.g. file too new). + +Supports multi-segment recording: sums completed segment durations and adds +the current segment's growing duration. """ import logging import time +from pathlib import Path from threading import Thread import ffmpeg as ffmpeg_lib @@ -16,12 +20,18 @@ log = logging.getLogger(__name__) class RecordingTracker: - """Tracks a growing recording file and reports its duration.""" + """Tracks growing recording segments and reports total duration.""" - def __init__(self, recording_path, on_duration_update=None): - self._path = recording_path + def __init__(self, get_segments, on_duration_update=None): + """ + Args: + get_segments: callable returning list of Path objects (all segments, ordered) + on_duration_update: callback(duration_float) + """ + self._get_segments = get_segments self._on_duration = on_duration_update self._duration = 0.0 + self._segment_cache = {} # path → finalized duration (only for completed segments) self._stop = False self._thread = None @@ -33,7 +43,7 @@ class RecordingTracker: self._stop = False self._thread = Thread(target=self._poll_loop, daemon=True, name="rec_tracker") self._thread.start() - log.info("RecordingTracker started: %s", self._path) + log.info("RecordingTracker started") def stop(self): self._stop = True @@ -43,38 +53,53 @@ class RecordingTracker: while not self._stop: time.sleep(2) - if not self._path.exists(): + segments = self._get_segments() + if not segments: continue - size = self._path.stat().st_size - if size < 10_000: - continue + total = 0.0 + for i, seg in enumerate(segments): + is_last = (i == len(segments) - 1) - duration = self._probe_duration() - if duration and duration > self._duration: - self._duration = duration - log.info("Duration: %.1fs", duration) + if not is_last and seg in self._segment_cache: + # Completed segment — use cached duration + total += self._segment_cache[seg] + continue + + if not seg.exists(): + continue + size = seg.stat().st_size + if size < 10_000: + continue + + dur = self._probe_duration(seg) + if dur: + if not is_last: + # Segment is done growing — cache it + self._segment_cache[seg] = dur + total += dur + + if total > self._duration: + self._duration = total + log.info("Duration: %.1fs (%d segments)", total, len(segments)) if self._on_duration: self._on_duration(self._duration) - def _probe_duration(self): + def _probe_duration(self, path): """Probe recording duration via ffprobe.""" try: - info = ffmpeg_lib.probe(str(self._path)) - # Format-level duration is 0 for fragmented MP4 (empty_moov) + info = ffmpeg_lib.probe(str(path)) dur = float(info.get("format", {}).get("duration", 0)) if dur > 0: return dur - # Fragmented MP4: check video stream duration for stream in info.get("streams", []): sdur = float(stream.get("duration", 0)) if sdur > 0: return sdur except Exception as e: - log.debug("ffprobe failed: %s", e) + log.debug("ffprobe failed for %s: %s", path, e) - # Last resort: file size heuristic (~500kbps for this stream type) try: - return self._path.stat().st_size / 65_000 + return path.stat().st_size / 65_000 except Exception: return None diff --git a/cht/window.py b/cht/window.py index b31df6e..cd03217 100644 --- a/cht/window.py +++ b/cht/window.py @@ -8,9 +8,9 @@ import gi gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") gi.require_version("GdkPixbuf", "2.0") -from gi.repository import Gtk, Adw, GLib, Pango, GdkPixbuf +from gi.repository import Gtk, Gdk, Adw, GLib, Pango, GdkPixbuf -from cht.config import APP_NAME +from cht.config import APP_NAME, SCENE_THRESHOLD from cht.ui.timeline import Timeline, TimelineControls from cht.ui.monitor import MonitorWidget from cht.stream.manager import StreamManager @@ -61,6 +61,9 @@ class ChtWindow(Adw.ApplicationWindow): self.connect("close-request", self._on_close) log.info("Window initialized") + # Auto-connect on startup + GLib.idle_add(self._start_stream) + def _on_connect_clicked(self, button): if self._streaming: self._stop_stream() @@ -86,9 +89,9 @@ class ChtWindow(Adw.ApplicationWindow): self._monitor.set_recording(self._stream_mgr.recording_path) self._monitor.set_live_source(self._stream_mgr.relay_url) - # Start tracking recording duration + # Start tracking recording duration (across segments) self._tracker = RecordingTracker( - self._stream_mgr.recording_path, + get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [], on_duration_update=self._on_duration_update, ) self._tracker.start() @@ -102,6 +105,9 @@ class ChtWindow(Adw.ApplicationWindow): # Tick the LIVE cursor every second GLib.timeout_add(1000, self._tick_live) + # Watchdog: restart recorder on crash/disconnect + GLib.timeout_add(2000, self._check_recorder) + log.info("Waiting for sender...") def _go_live_once(self): @@ -124,12 +130,26 @@ class ChtWindow(Adw.ApplicationWindow): if not self._gone_live: self._gone_live = True GLib.idle_add(self._go_live_once) + # Capture initial frame — scene detector only fires on changes + if self._stream_mgr: + self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames) def _on_new_scene_frames(self, frames): """Called from scene detector thread when new frames are found.""" for f in frames: GLib.idle_add(self._timeline.add_scene_marker, f["timestamp"]) + def _check_recorder(self): + """Watchdog: restart recorder if it died (sender disconnect, etc).""" + if not self._streaming or not self._stream_mgr: + return False # stop polling + if not self._stream_mgr.recorder_alive(): + log.warning("Recorder died — restarting into new segment") + self._stream_mgr.restart_recorder() + # Re-point monitor to new recording segment + self._monitor.set_recording(self._stream_mgr.recording_path) + return True # keep polling + def _on_live_toggle(self): """LIVE button handler — passes the live player's current position.""" pos = self._monitor.get_live_position() @@ -213,13 +233,42 @@ class ChtWindow(Adw.ApplicationWindow): frame.set_child(box) return frame + def _on_capture_clicked(self, button): + if self._stream_mgr: + self._stream_mgr.capture_now(on_new_frames=self._on_new_scene_frames) + + def _on_scene_threshold_changed(self, scale): + val = scale.get_value() + self._scene_label.set_label(f"Frames (scene: {val:.2f})") + if self._stream_mgr: + self._stream_mgr.scene_threshold = val + def _build_frames_panel(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) - label = Gtk.Label(label="Frames Extracted") - label.add_css_class("heading") - label.set_margin_top(4) - label.set_margin_bottom(4) - box.append(label) + + header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + header.set_margin_top(4) + header.set_margin_bottom(4) + header.set_margin_start(8) + header.set_margin_end(8) + + self._scene_label = Gtk.Label(label=f"Frames (scene: {SCENE_THRESHOLD:.2f})") + self._scene_label.add_css_class("heading") + header.append(self._scene_label) + + scale = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, 0.01, 0.50, 0.01) + scale.set_value(SCENE_THRESHOLD) + scale.set_hexpand(True) + scale.set_draw_value(False) + scale.connect("value-changed", self._on_scene_threshold_changed) + header.append(scale) + + capture_btn = Gtk.Button(label="Capture") + capture_btn.add_css_class("flat") + capture_btn.connect("clicked", self._on_capture_clicked) + header.append(capture_btn) + + box.append(header) # Horizontal scrolling strip — storyboard style self._frames_scroll = Gtk.ScrolledWindow() @@ -357,10 +406,12 @@ class ChtWindow(Adw.ApplicationWindow): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) box.set_size_request(256, -1) - img = Gtk.Image.new_from_pixbuf(pixbuf) - img.set_size_request(256, 144) - img.set_vexpand(False) - box.append(img) + texture = Gdk.Texture.new_for_pixbuf(pixbuf) + pic = Gtk.Picture.new_for_paintable(texture) + pic.set_content_fit(Gtk.ContentFit.CONTAIN) + pic.set_size_request(256, 144) + pic.set_vexpand(False) + box.append(pic) m, s = divmod(int(timestamp), 60) label = Gtk.Label(label=f"{frame_id} [{m:02d}:{s:02d}]") diff --git a/sender/stream_av.sh b/sender/stream_av.sh index b497c6d..33bf6ae 100755 --- a/sender/stream_av.sh +++ b/sender/stream_av.sh @@ -5,11 +5,16 @@ # # Requires: sudo for kmsgrab, PulseAudio for audio capture # Audio is non-blocking (monitor source = passive tap) +# +# Auto-restarts on stall: a watchdog checks ffmpeg's frame counter +# and kills/restarts if video freezes (DRM/VAAPI contention from +# other apps using the GPU, e.g. video calls). set -uo pipefail RECEIVER_IP="${1:-mcrndeb}" PORT="${2:-4444}" +STALL_TIMEOUT=10 # seconds with no frame progress before restart # Let root access the user's PulseAudio session REAL_UID="${SUDO_UID:-$(id -u)}" @@ -24,34 +29,84 @@ echo "Monitor source: $MONITOR" echo "Webcam mic: ${WEBCAM_MIC:-not found}" echo "Streaming to: ${RECEIVER_IP}:${PORT}" -if [ -n "$WEBCAM_MIC" ]; then - echo "Webcam mic found, mixing desktop + mic" - # Two pulse inputs: desktop monitor + webcam mic, mixed into one audio stream - exec ffmpeg \ - -init_hw_device drm=drm:/dev/dri/card0 \ - -init_hw_device vaapi=va@drm \ - -device /dev/dri/card0 -f kmsgrab -framerate 30 -i - \ - -f pulse -i "$MONITOR" \ - -f pulse -i "$WEBCAM_MIC" \ - -filter_complex "[1:a][2:a]amix=inputs=2:duration=longest[aout]" \ - -map 0:v -map "[aout]" \ - -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12,fps=30' \ - -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ - -c:a aac -b:a 128k \ - -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \ - -f mpegts "tcp://${RECEIVER_IP}:${PORT}" \ - -hide_banner -else - echo "No webcam mic, desktop audio only" - exec ffmpeg \ - -init_hw_device drm=drm:/dev/dri/card0 \ - -init_hw_device vaapi=va@drm \ - -device /dev/dri/card0 -f kmsgrab -framerate 30 -i - \ - -f pulse -i "$MONITOR" \ - -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12,fps=30' \ - -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ - -c:a aac -b:a 128k \ - -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \ - -f mpegts "tcp://${RECEIVER_IP}:${PORT}" \ - -hide_banner -fi +# Raise fd limit for long sessions (DMA-BUF fds from kmsgrab) +ulimit -n 65536 + +PROGRESS_FILE=$(mktemp) +trap 'rm -f "$PROGRESS_FILE"' EXIT + +start_ffmpeg() { + local args=( + ffmpeg + -init_hw_device drm=drm:/dev/dri/card0 + -init_hw_device vaapi=va@drm + -thread_queue_size 64 -device /dev/dri/card0 -f kmsgrab -framerate 30 -i - + -thread_queue_size 1024 -f pulse -i "$MONITOR" + ) + + if [ -n "$WEBCAM_MIC" ]; then + args+=(-thread_queue_size 1024 -f pulse -i "$WEBCAM_MIC") + args+=(-filter_complex "[1:a][2:a]amix=inputs=2:duration=longest[aout]") + args+=(-map 0:v -map "[aout]") + fi + + args+=( + -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12,fps=30' + -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 + -c:a aac -b:a 128k + -max_muxing_queue_size 64 + -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 + -f mpegts "tcp://${RECEIVER_IP}:${PORT}" + -hide_banner -progress "$PROGRESS_FILE" + ) + + "${args[@]}" & + echo $! +} + +get_frame_count() { + # -progress file writes key=value pairs; frame= is the video frame counter + grep -oP '^frame=\K[0-9]+' "$PROGRESS_FILE" 2>/dev/null | tail -1 +} + +while true; do + echo "--- Starting sender $(date) ---" + > "$PROGRESS_FILE" # reset + + FFPID=$(start_ffmpeg) + echo "ffmpeg started: pid=$FFPID" + + last_frame=0 + stall_since=$SECONDS + + while kill -0 "$FFPID" 2>/dev/null; do + sleep 2 + + cur_frame=$(get_frame_count) + cur_frame=${cur_frame:-0} + + if (( cur_frame > last_frame )); then + last_frame=$cur_frame + stall_since=$SECONDS + fi + + if (( SECONDS - stall_since > STALL_TIMEOUT )); then + echo "Video stalled at frame $last_frame for ${STALL_TIMEOUT}s — killing ffmpeg" + kill "$FFPID" 2>/dev/null + wait "$FFPID" 2>/dev/null + break + fi + done + + if ! kill -0 "$FFPID" 2>/dev/null; then + wait "$FFPID" 2>/dev/null + rc=$? + if (( rc == 0 )); then + echo "ffmpeg exited cleanly" + break + fi + fi + + echo "Restarting in 2s..." + sleep 2 +done