somewhat stable

This commit is contained in:
2026-04-10 11:47:15 -03:00
parent e2ca18d120
commit 27c0181d77
6 changed files with 47 additions and 28 deletions

View File

@@ -22,7 +22,7 @@ import socket
import time import time
from pathlib import Path from pathlib import Path
from queue import Queue, Empty from queue import Queue, Empty
from threading import Thread from threading import Thread, Event
from cht.config import ( from cht.config import (
AUDIO_EXTRACT_INTERVAL, AUDIO_EXTRACT_INTERVAL,
@@ -43,7 +43,7 @@ class SessionProcessor:
self.frames_dir = session_dir / "frames" self.frames_dir = session_dir / "frames"
self.audio_dir = session_dir / "audio" self.audio_dir = session_dir / "audio"
self._stop_flags: set[str] = set() self._stop_event = Event()
self._threads: dict[str, Thread] = {} self._threads: dict[str, Thread] = {}
self._on_new_frames = None self._on_new_frames = None
self._on_new_audio = None self._on_new_audio = None
@@ -289,26 +289,26 @@ class SessionProcessor:
socket_path = DATA_DIR / "scene.sock" socket_path = DATA_DIR / "scene.sock"
# Wait for the socket to appear (server creates it on session start). # Wait for the socket to appear (server creates it on session start).
while "stop" not in self._stop_flags: while not self._stop_event.is_set():
if socket_path.exists(): if socket_path.exists():
break break
time.sleep(0.5) time.sleep(0.5)
if "stop" in self._stop_flags: if self._stop_event.is_set():
return return
while "stop" not in self._stop_flags: while not self._stop_event.is_set():
try: try:
self._run_scene_session(socket_path, threshold) self._run_scene_session(socket_path, threshold)
except Exception: except Exception:
log.exception("Scene detector error") log.exception("Scene detector error")
if "stop" in self._stop_flags: if self._stop_event.is_set():
break break
# If the socket is gone, the session ended — don't retry. # If the socket is gone, the session ended — don't retry.
if not socket_path.exists(): if not socket_path.exists():
log.info("Scene detector: socket gone, session ended") log.info("Scene detector: socket gone, session ended")
break break
log.info("Scene detector: reconnecting in 2s...") log.info("Scene detector: reconnecting in 2s...")
time.sleep(2.0) self._stop_event.wait(timeout=2.0)
log.info("Scene detector stopped") log.info("Scene detector stopped")
@@ -333,7 +333,7 @@ class SessionProcessor:
# Thread: socket → ffmpeg stdin # Thread: socket → ffmpeg stdin
def _feed_stdin(): def _feed_stdin():
try: try:
while "stop" not in self._stop_flags: while not self._stop_event.is_set():
data = sock.recv(65536) data = sock.recv(65536)
if not data: if not data:
break break
@@ -411,10 +411,16 @@ class SessionProcessor:
self._threads["audio_extractor"] = t self._threads["audio_extractor"] = t
def stop(self): def stop(self):
self._stop_flags.add("stop") self._stop_event.set()
for name, proc in getattr(self, "_procs", {}).items(): for name, proc in getattr(self, "_procs", {}).items():
ff.stop_proc(proc, timeout=3) ff.stop_proc(proc, timeout=3)
self._procs = {} if hasattr(self, "_procs") else {} self._procs = {}
# Join all threads so caller knows they're done before starting a new session
for name, t in list(self._threads.items()):
t.join(timeout=5)
if t.is_alive():
log.warning("Thread %s still alive after stop timeout", name)
self._threads.clear()
def _has_audio_stream(self, seg: Path) -> bool: def _has_audio_stream(self, seg: Path) -> bool:
try: try:
@@ -441,9 +447,7 @@ class SessionProcessor:
chunk_num = 0 chunk_num = 0
current_source = None current_source = None
while "stop" not in self._stop_flags: while not self._stop_event.wait(timeout=AUDIO_EXTRACT_INTERVAL):
time.sleep(AUDIO_EXTRACT_INTERVAL)
source = self._find_audio_source() source = self._find_audio_source()
if not source: if not source:
continue continue
@@ -461,6 +465,16 @@ class SessionProcessor:
if safe_duration is None or safe_duration <= 0: if safe_duration is None or safe_duration <= 0:
continue continue
# Fail-safe: processed_time can accumulate past the file if the
# source was recreated (e.g. server restarted same session).
if processed_time > safe_duration:
log.warning(
"Audio extractor: processed_time %.1fs > file duration %.1fs — resetting",
processed_time, safe_duration,
)
processed_time = 0.0
chunk_num = 0
process_to = safe_duration - AUDIO_SAFETY_MARGIN process_to = safe_duration - AUDIO_SAFETY_MARGIN
if process_to <= processed_time + 1.0: if process_to <= processed_time + 1.0:
continue continue

View File

@@ -147,7 +147,7 @@ class MonitorWidget(Gtk.Box):
gl_area.make_current() gl_area.make_current()
self._live_player = Player() self._live_player = Player()
self._live_player.init_gl( self._live_player.init_gl(
update_callback=lambda: GLib.idle_add(self._live_gl.queue_render) update_callback=lambda: GLib.idle_add(self._live_gl.queue_render, priority=GLib.PRIORITY_HIGH)
) )
log.info("Live player created") log.info("Live player created")
if self._live_source_url and not self._live_loaded: if self._live_source_url and not self._live_loaded:
@@ -162,7 +162,7 @@ class MonitorWidget(Gtk.Box):
self._live_loaded = False self._live_loaded = False
def _on_live_render(self, gl_area, _ctx): def _on_live_render(self, gl_area, _ctx):
if not self._live_player: if not self._live_player or not self._live_loaded:
return True return True
fbo = ctypes.c_int(0) fbo = ctypes.c_int(0)
_libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo)) _libGL.glGetIntegerv(GL_DRAW_FRAMEBUFFER_BINDING, ctypes.byref(fbo))
@@ -175,7 +175,7 @@ class MonitorWidget(Gtk.Box):
gl_area.make_current() gl_area.make_current()
self._review_player = Player() self._review_player = Player()
self._review_player.init_gl( self._review_player.init_gl(
update_callback=lambda: GLib.idle_add(self._review_gl.queue_render) update_callback=lambda: GLib.idle_add(self._review_gl.queue_render, priority=GLib.PRIORITY_HIGH)
) )
log.info("Review player created") log.info("Review player created")

View File

@@ -103,12 +103,9 @@ class Player:
def load_live(self, url): def load_live(self, url):
"""Load a live stream URL with low-latency options.""" """Load a live stream URL with low-latency options."""
self._player["cache"] = "no" self._player["cache"] = "no"
self._player["demuxer-max-bytes"] = "256KiB" self._player["demuxer-max-bytes"] = "512KiB"
self._player["demuxer-readahead-secs"] = 0 self._player["demuxer-readahead-secs"] = 0.5
self._player["audio-buffer"] = 0.1 self._player["audio-buffer"] = 0.1
self._player["untimed"] = True
self._player["video-latency-hacks"] = True
self._player["interpolation"] = False
log.info("mpv load_live: %s", url) log.info("mpv load_live: %s", url)
self._player.loadfile(str(url), mode="replace") self._player.loadfile(str(url), mode="replace")

View File

@@ -508,7 +508,8 @@ class ChtWindow(Adw.ApplicationWindow):
self._connect_btn.add_css_class("suggested-action") self._connect_btn.add_css_class("suggested-action")
if reload_session and last_session_id: if reload_session and last_session_id:
# Transition to review mode — _load_session handles UI setup # Stop live player before transitioning to review mode
self._monitor.reset()
self._load_session(last_session_id) self._load_session(last_session_id)
return return

View File

@@ -1,13 +1,16 @@
//! Subprocess backend: spawn ffmpeg CLI for capture+encode. //! Subprocess backend: spawn ffmpeg CLI for capture+encode.
//! //!
//! Spawns ffmpeg with the same hardware pipeline as `stream_av.sh`: //! Spawns ffmpeg with the same hardware pipeline as `stream_av.sh`:
//! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi //! kmsgrab -vblank_source vsync → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi
//! + PulseAudio desktop audio + mic → amix → AAC //! + PulseAudio desktop audio + mic → amix → AAC
//! //!
//! -vblank_source vsync forces a frame grab on every display vblank, regardless
//! of page flips. Without it, kmsgrab only grabs when the compositor flips a
//! new buffer — a static/slow screen yields 1fps.
//!
//! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next //! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next
//! to get proper AVPackets (keyframe flags, timestamps) without parsing //! to get proper AVPackets (keyframe flags, timestamps) without parsing
//! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet //! bytestreams.
//! metadata in the container layer.
use std::os::fd::AsRawFd; use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd; use std::os::unix::io::RawFd;
@@ -76,8 +79,6 @@ pub fn run(
while !stop_watcher.load(Ordering::Relaxed) { while !stop_watcher.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
} }
// Send SIGINT to ffmpeg so it flushes and closes stdout,
// which unblocks the packet iterator in demux_and_send.
use nix::sys::signal::{kill, Signal}; use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid; use nix::unistd::Pid;
let _ = kill(Pid::from_raw(child_pid as i32), Signal::SIGINT); let _ = kill(Pid::from_raw(child_pid as i32), Signal::SIGINT);
@@ -148,6 +149,9 @@ fn detect_default_source(pulse_server: &str) -> Option<String> {
fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> { fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> {
let audio = detect_audio_sources(); let audio = detect_audio_sources();
// fps filter after scale_vaapi pads/duplicates frames to fill gaps when
// kmsgrab captures fewer frames than the target rate (e.g. compositor
// skips flips on static content). Keeps the output stream at a stable fps.
let filter = format!( let filter = format!(
"hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}", "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}",
cfg.width, cfg.height, cfg.fps, cfg.width, cfg.height, cfg.fps,
@@ -158,6 +162,9 @@ fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> {
"-init_hw_device".into(), format!("drm=drm:{}", cfg.device), "-init_hw_device".into(), format!("drm=drm:{}", cfg.device),
"-init_hw_device".into(), "vaapi=va@drm".into(), "-init_hw_device".into(), "vaapi=va@drm".into(),
// Video input (kmsgrab) // Video input (kmsgrab)
// -vblank_source vsync: grab on every display vblank, not just page flips.
// Without this, a static screen (e.g. talking-head meeting) gives 1fps
// because the compositor rarely flips a new buffer.
"-thread_queue_size".into(), "512".into(), "-thread_queue_size".into(), "512".into(),
"-device".into(), cfg.device.clone(), "-device".into(), cfg.device.clone(),
"-f".into(), "kmsgrab".into(), "-f".into(), "kmsgrab".into(),

View File

@@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# Build and run the media client (sender) # Build and run the media client (sender)
# Requires DRM master access — runs under sudo unless already root. # Requires DRM master access — runs under sudo unless already root.
# Usage: ./client.sh [server_addr] e.g. ./client.sh mcrndeb:4444 # Usage: ./client.sh [server_addr] e.g. ./client.sh mcrndeb:4447
set -euo pipefail set -euo pipefail
MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)" MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)"