diff --git a/cht/stream/processor.py b/cht/stream/processor.py index 5c717e2..8e971e1 100644 --- a/cht/stream/processor.py +++ b/cht/stream/processor.py @@ -22,7 +22,7 @@ import socket import time from pathlib import Path from queue import Queue, Empty -from threading import Thread +from threading import Thread, Event from cht.config import ( AUDIO_EXTRACT_INTERVAL, @@ -43,7 +43,7 @@ class SessionProcessor: self.frames_dir = session_dir / "frames" self.audio_dir = session_dir / "audio" - self._stop_flags: set[str] = set() + self._stop_event = Event() self._threads: dict[str, Thread] = {} self._on_new_frames = None self._on_new_audio = None @@ -289,26 +289,26 @@ class SessionProcessor: socket_path = DATA_DIR / "scene.sock" # Wait for the socket to appear (server creates it on session start). - while "stop" not in self._stop_flags: + while not self._stop_event.is_set(): if socket_path.exists(): break time.sleep(0.5) - if "stop" in self._stop_flags: + if self._stop_event.is_set(): return - while "stop" not in self._stop_flags: + while not self._stop_event.is_set(): try: self._run_scene_session(socket_path, threshold) except Exception: log.exception("Scene detector error") - if "stop" in self._stop_flags: + if self._stop_event.is_set(): break # If the socket is gone, the session ended — don't retry. if not socket_path.exists(): log.info("Scene detector: socket gone, session ended") break log.info("Scene detector: reconnecting in 2s...") - time.sleep(2.0) + self._stop_event.wait(timeout=2.0) log.info("Scene detector stopped") @@ -333,7 +333,7 @@ class SessionProcessor: # Thread: socket → ffmpeg stdin def _feed_stdin(): try: - while "stop" not in self._stop_flags: + while not self._stop_event.is_set(): data = sock.recv(65536) if not data: break @@ -411,10 +411,16 @@ class SessionProcessor: self._threads["audio_extractor"] = t def stop(self): - self._stop_flags.add("stop") + self._stop_event.set() for name, proc in getattr(self, "_procs", {}).items(): ff.stop_proc(proc, timeout=3) - self._procs = {} if hasattr(self, "_procs") else {} + self._procs = {} + # Join all threads so caller knows they're done before starting a new session + for name, t in list(self._threads.items()): + t.join(timeout=5) + if t.is_alive(): + log.warning("Thread %s still alive after stop timeout", name) + self._threads.clear() def _has_audio_stream(self, seg: Path) -> bool: try: @@ -441,9 +447,7 @@ class SessionProcessor: chunk_num = 0 current_source = None - while "stop" not in self._stop_flags: - time.sleep(AUDIO_EXTRACT_INTERVAL) - + while not self._stop_event.wait(timeout=AUDIO_EXTRACT_INTERVAL): source = self._find_audio_source() if not source: continue @@ -461,6 +465,16 @@ class SessionProcessor: if safe_duration is None or safe_duration <= 0: continue + # Fail-safe: processed_time can accumulate past the file if the + # source was recreated (e.g. server restarted same session). + if processed_time > safe_duration: + log.warning( + "Audio extractor: processed_time %.1fs > file duration %.1fs — resetting", + processed_time, safe_duration, + ) + processed_time = 0.0 + chunk_num = 0 + process_to = safe_duration - AUDIO_SAFETY_MARGIN if process_to <= processed_time + 1.0: continue diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py index 00fe821..756f34f 100644 --- a/cht/ui/monitor.py +++ b/cht/ui/monitor.py @@ -147,7 +147,7 @@ class MonitorWidget(Gtk.Box): gl_area.make_current() self._live_player = Player() self._live_player.init_gl( - update_callback=lambda: GLib.idle_add(self._live_gl.queue_render) + update_callback=lambda: GLib.idle_add(self._live_gl.queue_render, priority=GLib.PRIORITY_HIGH) ) log.info("Live player created") if self._live_source_url and not self._live_loaded: @@ -162,7 +162,7 @@ class MonitorWidget(Gtk.Box): self._live_loaded = False def _on_live_render(self, gl_area, _ctx): - if not self._live_player: + if not self._live_player or not self._live_loaded: return True fbo = ctypes.c_int(0) _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo)) @@ -175,7 +175,7 @@ class MonitorWidget(Gtk.Box): gl_area.make_current() self._review_player = Player() self._review_player.init_gl( - update_callback=lambda: GLib.idle_add(self._review_gl.queue_render) + update_callback=lambda: GLib.idle_add(self._review_gl.queue_render, priority=GLib.PRIORITY_HIGH) ) log.info("Review player created") diff --git a/cht/ui/mpv.py b/cht/ui/mpv.py index 80e7060..7546ba2 100644 --- a/cht/ui/mpv.py +++ b/cht/ui/mpv.py @@ -103,12 +103,9 @@ class Player: def load_live(self, url): """Load a live stream URL with low-latency options.""" self._player["cache"] = "no" - self._player["demuxer-max-bytes"] = "256KiB" - self._player["demuxer-readahead-secs"] = 0 + self._player["demuxer-max-bytes"] = "512KiB" + self._player["demuxer-readahead-secs"] = 0.5 self._player["audio-buffer"] = 0.1 - self._player["untimed"] = True - self._player["video-latency-hacks"] = True - self._player["interpolation"] = False log.info("mpv load_live: %s", url) self._player.loadfile(str(url), mode="replace") diff --git a/cht/window.py b/cht/window.py index 041d653..c4aa514 100644 --- a/cht/window.py +++ b/cht/window.py @@ -508,7 +508,8 @@ class ChtWindow(Adw.ApplicationWindow): self._connect_btn.add_css_class("suggested-action") if reload_session and last_session_id: - # Transition to review mode — _load_session handles UI setup + # Stop live player before transitioning to review mode + self._monitor.reset() self._load_session(last_session_id) return diff --git a/media/client/src/backends/subprocess.rs b/media/client/src/backends/subprocess.rs index 12bc605..77a861c 100644 --- a/media/client/src/backends/subprocess.rs +++ b/media/client/src/backends/subprocess.rs @@ -1,13 +1,16 @@ //! Subprocess backend: spawn ffmpeg CLI for capture+encode. //! //! Spawns ffmpeg with the same hardware pipeline as `stream_av.sh`: -//! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi +//! kmsgrab -vblank_source vsync → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi //! + PulseAudio desktop audio + mic → amix → AAC //! +//! -vblank_source vsync forces a frame grab on every display vblank, regardless +//! of page flips. Without it, kmsgrab only grabs when the compositor flips a +//! new buffer — a static/slow screen yields 1fps. +//! //! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next //! to get proper AVPackets (keyframe flags, timestamps) without parsing -//! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet -//! metadata in the container layer. +//! bytestreams. use std::os::fd::AsRawFd; use std::os::unix::io::RawFd; @@ -76,8 +79,6 @@ pub fn run( while !stop_watcher.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_millis(100)); } - // Send SIGINT to ffmpeg so it flushes and closes stdout, - // which unblocks the packet iterator in demux_and_send. use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; let _ = kill(Pid::from_raw(child_pid as i32), Signal::SIGINT); @@ -148,6 +149,9 @@ fn detect_default_source(pulse_server: &str) -> Option { fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result { let audio = detect_audio_sources(); + // fps filter after scale_vaapi pads/duplicates frames to fill gaps when + // kmsgrab captures fewer frames than the target rate (e.g. compositor + // skips flips on static content). Keeps the output stream at a stable fps. let filter = format!( "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}", cfg.width, cfg.height, cfg.fps, @@ -158,6 +162,9 @@ fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result { "-init_hw_device".into(), format!("drm=drm:{}", cfg.device), "-init_hw_device".into(), "vaapi=va@drm".into(), // Video input (kmsgrab) + // -vblank_source vsync: grab on every display vblank, not just page flips. + // Without this, a static screen (e.g. talking-head meeting) gives 1fps + // because the compositor rarely flips a new buffer. "-thread_queue_size".into(), "512".into(), "-device".into(), cfg.device.clone(), "-f".into(), "kmsgrab".into(), diff --git a/media/ctrl/client.sh b/media/ctrl/client.sh index 3209995..ebfe4fe 100755 --- a/media/ctrl/client.sh +++ b/media/ctrl/client.sh @@ -1,7 +1,7 @@ #!/bin/bash # Build and run the media client (sender) # Requires DRM master access — runs under sudo unless already root. -# Usage: ./client.sh [server_addr] e.g. ./client.sh mcrndeb:4444 +# Usage: ./client.sh [server_addr] e.g. ./client.sh mcrndeb:4447 set -euo pipefail MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)"