diff --git a/media/Cargo.lock b/media/Cargo.lock index 4be98fa..09e979e 100644 --- a/media/Cargo.lock +++ b/media/Cargo.lock @@ -72,6 +72,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "cht-client" version = "0.1.0" @@ -79,6 +85,7 @@ dependencies = [ "anyhow", "cht-common", "ffmpeg-next", + "nix", "tokio", "tracing", "tracing-subscriber", @@ -261,6 +268,18 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" diff --git a/media/client/Cargo.toml b/media/client/Cargo.toml index f0253b2..efbbe53 100644 --- a/media/client/Cargo.toml +++ b/media/client/Cargo.toml @@ -10,3 +10,4 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } anyhow = { workspace = true } ffmpeg = { package = "ffmpeg-next", version = "8" } +nix = { version = "0.29", features = ["signal", "process"] } diff --git a/media/client/src/backends/mod.rs b/media/client/src/backends/mod.rs new file mode 100644 index 0000000..a925f01 --- /dev/null +++ b/media/client/src/backends/mod.rs @@ -0,0 +1,17 @@ +/// Which capture+encode backend to use. +pub enum Backend { + /// Spawn ffmpeg CLI for capture+encode (default, proven GPU path). + /// Uses hwmap via fftools' private device context — works with X2RGB10LE. + Subprocess, + /// Direct VAAPI via av_hwframe_map + scale_vaapi (experimental). + /// GPU driver dependent — fails with EPERM on Mesa radeonsi + X2RGB10LE. + VaapiDirect, +} + +impl Default for Backend { + fn default() -> Self { + Self::Subprocess + } +} + +pub mod subprocess; diff --git a/media/client/src/backends/subprocess.rs b/media/client/src/backends/subprocess.rs new file mode 100644 index 0000000..835cc7a --- /dev/null +++ b/media/client/src/backends/subprocess.rs @@ -0,0 +1,231 @@ +//! Subprocess backend: spawn ffmpeg CLI for capture+encode. +//! +//! Spawns ffmpeg with the same hardware pipeline as `stream_av.py`: +//! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi +//! +//! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next +//! to get proper AVPackets (keyframe flags, timestamps) without parsing +//! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet +//! metadata in the container layer. +//! +//! This approach works where the direct VAAPI API path fails: hwmap uses +//! fftools' internal AVFilterGraph.hw_device_ctx (removed from public API +//! in ffmpeg 7+), so X2RGB10LE format negotiation succeeds. + +use std::os::fd::AsRawFd; +use std::os::unix::io::RawFd; +use std::process::{Child, Command, Stdio}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use tracing::{error, info, warn}; + +use crate::encoder::EncodedPacket; + +pub struct SubprocessConfig { + pub device: String, + pub fps: u32, + pub width: u32, + pub height: u32, + pub qp: u32, + pub gop_size: u32, +} + +impl Default for SubprocessConfig { + fn default() -> Self { + Self { + device: "/dev/dri/card0".into(), + fps: 30, + width: 1920, + height: 1080, + qp: 20, + gop_size: 30, + } + } +} + +/// Run the subprocess pipeline. Blocks until stop is set or ffmpeg exits. +pub fn run( + config: SubprocessConfig, + packet_tx: tokio::sync::mpsc::Sender, + stop: Arc, +) -> Result<()> { + let mut child = spawn_ffmpeg(&config).context("spawn ffmpeg")?; + info!("ffmpeg subprocess pid={}", child.id()); + + // Drain stderr on a separate thread so ffmpeg doesn't block on a full pipe. + let stderr = child.stderr.take().expect("stderr piped"); + let stop_on_fatal = stop.clone(); + std::thread::Builder::new() + .name("ffmpeg-stderr".into()) + .spawn(move || watch_stderr(stderr, stop_on_fatal)) + .expect("spawn stderr thread"); + + // Get the raw fd from stdout before handing it to ffmpeg-next. + // ffmpeg-next takes ownership of the input context but we keep the Child + // alive so the fd stays valid. + let stdout = child.stdout.take().expect("stdout piped"); + let fd: RawFd = stdout.as_raw_fd(); + + // Keep stdout alive for the duration of demuxing. + let _stdout_guard = stdout; + + let result = demux_and_send(fd, packet_tx, stop, &mut child); + + // Clean up subprocess regardless of result. + kill_child(&mut child); + + result +} + +fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result { + let filter = format!( + "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}", + cfg.width, cfg.height, cfg.fps, + ); + + let child = Command::new("ffmpeg") + .args([ + "-init_hw_device", &format!("drm=drm:{}", cfg.device), + "-init_hw_device", "vaapi=va@drm", + "-thread_queue_size", "64", + "-device", &cfg.device, + "-f", "kmsgrab", + "-framerate", &cfg.fps.to_string(), + "-i", "-", + "-vf", &filter, + "-c:v", "h264_vaapi", + "-qp", &cfg.qp.to_string(), + "-g", &cfg.gop_size.to_string(), + "-bf", "0", + "-flush_packets", "1", + "-fflags", "nobuffer", + "-f", "nut", + "pipe:1", + "-hide_banner", + ]) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context("failed to spawn ffmpeg — is it in PATH?")?; + + Ok(child) +} + +fn demux_and_send( + fd: RawFd, + packet_tx: tokio::sync::mpsc::Sender, + stop: Arc, + child: &mut Child, +) -> Result<()> { + ffmpeg::init().context("ffmpeg init")?; + + // Open the NUT stream from the pipe fd. + let pipe_url = format!("pipe:{fd}"); + let mut input_ctx = ffmpeg::format::input(&pipe_url) + .context("open ffmpeg input from pipe")?; + + let video_stream = input_ctx + .streams() + .best(ffmpeg::media::Type::Video) + .context("no video stream in NUT output")?; + + let stream_idx = video_stream.index(); + let time_base = video_stream.time_base(); + let tb_num = time_base.numerator() as u32; + let tb_den = time_base.denominator() as u32; + + info!( + "Subprocess demux ready: stream_idx={}, time_base={}/{}", + stream_idx, tb_num, tb_den + ); + + let mut packet_count = 0u64; + + for (stream, packet) in input_ctx.packets() { + if stop.load(Ordering::Relaxed) { + break; + } + + // ffmpeg process died + if let Some(status) = child.try_wait().ok().flatten() { + warn!("ffmpeg exited with {status}"); + break; + } + + if stream.index() != stream_idx { + continue; + } + + let data = match packet.data() { + Some(d) => d.to_vec(), + None => continue, + }; + + let encoded = EncodedPacket { + data, + pts: packet.pts().unwrap_or(0), + dts: packet.dts().unwrap_or(0), + keyframe: packet.is_key(), + time_base_num: tb_num, + time_base_den: tb_den, + }; + + packet_count += 1; + if packet_count % 300 == 1 { + info!("Subprocess: {packet_count} packets encoded"); + } + + if packet_tx.blocking_send(encoded).is_err() { + info!("Packet channel closed, stopping subprocess pipeline"); + break; + } + } + + info!("Subprocess pipeline stopped ({packet_count} packets)"); + Ok(()) +} + +fn watch_stderr(stderr: std::process::ChildStderr, stop: Arc) { + use std::io::{BufRead, BufReader}; + + const FATAL: &[&str] = &["framebuffer format changed", "Error during demuxing"]; + + for line in BufReader::new(stderr).lines() { + let line = match line { + Ok(l) => l, + Err(_) => break, + }; + if !line.is_empty() { + info!("[ffmpeg] {line}"); + } + if FATAL.iter().any(|p| line.contains(p)) { + error!("Fatal ffmpeg error — stopping: {line}"); + stop.store(true, Ordering::Relaxed); + } + } + info!("[ffmpeg] stderr closed"); +} + +fn kill_child(child: &mut Child) { + use nix::sys::signal::{killpg, Signal}; + use nix::unistd::Pid; + + if child.try_wait().ok().flatten().is_some() { + return; // already exited + } + + // Send SIGINT to the process group so ffmpeg can flush cleanly. + if let Ok(pgid) = nix::unistd::getpgid(Some(Pid::from_raw(child.id() as i32))) { + let _ = killpg(pgid, Signal::SIGINT); + } else { + child.kill().ok(); + } + + match child.wait() { + Ok(s) => info!("ffmpeg exited: {s}"), + Err(e) => warn!("ffmpeg wait error: {e}"), + } +} diff --git a/media/client/src/capture.rs b/media/client/src/capture.rs index d4cb60b..2f0b273 100644 --- a/media/client/src/capture.rs +++ b/media/client/src/capture.rs @@ -49,10 +49,14 @@ impl KmsCapture { pub fn open(config: &CaptureConfig) -> Result { ffmpeg::init().context("ffmpeg init")?; - // Set up the kmsgrab input device + // Set up the kmsgrab input device. + // Request bgr0 (XRGB8888) explicitly — if the compositor exposes an 8-bit + // plane alongside the 10-bit one, kmsgrab will prefer it. This avoids the + // X2RGB10LE→VAAPI import limitation without changing any system config. let mut opts = ffmpeg::Dictionary::new(); opts.set("device", &config.device); opts.set("framerate", &config.fps.to_string()); + opts.set("format", "bgr0"); let fmt_name = CString::new("kmsgrab").unwrap(); let fmt_ptr = unsafe { ffmpeg::ffi::av_find_input_format(fmt_name.as_ptr()) }; diff --git a/media/client/src/encoder.rs b/media/client/src/encoder.rs index 073c299..3cc0949 100644 --- a/media/client/src/encoder.rs +++ b/media/client/src/encoder.rs @@ -1,14 +1,13 @@ -//! VAAPI H.264 hardware encoding. +//! VAAPI H.264 hardware encoding via direct frame mapping. +//! +//! Bypasses ffmpeg's filter graph (hwmap has format negotiation bugs with +//! 10-bit DRM formats). Instead uses av_hwframe_map() to map DRM_PRIME +//! frames directly to VAAPI surfaces — same approach as mpv. //! //! Two-phase initialization: //! - Phase 1 (`VaapiEncoder::new`): store config, no ffmpeg calls. -//! - Phase 2 (first call to `encode`): build filter graph using `hw_frames_ctx` -//! from the first DRM_PRIME frame to wire up hwmap, then open the codec. -//! -//! The split exists because the filter graph needs the DRM hardware device -//! context that kmsgrab attaches to each frame — that context isn't available -//! until the first frame arrives, so the graph can't be validated at -//! construction time. +//! - Phase 2 (first call to `encode`): create VAAPI device + frames context +//! using the first DRM_PRIME frame's hw context, open the encoder. use anyhow::{Context, Result}; use tracing::info; @@ -36,7 +35,7 @@ impl Default for EncoderConfig { } } -/// VAAPI H.264 encoder with lazy filter graph initialization. +/// VAAPI H.264 encoder with lazy initialization. pub struct VaapiEncoder { config: EncoderConfig, inner: Option, @@ -44,19 +43,25 @@ pub struct VaapiEncoder { struct EncoderInner { encoder: ffmpeg::encoder::Video, - filter_graph: ffmpeg::filter::Graph, + /// VAAPI frames context for mapping DRM_PRIME → VAAPI (matches DRM sw_format) + vaapi_frames_ref: *mut ffmpeg::ffi::AVBufferRef, + /// scale_vaapi filter graph: converts VAAPI X2RGB10LE → VAAPI NV12 + scale_graph: ffmpeg::filter::Graph, frame_count: u64, time_base: ffmpeg::Rational, } +// The raw pointer is only used from the pipeline thread. +unsafe impl Send for EncoderInner {} + impl VaapiEncoder { /// Create an encoder from config. No ffmpeg resources are allocated yet. pub fn new(config: EncoderConfig) -> Self { Self { config, inner: None } } - /// Encode a frame. On the first call, initializes the filter graph and - /// codec using the frame's `hw_frames_ctx`. Returns 0 or more packets. + /// Encode a frame. On the first call, initializes the VAAPI device and + /// encoder using the first DRM_PRIME frame's hw context. pub fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result> { if self.inner.is_none() { self.inner = Some(EncoderInner::open(&self.config, frame).context("encoder init")?); @@ -73,11 +78,140 @@ impl VaapiEncoder { } } +impl Drop for EncoderInner { + fn drop(&mut self) { + unsafe { + ffmpeg::ffi::av_buffer_unref(&mut self.vaapi_frames_ref); + } + } +} + impl EncoderInner { fn open(config: &EncoderConfig, first_frame: &ffmpeg::frame::Video) -> Result { let time_base = ffmpeg::Rational::new(1, config.fps as i32); - let filter_graph = Self::build_filter_graph(config, first_frame)?; + // Derive VAAPI device from the DRM device in the frame's hw context + let vaapi_device_ref = unsafe { + let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx; + anyhow::ensure!( + !hw_frames_ctx.is_null(), + "first KMS frame has no hw_frames_ctx" + ); + let frames_ctx = &*((*hw_frames_ctx).data as *const ffmpeg::ffi::AVHWFramesContext); + let drm_device_ref = frames_ctx.device_ref; + + info!( + "DRM frame: {}x{}, sw_format={:?}", + frames_ctx.width, frames_ctx.height, frames_ctx.sw_format + ); + + let mut vaapi_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); + let ret = ffmpeg::ffi::av_hwdevice_ctx_create_derived( + &mut vaapi_ref, + ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI, + drm_device_ref, + 0, + ); + anyhow::ensure!(ret >= 0, "derive VAAPI device from DRM: error {ret}"); + info!("VAAPI device derived from DRM"); + vaapi_ref + }; + + // Get the DRM sw_format from the first frame + let drm_sw_format = unsafe { + let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx; + let fctx = &*((*hw_frames_ctx).data as *const ffmpeg::ffi::AVHWFramesContext); + fctx.sw_format + }; + + // Create VAAPI frames context with matching sw_format so av_hwframe_map + // can import the DMA-BUF without format conversion + let vaapi_frames_ref = unsafe { + let frames_ref = ffmpeg::ffi::av_hwframe_ctx_alloc(vaapi_device_ref); + anyhow::ensure!(!frames_ref.is_null(), "av_hwframe_ctx_alloc failed"); + + let frames_ctx = &mut *((*frames_ref).data as *mut ffmpeg::ffi::AVHWFramesContext); + frames_ctx.format = ffmpeg::ffi::AVPixelFormat::AV_PIX_FMT_VAAPI; + frames_ctx.sw_format = drm_sw_format; // match DRM buffer (e.g. X2RGB10LE) + frames_ctx.width = config.width as i32; + frames_ctx.height = config.height as i32; + frames_ctx.initial_pool_size = 4; + + let ret = ffmpeg::ffi::av_hwframe_ctx_init(frames_ref); + anyhow::ensure!(ret >= 0, "av_hwframe_ctx_init: error {ret}"); + info!("VAAPI frames context created: {}x{}, sw_format={:?}", config.width, config.height, drm_sw_format); + + frames_ref + }; + + // Build a VAAPI-only scale graph: VAAPI(drm_sw_format) → scale_vaapi → VAAPI(NV12) + // Both sides are VAAPI so format negotiation is straightforward — no cross-API issues. + let scale_graph = { + let mut graph = ffmpeg::filter::Graph::new(); + + let args = format!( + "video_size={}x{}:pix_fmt={}:time_base=1/{}:pixel_aspect=1/1", + config.width, config.height, + ffmpeg::format::Pixel::VAAPI as i32, + config.fps, + ); + graph.add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args) + .context("add buffersrc")?; + graph.add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "") + .context("add buffersink")?; + + let filter_spec = format!( + "scale_vaapi=w={}:h={}:format=nv12", + config.width, config.height, + ); + graph.output("in", 0)?.input("out", 0)?.parse(&filter_spec)?; + + // Set VAAPI device on all filter contexts + unsafe { + let graph_ptr = graph.as_mut_ptr(); + for i in 0..(*graph_ptr).nb_filters { + let fctx = *(*graph_ptr).filters.add(i as usize); + (*fctx).hw_device_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_device_ref); + } + } + + // Attach the VAAPI frames context to buffersrc, explicitly marking format=VAAPI + unsafe { + let mut buffersrc = graph.get("in").unwrap(); + let par = ffmpeg::ffi::av_buffersrc_parameters_alloc(); + anyhow::ensure!(!par.is_null(), "av_buffersrc_parameters_alloc OOM"); + (*par).format = ffmpeg::ffi::AVPixelFormat::AV_PIX_FMT_VAAPI as i32; + (*par).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_frames_ref); + let ret = ffmpeg::ffi::av_buffersrc_parameters_set(buffersrc.as_mut_ptr(), par); + ffmpeg::ffi::av_free(par as *mut _); + anyhow::ensure!(ret >= 0, "av_buffersrc_parameters_set: {ret}"); + } + + graph.validate().context("validate scale_vaapi graph")?; + info!("scale_vaapi filter graph ready"); + graph + }; + + // Get the NV12 VAAPI frames context from the scale graph's output + let nv12_frames_ref = unsafe { + let graph_ptr = scale_graph.as_ptr(); + let mut nv12_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); + for i in 0..(*graph_ptr).nb_filters { + let fctx = *(*graph_ptr).filters.add(i as usize); + let name = std::ffi::CStr::from_ptr((*(*fctx).filter).name).to_str().unwrap_or(""); + if name == "buffersink" && (*fctx).nb_inputs > 0 { + let link = *(*fctx).inputs; + if !link.is_null() { + nv12_ref = ffmpeg::ffi::avfilter_link_get_hw_frames_ctx(link); + } + break; + } + } + anyhow::ensure!(!nv12_ref.is_null(), "could not get NV12 frames context from scale graph"); + nv12_ref + }; + + // Open h264_vaapi encoder with the NV12 VAAPI frames context let codec = ffmpeg::encoder::find_by_name("h264_vaapi") .context("h264_vaapi encoder not found")?; @@ -93,6 +227,11 @@ impl EncoderInner { encoder_ctx.set_max_b_frames(0); encoder_ctx.set_format(ffmpeg::format::Pixel::VAAPI); + unsafe { + let ctx_ptr = encoder_ctx.as_mut_ptr(); + (*ctx_ptr).hw_frames_ctx = nv12_frames_ref; + } + let mut opts = ffmpeg::Dictionary::new(); opts.set("qp", &config.qp.to_string()); @@ -105,159 +244,68 @@ impl EncoderInner { config.width, config.height, config.fps, config.qp, config.gop_size, ); - Ok(Self { encoder, filter_graph, frame_count: 0, time_base }) + unsafe { + ffmpeg::ffi::av_buffer_unref(&mut (vaapi_device_ref as *mut _)); + } + + Ok(Self { + encoder, + vaapi_frames_ref, + scale_graph, + frame_count: 0, + time_base, + }) } - /// Build the filter graph: DRM_PRIME → hwmap(vaapi) → scale_vaapi → buffersink. - /// - /// Mirrors the ffmpeg CLI approach: - /// -init_hw_device drm=drm:/dev/dri/card0 - /// -init_hw_device vaapi=va@drm - /// -filter_hw_device va - /// - /// We create both devices explicitly and set the VAAPI device on the graph, - /// then attach the frame's hw_frames_ctx to the buffersrc so hwmap can map - /// DRM_PRIME frames to VAAPI surfaces. - fn build_filter_graph( - config: &EncoderConfig, - first_frame: &ffmpeg::frame::Video, - ) -> Result { - let input_width = first_frame.width(); - let input_height = first_frame.height(); + fn encode(&mut self, drm_frame: &ffmpeg::frame::Video) -> Result> { + // Step 1: Map DRM_PRIME → VAAPI surface (zero-copy, same sw_format) + let vaapi_frame = unsafe { + let mut dst = ffmpeg::ffi::av_frame_alloc(); + anyhow::ensure!(!dst.is_null(), "av_frame_alloc failed"); + (*dst).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(self.vaapi_frames_ref); - // Create DRM device, then derive VAAPI from it (like -init_hw_device vaapi=va@drm) - let (drm_device, vaapi_device) = unsafe { - let mut drm_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); - let dev_path = std::ffi::CString::new("/dev/dri/card0").unwrap(); - let ret = ffmpeg::ffi::av_hwdevice_ctx_create( - &mut drm_ref, - ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_DRM, - dev_path.as_ptr(), - std::ptr::null_mut(), - 0, + let ret = ffmpeg::ffi::av_hwframe_map( + dst, + drm_frame.as_ptr(), + ffmpeg::ffi::AV_HWFRAME_MAP_READ as i32 + | ffmpeg::ffi::AV_HWFRAME_MAP_DIRECT as i32, ); - anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create(drm): {ret}"); - info!("DRM device created"); - - let mut vaapi_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); - let ret = ffmpeg::ffi::av_hwdevice_ctx_create_derived( - &mut vaapi_ref, - ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI, - drm_ref, - 0, - ); - anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create_derived(vaapi): {ret}"); - info!("VAAPI device derived from DRM"); - - (drm_ref, vaapi_ref) + if ret < 0 { + ffmpeg::ffi::av_frame_free(&mut dst); + anyhow::bail!("av_hwframe_map DRM_PRIME→VAAPI: error {ret}"); + } + dst }; - let mut graph = ffmpeg::filter::Graph::new(); - - let args = format!( - "video_size={}x{}:pix_fmt={}:time_base=1/{}:pixel_aspect=1/1", - input_width, - input_height, - ffmpeg::format::Pixel::DRM_PRIME as i32, - config.fps, - ); - - graph - .add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args) - .context("add buffersrc")?; - graph - .add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "") - .context("add buffersink")?; - - let filter_spec = format!( - "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12", - config.width, config.height, - ); - - graph.output("in", 0)?.input("out", 0)?.parse(&filter_spec)?; - - // Set the VAAPI device on filter contexts that need it - // (equivalent to -filter_hw_device va in the CLI) + // Step 2: scale_vaapi VAAPI(X2RGB10LE) → VAAPI(NV12) unsafe { - let graph_ptr = graph.as_mut_ptr(); - for i in 0..(*graph_ptr).nb_filters { - let fctx = *(*graph_ptr).filters.add(i as usize); - (*fctx).hw_device_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_device); - } + // Wrap raw AVFrame pointer as ffmpeg::frame::Video without taking ownership + let vaapi_fframe = ffmpeg::frame::Video::wrap(vaapi_frame as *mut _); + self.scale_graph + .get("in").unwrap() + .source() + .add(&vaapi_fframe) + .context("scale buffersrc add")?; + // Prevent drop from freeing the frame — we free it below + std::mem::forget(vaapi_fframe); + ffmpeg::ffi::av_frame_free(&mut (vaapi_frame as *mut _)); } - // Attach hw_frames_ctx from the first DRM_PRIME frame to the buffersrc - unsafe { - let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx; - anyhow::ensure!( - !hw_frames_ctx.is_null(), - "first KMS frame has no hw_frames_ctx" - ); - - let mut buffersrc = graph.get("in").unwrap(); - let par = ffmpeg::ffi::av_buffersrc_parameters_alloc(); - anyhow::ensure!(!par.is_null(), "av_buffersrc_parameters_alloc OOM"); - (*par).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(hw_frames_ctx); - let ret = ffmpeg::ffi::av_buffersrc_parameters_set(buffersrc.as_mut_ptr(), par); - ffmpeg::ffi::av_free(par as *mut _); - anyhow::ensure!(ret >= 0, "av_buffersrc_parameters_set: {ret}"); - } - - graph.validate().context("validate filter graph")?; - info!("Filter graph ready: {filter_spec}"); - - // Keep device refs alive — they're ref-counted by the graph now, - // but we drop our refs here. - unsafe { - ffmpeg::ffi::av_buffer_unref(&mut (drm_device as *mut _)); - ffmpeg::ffi::av_buffer_unref(&mut (vaapi_device as *mut _)); - } - - Ok(graph) - } - - fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result> { - self.filter_graph - .get("in") - .unwrap() - .source() - .add(frame) - .context("buffersrc add")?; - let mut packets = Vec::new(); - let mut filtered = ffmpeg::frame::Video::empty(); - - while self - .filter_graph - .get("out") - .unwrap() - .sink() - .frame(&mut filtered) - .is_ok() - { - filtered.set_pts(Some(self.frame_count as i64)); + let mut scaled = ffmpeg::frame::Video::empty(); + while self.scale_graph.get("out").unwrap().sink().frame(&mut scaled).is_ok() { + scaled.set_pts(Some(self.frame_count as i64)); self.frame_count += 1; - self.encoder.send_frame(&filtered)?; - - let mut encoded = ffmpeg::Packet::empty(); - while self.encoder.receive_packet(&mut encoded).is_ok() { - packets.push(EncodedPacket { - data: encoded.data().unwrap_or(&[]).to_vec(), - pts: encoded.pts().unwrap_or(0), - dts: encoded.dts().unwrap_or(0), - keyframe: encoded.is_key(), - time_base_num: self.time_base.numerator() as u32, - time_base_den: self.time_base.denominator() as u32, - }); - } + // Step 3: encode VAAPI NV12 frame + self.encoder.send_frame(&scaled)?; + packets.extend(self.receive_packets()?); } Ok(packets) } - fn flush(&mut self) -> Result> { - self.encoder.send_eof()?; + fn receive_packets(&mut self) -> Result> { let mut packets = Vec::new(); let mut encoded = ffmpeg::Packet::empty(); while self.encoder.receive_packet(&mut encoded).is_ok() { @@ -272,6 +320,11 @@ impl EncoderInner { } Ok(packets) } + + fn flush(&mut self) -> Result> { + self.encoder.send_eof()?; + self.receive_packets() + } } /// An encoded video packet ready for transport. diff --git a/media/client/src/lib.rs b/media/client/src/lib.rs index 229f3e6..ff085c8 100644 --- a/media/client/src/lib.rs +++ b/media/client/src/lib.rs @@ -1,3 +1,4 @@ +pub mod backends; pub mod capture; pub mod encoder; pub mod pipeline; diff --git a/media/client/src/main.rs b/media/client/src/main.rs index 06c7d0a..18dd855 100644 --- a/media/client/src/main.rs +++ b/media/client/src/main.rs @@ -7,6 +7,7 @@ use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; use tracing::info; +use cht_client::backends::Backend; use cht_client::capture::CaptureConfig; use cht_client::encoder::EncoderConfig; use cht_client::pipeline::Pipeline; @@ -17,10 +18,23 @@ const DEFAULT_SERVER: &str = "mcrndeb:4444"; async fn main() -> Result<()> { cht_common::logging::init("client"); - let server_addr = std::env::args() - .nth(1) + let args: Vec = std::env::args().collect(); + + let server_addr = args.get(1) + .filter(|a| !a.starts_with('-')) + .cloned() .unwrap_or_else(|| DEFAULT_SERVER.to_string()); + let backend = if args.iter().any(|a| a == "--backend" ) + && args.windows(2).any(|w| w[0] == "--backend" && w[1] == "direct") + { + info!("Backend: VaapiDirect (--backend direct)"); + Backend::VaapiDirect + } else { + info!("Backend: Subprocess (default)"); + Backend::Subprocess + }; + info!("Connecting to {server_addr}..."); let stream = TcpStream::connect(&server_addr).await?; info!("Connected"); @@ -51,7 +65,7 @@ async fn main() -> Result<()> { // Start capture pipeline on a dedicated thread let (packet_tx, mut packet_rx) = tokio::sync::mpsc::channel(64); - let mut pipeline = Pipeline::start(capture_config, encoder_config, packet_tx); + let mut pipeline = Pipeline::start(capture_config, encoder_config, backend, packet_tx); // Forward encoded packets to the server let mut video_count = 0u64; diff --git a/media/client/src/pipeline.rs b/media/client/src/pipeline.rs index ee0ec66..6c3332a 100644 --- a/media/client/src/pipeline.rs +++ b/media/client/src/pipeline.rs @@ -1,7 +1,8 @@ //! Capture pipeline: ties capture → encode → transport on a dedicated thread. //! -//! This is the main loop that runs on a blocking thread, reading frames -//! from KMS, encoding with VAAPI, and sending encoded packets through a channel. +//! Dispatches to one of two backends: +//! - `Subprocess`: spawn ffmpeg CLI (default, proven GPU path) +//! - `VaapiDirect`: KmsCapture + VaapiEncoder via av_hwframe_map (experimental) use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -9,6 +10,8 @@ use std::sync::Arc; use anyhow::{Context, Result}; use tracing::{error, info, warn}; +use crate::backends::Backend; +use crate::backends::subprocess::SubprocessConfig; use crate::capture::{CaptureConfig, KmsCapture}; use crate::encoder::{EncodedPacket, EncoderConfig, VaapiEncoder}; @@ -26,6 +29,7 @@ impl Pipeline { pub fn start( capture_config: CaptureConfig, encoder_config: EncoderConfig, + backend: Backend, packet_tx: tokio::sync::mpsc::Sender, ) -> Self { let stop = Arc::new(AtomicBool::new(false)); @@ -34,7 +38,7 @@ impl Pipeline { let thread = std::thread::Builder::new() .name("capture-pipeline".into()) .spawn(move || { - if let Err(e) = run_pipeline(capture_config, encoder_config, packet_tx, stop_clone) { + if let Err(e) = run_pipeline(capture_config, encoder_config, backend, packet_tx, stop_clone) { error!("Pipeline error: {e:#}"); } }) @@ -62,6 +66,33 @@ impl Drop for Pipeline { } fn run_pipeline( + capture_config: CaptureConfig, + encoder_config: EncoderConfig, + backend: Backend, + packet_tx: tokio::sync::mpsc::Sender, + stop: Arc, +) -> Result<()> { + match backend { + Backend::Subprocess => { + info!("Backend: Subprocess (ffmpeg CLI)"); + let config = SubprocessConfig { + device: capture_config.device, + fps: capture_config.fps, + width: encoder_config.width, + height: encoder_config.height, + qp: encoder_config.qp, + gop_size: encoder_config.gop_size, + }; + crate::backends::subprocess::run(config, packet_tx, stop) + } + Backend::VaapiDirect => { + info!("Backend: VaapiDirect (av_hwframe_map, experimental)"); + run_direct_pipeline(capture_config, encoder_config, packet_tx, stop) + } + } +} + +fn run_direct_pipeline( capture_config: CaptureConfig, encoder_config: EncoderConfig, packet_tx: tokio::sync::mpsc::Sender, @@ -94,7 +125,6 @@ fn run_pipeline( } } - // Flush encoder info!("Flushing encoder..."); let flush_pkts = encoder.flush()?; for pkt in flush_pkts {