phase 3: subprocess backend, dual-backend pipeline, packets flowing

This commit is contained in:
2026-04-09 19:46:20 -03:00
parent 5b467ffba8
commit ff96dcb4f7
9 changed files with 527 additions and 157 deletions

19
media/Cargo.lock generated
View File

@@ -72,6 +72,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]] [[package]]
name = "cht-client" name = "cht-client"
version = "0.1.0" version = "0.1.0"
@@ -79,6 +85,7 @@ dependencies = [
"anyhow", "anyhow",
"cht-common", "cht-common",
"ffmpeg-next", "ffmpeg-next",
"nix",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@@ -261,6 +268,18 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "nix"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
"bitflags",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"

View File

@@ -10,3 +10,4 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
ffmpeg = { package = "ffmpeg-next", version = "8" } ffmpeg = { package = "ffmpeg-next", version = "8" }
nix = { version = "0.29", features = ["signal", "process"] }

View File

@@ -0,0 +1,17 @@
/// Which capture+encode backend to use.
///
/// Derives `Default` (→ [`Backend::Subprocess`]) instead of a hand-written
/// impl, and the standard derives expected of a public, copyable config enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Backend {
    /// Spawn ffmpeg CLI for capture+encode (default, proven GPU path).
    /// Uses hwmap via fftools' private device context — works with X2RGB10LE.
    #[default]
    Subprocess,
    /// Direct VAAPI via av_hwframe_map + scale_vaapi (experimental).
    /// GPU driver dependent — fails with EPERM on Mesa radeonsi + X2RGB10LE.
    VaapiDirect,
}
pub mod subprocess;

View File

@@ -0,0 +1,231 @@
//! Subprocess backend: spawn ffmpeg CLI for capture+encode.
//!
//! Spawns ffmpeg with the same hardware pipeline as `stream_av.py`:
//! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi
//!
//! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next
//! to get proper AVPackets (keyframe flags, timestamps) without parsing
//! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet
//! metadata in the container layer.
//!
//! This approach works where the direct VAAPI API path fails: hwmap uses
//! fftools' internal AVFilterGraph.hw_device_ctx (removed from public API
//! in ffmpeg 7+), so X2RGB10LE format negotiation succeeds.
use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd;
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::{Context, Result};
use tracing::{error, info, warn};
use crate::encoder::EncodedPacket;
/// Configuration for the ffmpeg-subprocess capture+encode backend.
///
/// Public config types should be `Debug` (loggable) and `Clone` (cheap to
/// hand to the pipeline thread) — added here; all fields documented.
#[derive(Debug, Clone)]
pub struct SubprocessConfig {
    /// DRM device node to capture from (e.g. `/dev/dri/card0`).
    pub device: String,
    /// Capture and output frame rate.
    pub fps: u32,
    /// Output width after `scale_vaapi`.
    pub width: u32,
    /// Output height after `scale_vaapi`.
    pub height: u32,
    /// Constant QP for `h264_vaapi` (lower = better quality, larger output).
    pub qp: u32,
    /// Keyframe interval in frames (`-g`).
    pub gop_size: u32,
}
impl Default for SubprocessConfig {
fn default() -> Self {
Self {
device: "/dev/dri/card0".into(),
fps: 30,
width: 1920,
height: 1080,
qp: 20,
gop_size: 30,
}
}
}
/// Run the subprocess pipeline. Blocks until `stop` is set or ffmpeg exits.
///
/// Spawns ffmpeg, drains its stderr on a helper thread, demuxes the NUT
/// stream from its stdout, and always reaps the child before returning.
pub fn run(
    config: SubprocessConfig,
    packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>,
    stop: Arc<AtomicBool>,
) -> Result<()> {
    let mut child = spawn_ffmpeg(&config).context("spawn ffmpeg")?;
    info!("ffmpeg subprocess pid={}", child.id());

    // ffmpeg stalls if its stderr pipe fills, so drain it on its own thread.
    let stderr = child.stderr.take().expect("stderr piped");
    let stop_for_stderr = Arc::clone(&stop);
    std::thread::Builder::new()
        .name("ffmpeg-stderr".into())
        .spawn(move || watch_stderr(stderr, stop_for_stderr))
        .expect("spawn stderr thread");

    // Hand ffmpeg-next the raw fd of ffmpeg's stdout. The ChildStdout value
    // itself must stay alive for the whole demux loop so the fd stays open.
    let stdout = child.stdout.take().expect("stdout piped");
    let fd: RawFd = stdout.as_raw_fd();
    let _stdout_guard = stdout;

    let outcome = demux_and_send(fd, packet_tx, stop, &mut child);

    // Tear the subprocess down whether demuxing succeeded or failed.
    kill_child(&mut child);
    outcome
}
/// Build and spawn the ffmpeg capture+encode process.
///
/// Pipeline: kmsgrab → hwmap(vaapi) → scale_vaapi(nv12) → h264_vaapi,
/// muxed as NUT on stdout. stderr is piped for the watcher thread.
///
/// Fix: `-hide_banner` was previously placed *after* the output URL
/// (`pipe:1`); ffmpeg ignores trailing options after the last output
/// ("Trailing option(s) found in the command"), so the flag had no effect.
/// Global options must precede inputs/outputs — moved to the front.
fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> {
    let filter = format!(
        "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}",
        cfg.width, cfg.height, cfg.fps,
    );
    let child = Command::new("ffmpeg")
        .args([
            // Global options first.
            "-hide_banner",
            "-init_hw_device", &format!("drm=drm:{}", cfg.device),
            "-init_hw_device", "vaapi=va@drm",
            // Input options (kmsgrab takes "-" as a dummy input spec).
            "-thread_queue_size", "64",
            "-device", &cfg.device,
            "-f", "kmsgrab",
            "-framerate", &cfg.fps.to_string(),
            "-i", "-",
            // Output options.
            "-vf", &filter,
            "-c:v", "h264_vaapi",
            "-qp", &cfg.qp.to_string(),
            "-g", &cfg.gop_size.to_string(),
            "-bf", "0",
            "-flush_packets", "1",
            "-fflags", "nobuffer",
            "-f", "nut",
            "pipe:1",
        ])
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("failed to spawn ffmpeg — is it in PATH?")?;
    Ok(child)
}
/// Demux the NUT stream arriving on `fd` and forward each encoded video
/// packet over `packet_tx`.
///
/// Returns when `stop` is set, the ffmpeg child exits, the channel's
/// receiver is dropped, or the input stream ends. Errors only on setup
/// failures (ffmpeg init / open / no video stream).
fn demux_and_send(
    fd: RawFd,
    packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>,
    stop: Arc<AtomicBool>,
    child: &mut Child,
) -> Result<()> {
    ffmpeg::init().context("ffmpeg init")?;
    // Open the NUT stream from the pipe fd — "pipe:N" is ffmpeg's fd protocol.
    let pipe_url = format!("pipe:{fd}");
    let mut input_ctx = ffmpeg::format::input(&pipe_url)
        .context("open ffmpeg input from pipe")?;
    let video_stream = input_ctx
        .streams()
        .best(ffmpeg::media::Type::Video)
        .context("no video stream in NUT output")?;
    // Copy the index and time base out now so the immutable stream borrow
    // ends before packets() takes input_ctx mutably below.
    let stream_idx = video_stream.index();
    let time_base = video_stream.time_base();
    let tb_num = time_base.numerator() as u32;
    let tb_den = time_base.denominator() as u32;
    info!(
        "Subprocess demux ready: stream_idx={}, time_base={}/{}",
        stream_idx, tb_num, tb_den
    );
    let mut packet_count = 0u64;
    for (stream, packet) in input_ctx.packets() {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        // ffmpeg process died — stop instead of blocking on a dead pipe.
        // (try_wait errors are treated the same as "still running".)
        if let Some(status) = child.try_wait().ok().flatten() {
            warn!("ffmpeg exited with {status}");
            break;
        }
        // Only the selected video stream; skip anything else in the container.
        if stream.index() != stream_idx {
            continue;
        }
        let data = match packet.data() {
            Some(d) => d.to_vec(),
            None => continue,
        };
        let encoded = EncodedPacket {
            data,
            pts: packet.pts().unwrap_or(0),
            dts: packet.dts().unwrap_or(0),
            keyframe: packet.is_key(),
            time_base_num: tb_num,
            time_base_den: tb_den,
        };
        packet_count += 1;
        // `% 300 == 1` logs the very first packet too (early liveness signal).
        if packet_count % 300 == 1 {
            info!("Subprocess: {packet_count} packets encoded");
        }
        // blocking_send: this runs on a dedicated thread, so blocking on a
        // full channel is fine; Err means the receiver was dropped.
        if packet_tx.blocking_send(encoded).is_err() {
            info!("Packet channel closed, stopping subprocess pipeline");
            break;
        }
    }
    info!("Subprocess pipeline stopped ({packet_count} packets)");
    Ok(())
}
/// Forward ffmpeg's stderr to our log, raising `stop` on fatal messages.
///
/// Runs until stderr reaches EOF or a read fails (child exited).
fn watch_stderr(stderr: std::process::ChildStderr, stop: Arc<AtomicBool>) {
    use std::io::{BufRead, BufReader};

    // Substrings that mean the capture cannot continue (e.g. a mode switch
    // changed the framebuffer format under kmsgrab).
    const FATAL: &[&str] = &["framebuffer format changed", "Error during demuxing"];

    // map_while(Result::ok) stops at the first read error, matching a
    // manual match-and-break on Err.
    for line in BufReader::new(stderr).lines().map_while(Result::ok) {
        if !line.is_empty() {
            info!("[ffmpeg] {line}");
        }
        let is_fatal = FATAL.iter().any(|needle| line.contains(needle));
        if is_fatal {
            error!("Fatal ffmpeg error — stopping: {line}");
            stop.store(true, Ordering::Relaxed);
        }
    }
    info!("[ffmpeg] stderr closed");
}
/// Ask ffmpeg to exit cleanly (SIGINT lets it flush the muxer), then reap it.
///
/// Fix: the previous version sent SIGINT with `killpg(getpgid(child))`.
/// `Command` does not call `setpgid`, so the child shares *our* process
/// group — that killpg delivered SIGINT to this whole process, not just
/// ffmpeg. Signal the single child pid instead.
fn kill_child(child: &mut Child) {
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    // Already exited and reaped — nothing to do.
    if child.try_wait().ok().flatten().is_some() {
        return;
    }
    // SIGINT the ffmpeg pid only; on delivery failure fall back to SIGKILL.
    let pid = Pid::from_raw(child.id() as i32);
    if kill(pid, Signal::SIGINT).is_err() {
        child.kill().ok();
    }
    // Reap the child so it doesn't linger as a zombie.
    match child.wait() {
        Ok(s) => info!("ffmpeg exited: {s}"),
        Err(e) => warn!("ffmpeg wait error: {e}"),
    }
}

View File

@@ -49,10 +49,14 @@ impl KmsCapture {
pub fn open(config: &CaptureConfig) -> Result<Self> { pub fn open(config: &CaptureConfig) -> Result<Self> {
ffmpeg::init().context("ffmpeg init")?; ffmpeg::init().context("ffmpeg init")?;
// Set up the kmsgrab input device // Set up the kmsgrab input device.
// Request bgr0 (XRGB8888) explicitly — if the compositor exposes an 8-bit
// plane alongside the 10-bit one, kmsgrab will prefer it. This avoids the
// X2RGB10LE→VAAPI import limitation without changing any system config.
let mut opts = ffmpeg::Dictionary::new(); let mut opts = ffmpeg::Dictionary::new();
opts.set("device", &config.device); opts.set("device", &config.device);
opts.set("framerate", &config.fps.to_string()); opts.set("framerate", &config.fps.to_string());
opts.set("format", "bgr0");
let fmt_name = CString::new("kmsgrab").unwrap(); let fmt_name = CString::new("kmsgrab").unwrap();
let fmt_ptr = unsafe { ffmpeg::ffi::av_find_input_format(fmt_name.as_ptr()) }; let fmt_ptr = unsafe { ffmpeg::ffi::av_find_input_format(fmt_name.as_ptr()) };

View File

@@ -1,14 +1,13 @@
//! VAAPI H.264 hardware encoding. //! VAAPI H.264 hardware encoding via direct frame mapping.
//!
//! Bypasses ffmpeg's filter graph (hwmap has format negotiation bugs with
//! 10-bit DRM formats). Instead uses av_hwframe_map() to map DRM_PRIME
//! frames directly to VAAPI surfaces — same approach as mpv.
//! //!
//! Two-phase initialization: //! Two-phase initialization:
//! - Phase 1 (`VaapiEncoder::new`): store config, no ffmpeg calls. //! - Phase 1 (`VaapiEncoder::new`): store config, no ffmpeg calls.
//! - Phase 2 (first call to `encode`): build filter graph using `hw_frames_ctx` //! - Phase 2 (first call to `encode`): create VAAPI device + frames context
//! from the first DRM_PRIME frame to wire up hwmap, then open the codec. //! using the first DRM_PRIME frame's hw context, open the encoder.
//!
//! The split exists because the filter graph needs the DRM hardware device
//! context that kmsgrab attaches to each frame — that context isn't available
//! until the first frame arrives, so the graph can't be validated at
//! construction time.
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use tracing::info; use tracing::info;
@@ -36,7 +35,7 @@ impl Default for EncoderConfig {
} }
} }
/// VAAPI H.264 encoder with lazy filter graph initialization. /// VAAPI H.264 encoder with lazy initialization.
pub struct VaapiEncoder { pub struct VaapiEncoder {
config: EncoderConfig, config: EncoderConfig,
inner: Option<EncoderInner>, inner: Option<EncoderInner>,
@@ -44,19 +43,25 @@ pub struct VaapiEncoder {
struct EncoderInner { struct EncoderInner {
encoder: ffmpeg::encoder::Video, encoder: ffmpeg::encoder::Video,
filter_graph: ffmpeg::filter::Graph, /// VAAPI frames context for mapping DRM_PRIME → VAAPI (matches DRM sw_format)
vaapi_frames_ref: *mut ffmpeg::ffi::AVBufferRef,
/// scale_vaapi filter graph: converts VAAPI X2RGB10LE → VAAPI NV12
scale_graph: ffmpeg::filter::Graph,
frame_count: u64, frame_count: u64,
time_base: ffmpeg::Rational, time_base: ffmpeg::Rational,
} }
// The raw pointer is only used from the pipeline thread.
unsafe impl Send for EncoderInner {}
impl VaapiEncoder { impl VaapiEncoder {
/// Create an encoder from config. No ffmpeg resources are allocated yet. /// Create an encoder from config. No ffmpeg resources are allocated yet.
pub fn new(config: EncoderConfig) -> Self { pub fn new(config: EncoderConfig) -> Self {
Self { config, inner: None } Self { config, inner: None }
} }
/// Encode a frame. On the first call, initializes the filter graph and /// Encode a frame. On the first call, initializes the VAAPI device and
/// codec using the frame's `hw_frames_ctx`. Returns 0 or more packets. /// encoder using the first DRM_PRIME frame's hw context.
pub fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result<Vec<EncodedPacket>> { pub fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result<Vec<EncodedPacket>> {
if self.inner.is_none() { if self.inner.is_none() {
self.inner = Some(EncoderInner::open(&self.config, frame).context("encoder init")?); self.inner = Some(EncoderInner::open(&self.config, frame).context("encoder init")?);
@@ -73,11 +78,140 @@ impl VaapiEncoder {
} }
} }
impl Drop for EncoderInner {
fn drop(&mut self) {
unsafe {
ffmpeg::ffi::av_buffer_unref(&mut self.vaapi_frames_ref);
}
}
}
impl EncoderInner { impl EncoderInner {
fn open(config: &EncoderConfig, first_frame: &ffmpeg::frame::Video) -> Result<Self> { fn open(config: &EncoderConfig, first_frame: &ffmpeg::frame::Video) -> Result<Self> {
let time_base = ffmpeg::Rational::new(1, config.fps as i32); let time_base = ffmpeg::Rational::new(1, config.fps as i32);
let filter_graph = Self::build_filter_graph(config, first_frame)?;
// Derive VAAPI device from the DRM device in the frame's hw context
let vaapi_device_ref = unsafe {
let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx;
anyhow::ensure!(
!hw_frames_ctx.is_null(),
"first KMS frame has no hw_frames_ctx"
);
let frames_ctx = &*((*hw_frames_ctx).data as *const ffmpeg::ffi::AVHWFramesContext);
let drm_device_ref = frames_ctx.device_ref;
info!(
"DRM frame: {}x{}, sw_format={:?}",
frames_ctx.width, frames_ctx.height, frames_ctx.sw_format
);
let mut vaapi_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut();
let ret = ffmpeg::ffi::av_hwdevice_ctx_create_derived(
&mut vaapi_ref,
ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI,
drm_device_ref,
0,
);
anyhow::ensure!(ret >= 0, "derive VAAPI device from DRM: error {ret}");
info!("VAAPI device derived from DRM");
vaapi_ref
};
// Get the DRM sw_format from the first frame
let drm_sw_format = unsafe {
let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx;
let fctx = &*((*hw_frames_ctx).data as *const ffmpeg::ffi::AVHWFramesContext);
fctx.sw_format
};
// Create VAAPI frames context with matching sw_format so av_hwframe_map
// can import the DMA-BUF without format conversion
let vaapi_frames_ref = unsafe {
let frames_ref = ffmpeg::ffi::av_hwframe_ctx_alloc(vaapi_device_ref);
anyhow::ensure!(!frames_ref.is_null(), "av_hwframe_ctx_alloc failed");
let frames_ctx = &mut *((*frames_ref).data as *mut ffmpeg::ffi::AVHWFramesContext);
frames_ctx.format = ffmpeg::ffi::AVPixelFormat::AV_PIX_FMT_VAAPI;
frames_ctx.sw_format = drm_sw_format; // match DRM buffer (e.g. X2RGB10LE)
frames_ctx.width = config.width as i32;
frames_ctx.height = config.height as i32;
frames_ctx.initial_pool_size = 4;
let ret = ffmpeg::ffi::av_hwframe_ctx_init(frames_ref);
anyhow::ensure!(ret >= 0, "av_hwframe_ctx_init: error {ret}");
info!("VAAPI frames context created: {}x{}, sw_format={:?}", config.width, config.height, drm_sw_format);
frames_ref
};
// Build a VAAPI-only scale graph: VAAPI(drm_sw_format) → scale_vaapi → VAAPI(NV12)
// Both sides are VAAPI so format negotiation is straightforward — no cross-API issues.
let scale_graph = {
let mut graph = ffmpeg::filter::Graph::new();
let args = format!(
"video_size={}x{}:pix_fmt={}:time_base=1/{}:pixel_aspect=1/1",
config.width, config.height,
ffmpeg::format::Pixel::VAAPI as i32,
config.fps,
);
graph.add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args)
.context("add buffersrc")?;
graph.add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "")
.context("add buffersink")?;
let filter_spec = format!(
"scale_vaapi=w={}:h={}:format=nv12",
config.width, config.height,
);
graph.output("in", 0)?.input("out", 0)?.parse(&filter_spec)?;
// Set VAAPI device on all filter contexts
unsafe {
let graph_ptr = graph.as_mut_ptr();
for i in 0..(*graph_ptr).nb_filters {
let fctx = *(*graph_ptr).filters.add(i as usize);
(*fctx).hw_device_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_device_ref);
}
}
// Attach the VAAPI frames context to buffersrc, explicitly marking format=VAAPI
unsafe {
let mut buffersrc = graph.get("in").unwrap();
let par = ffmpeg::ffi::av_buffersrc_parameters_alloc();
anyhow::ensure!(!par.is_null(), "av_buffersrc_parameters_alloc OOM");
(*par).format = ffmpeg::ffi::AVPixelFormat::AV_PIX_FMT_VAAPI as i32;
(*par).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_frames_ref);
let ret = ffmpeg::ffi::av_buffersrc_parameters_set(buffersrc.as_mut_ptr(), par);
ffmpeg::ffi::av_free(par as *mut _);
anyhow::ensure!(ret >= 0, "av_buffersrc_parameters_set: {ret}");
}
graph.validate().context("validate scale_vaapi graph")?;
info!("scale_vaapi filter graph ready");
graph
};
// Get the NV12 VAAPI frames context from the scale graph's output
let nv12_frames_ref = unsafe {
let graph_ptr = scale_graph.as_ptr();
let mut nv12_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut();
for i in 0..(*graph_ptr).nb_filters {
let fctx = *(*graph_ptr).filters.add(i as usize);
let name = std::ffi::CStr::from_ptr((*(*fctx).filter).name).to_str().unwrap_or("");
if name == "buffersink" && (*fctx).nb_inputs > 0 {
let link = *(*fctx).inputs;
if !link.is_null() {
nv12_ref = ffmpeg::ffi::avfilter_link_get_hw_frames_ctx(link);
}
break;
}
}
anyhow::ensure!(!nv12_ref.is_null(), "could not get NV12 frames context from scale graph");
nv12_ref
};
// Open h264_vaapi encoder with the NV12 VAAPI frames context
let codec = ffmpeg::encoder::find_by_name("h264_vaapi") let codec = ffmpeg::encoder::find_by_name("h264_vaapi")
.context("h264_vaapi encoder not found")?; .context("h264_vaapi encoder not found")?;
@@ -93,6 +227,11 @@ impl EncoderInner {
encoder_ctx.set_max_b_frames(0); encoder_ctx.set_max_b_frames(0);
encoder_ctx.set_format(ffmpeg::format::Pixel::VAAPI); encoder_ctx.set_format(ffmpeg::format::Pixel::VAAPI);
unsafe {
let ctx_ptr = encoder_ctx.as_mut_ptr();
(*ctx_ptr).hw_frames_ctx = nv12_frames_ref;
}
let mut opts = ffmpeg::Dictionary::new(); let mut opts = ffmpeg::Dictionary::new();
opts.set("qp", &config.qp.to_string()); opts.set("qp", &config.qp.to_string());
@@ -105,159 +244,68 @@ impl EncoderInner {
config.width, config.height, config.fps, config.qp, config.gop_size, config.width, config.height, config.fps, config.qp, config.gop_size,
); );
Ok(Self { encoder, filter_graph, frame_count: 0, time_base }) unsafe {
ffmpeg::ffi::av_buffer_unref(&mut (vaapi_device_ref as *mut _));
}
Ok(Self {
encoder,
vaapi_frames_ref,
scale_graph,
frame_count: 0,
time_base,
})
} }
/// Build the filter graph: DRM_PRIME → hwmap(vaapi) → scale_vaapi → buffersink. fn encode(&mut self, drm_frame: &ffmpeg::frame::Video) -> Result<Vec<EncodedPacket>> {
/// // Step 1: Map DRM_PRIME → VAAPI surface (zero-copy, same sw_format)
/// Mirrors the ffmpeg CLI approach: let vaapi_frame = unsafe {
/// -init_hw_device drm=drm:/dev/dri/card0 let mut dst = ffmpeg::ffi::av_frame_alloc();
/// -init_hw_device vaapi=va@drm anyhow::ensure!(!dst.is_null(), "av_frame_alloc failed");
/// -filter_hw_device va (*dst).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(self.vaapi_frames_ref);
///
/// We create both devices explicitly and set the VAAPI device on the graph,
/// then attach the frame's hw_frames_ctx to the buffersrc so hwmap can map
/// DRM_PRIME frames to VAAPI surfaces.
fn build_filter_graph(
config: &EncoderConfig,
first_frame: &ffmpeg::frame::Video,
) -> Result<ffmpeg::filter::Graph> {
let input_width = first_frame.width();
let input_height = first_frame.height();
// Create DRM device, then derive VAAPI from it (like -init_hw_device vaapi=va@drm) let ret = ffmpeg::ffi::av_hwframe_map(
let (drm_device, vaapi_device) = unsafe { dst,
let mut drm_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); drm_frame.as_ptr(),
let dev_path = std::ffi::CString::new("/dev/dri/card0").unwrap(); ffmpeg::ffi::AV_HWFRAME_MAP_READ as i32
let ret = ffmpeg::ffi::av_hwdevice_ctx_create( | ffmpeg::ffi::AV_HWFRAME_MAP_DIRECT as i32,
&mut drm_ref,
ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_DRM,
dev_path.as_ptr(),
std::ptr::null_mut(),
0,
); );
anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create(drm): {ret}"); if ret < 0 {
info!("DRM device created"); ffmpeg::ffi::av_frame_free(&mut dst);
anyhow::bail!("av_hwframe_map DRM_PRIME→VAAPI: error {ret}");
let mut vaapi_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); }
let ret = ffmpeg::ffi::av_hwdevice_ctx_create_derived( dst
&mut vaapi_ref,
ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI,
drm_ref,
0,
);
anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create_derived(vaapi): {ret}");
info!("VAAPI device derived from DRM");
(drm_ref, vaapi_ref)
}; };
let mut graph = ffmpeg::filter::Graph::new(); // Step 2: scale_vaapi VAAPI(X2RGB10LE) → VAAPI(NV12)
let args = format!(
"video_size={}x{}:pix_fmt={}:time_base=1/{}:pixel_aspect=1/1",
input_width,
input_height,
ffmpeg::format::Pixel::DRM_PRIME as i32,
config.fps,
);
graph
.add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args)
.context("add buffersrc")?;
graph
.add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "")
.context("add buffersink")?;
let filter_spec = format!(
"hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12",
config.width, config.height,
);
graph.output("in", 0)?.input("out", 0)?.parse(&filter_spec)?;
// Set the VAAPI device on filter contexts that need it
// (equivalent to -filter_hw_device va in the CLI)
unsafe { unsafe {
let graph_ptr = graph.as_mut_ptr(); // Wrap raw AVFrame pointer as ffmpeg::frame::Video without taking ownership
for i in 0..(*graph_ptr).nb_filters { let vaapi_fframe = ffmpeg::frame::Video::wrap(vaapi_frame as *mut _);
let fctx = *(*graph_ptr).filters.add(i as usize); self.scale_graph
(*fctx).hw_device_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_device); .get("in").unwrap()
} .source()
.add(&vaapi_fframe)
.context("scale buffersrc add")?;
// Prevent drop from freeing the frame — we free it below
std::mem::forget(vaapi_fframe);
ffmpeg::ffi::av_frame_free(&mut (vaapi_frame as *mut _));
} }
// Attach hw_frames_ctx from the first DRM_PRIME frame to the buffersrc
unsafe {
let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx;
anyhow::ensure!(
!hw_frames_ctx.is_null(),
"first KMS frame has no hw_frames_ctx"
);
let mut buffersrc = graph.get("in").unwrap();
let par = ffmpeg::ffi::av_buffersrc_parameters_alloc();
anyhow::ensure!(!par.is_null(), "av_buffersrc_parameters_alloc OOM");
(*par).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(hw_frames_ctx);
let ret = ffmpeg::ffi::av_buffersrc_parameters_set(buffersrc.as_mut_ptr(), par);
ffmpeg::ffi::av_free(par as *mut _);
anyhow::ensure!(ret >= 0, "av_buffersrc_parameters_set: {ret}");
}
graph.validate().context("validate filter graph")?;
info!("Filter graph ready: {filter_spec}");
// Keep device refs alive — they're ref-counted by the graph now,
// but we drop our refs here.
unsafe {
ffmpeg::ffi::av_buffer_unref(&mut (drm_device as *mut _));
ffmpeg::ffi::av_buffer_unref(&mut (vaapi_device as *mut _));
}
Ok(graph)
}
fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result<Vec<EncodedPacket>> {
self.filter_graph
.get("in")
.unwrap()
.source()
.add(frame)
.context("buffersrc add")?;
let mut packets = Vec::new(); let mut packets = Vec::new();
let mut filtered = ffmpeg::frame::Video::empty(); let mut scaled = ffmpeg::frame::Video::empty();
while self.scale_graph.get("out").unwrap().sink().frame(&mut scaled).is_ok() {
while self scaled.set_pts(Some(self.frame_count as i64));
.filter_graph
.get("out")
.unwrap()
.sink()
.frame(&mut filtered)
.is_ok()
{
filtered.set_pts(Some(self.frame_count as i64));
self.frame_count += 1; self.frame_count += 1;
self.encoder.send_frame(&filtered)?; // Step 3: encode VAAPI NV12 frame
self.encoder.send_frame(&scaled)?;
let mut encoded = ffmpeg::Packet::empty(); packets.extend(self.receive_packets()?);
while self.encoder.receive_packet(&mut encoded).is_ok() {
packets.push(EncodedPacket {
data: encoded.data().unwrap_or(&[]).to_vec(),
pts: encoded.pts().unwrap_or(0),
dts: encoded.dts().unwrap_or(0),
keyframe: encoded.is_key(),
time_base_num: self.time_base.numerator() as u32,
time_base_den: self.time_base.denominator() as u32,
});
}
} }
Ok(packets) Ok(packets)
} }
fn flush(&mut self) -> Result<Vec<EncodedPacket>> { fn receive_packets(&mut self) -> Result<Vec<EncodedPacket>> {
self.encoder.send_eof()?;
let mut packets = Vec::new(); let mut packets = Vec::new();
let mut encoded = ffmpeg::Packet::empty(); let mut encoded = ffmpeg::Packet::empty();
while self.encoder.receive_packet(&mut encoded).is_ok() { while self.encoder.receive_packet(&mut encoded).is_ok() {
@@ -272,6 +320,11 @@ impl EncoderInner {
} }
Ok(packets) Ok(packets)
} }
fn flush(&mut self) -> Result<Vec<EncodedPacket>> {
self.encoder.send_eof()?;
self.receive_packets()
}
} }
/// An encoded video packet ready for transport. /// An encoded video packet ready for transport.

View File

@@ -1,3 +1,4 @@
pub mod backends;
pub mod capture; pub mod capture;
pub mod encoder; pub mod encoder;
pub mod pipeline; pub mod pipeline;

View File

@@ -7,6 +7,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tracing::info; use tracing::info;
use cht_client::backends::Backend;
use cht_client::capture::CaptureConfig; use cht_client::capture::CaptureConfig;
use cht_client::encoder::EncoderConfig; use cht_client::encoder::EncoderConfig;
use cht_client::pipeline::Pipeline; use cht_client::pipeline::Pipeline;
@@ -17,10 +18,23 @@ const DEFAULT_SERVER: &str = "mcrndeb:4444";
async fn main() -> Result<()> { async fn main() -> Result<()> {
cht_common::logging::init("client"); cht_common::logging::init("client");
let server_addr = std::env::args() let args: Vec<String> = std::env::args().collect();
.nth(1)
let server_addr = args.get(1)
.filter(|a| !a.starts_with('-'))
.cloned()
.unwrap_or_else(|| DEFAULT_SERVER.to_string()); .unwrap_or_else(|| DEFAULT_SERVER.to_string());
let backend = if args.iter().any(|a| a == "--backend" )
&& args.windows(2).any(|w| w[0] == "--backend" && w[1] == "direct")
{
info!("Backend: VaapiDirect (--backend direct)");
Backend::VaapiDirect
} else {
info!("Backend: Subprocess (default)");
Backend::Subprocess
};
info!("Connecting to {server_addr}..."); info!("Connecting to {server_addr}...");
let stream = TcpStream::connect(&server_addr).await?; let stream = TcpStream::connect(&server_addr).await?;
info!("Connected"); info!("Connected");
@@ -51,7 +65,7 @@ async fn main() -> Result<()> {
// Start capture pipeline on a dedicated thread // Start capture pipeline on a dedicated thread
let (packet_tx, mut packet_rx) = tokio::sync::mpsc::channel(64); let (packet_tx, mut packet_rx) = tokio::sync::mpsc::channel(64);
let mut pipeline = Pipeline::start(capture_config, encoder_config, packet_tx); let mut pipeline = Pipeline::start(capture_config, encoder_config, backend, packet_tx);
// Forward encoded packets to the server // Forward encoded packets to the server
let mut video_count = 0u64; let mut video_count = 0u64;

View File

@@ -1,7 +1,8 @@
//! Capture pipeline: ties capture → encode → transport on a dedicated thread. //! Capture pipeline: ties capture → encode → transport on a dedicated thread.
//! //!
//! This is the main loop that runs on a blocking thread, reading frames //! Dispatches to one of two backends:
//! from KMS, encoding with VAAPI, and sending encoded packets through a channel. //! - `Subprocess`: spawn ffmpeg CLI (default, proven GPU path)
//! - `VaapiDirect`: KmsCapture + VaapiEncoder via av_hwframe_map (experimental)
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
@@ -9,6 +10,8 @@ use std::sync::Arc;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use crate::backends::Backend;
use crate::backends::subprocess::SubprocessConfig;
use crate::capture::{CaptureConfig, KmsCapture}; use crate::capture::{CaptureConfig, KmsCapture};
use crate::encoder::{EncodedPacket, EncoderConfig, VaapiEncoder}; use crate::encoder::{EncodedPacket, EncoderConfig, VaapiEncoder};
@@ -26,6 +29,7 @@ impl Pipeline {
pub fn start( pub fn start(
capture_config: CaptureConfig, capture_config: CaptureConfig,
encoder_config: EncoderConfig, encoder_config: EncoderConfig,
backend: Backend,
packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>, packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>,
) -> Self { ) -> Self {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
@@ -34,7 +38,7 @@ impl Pipeline {
let thread = std::thread::Builder::new() let thread = std::thread::Builder::new()
.name("capture-pipeline".into()) .name("capture-pipeline".into())
.spawn(move || { .spawn(move || {
if let Err(e) = run_pipeline(capture_config, encoder_config, packet_tx, stop_clone) { if let Err(e) = run_pipeline(capture_config, encoder_config, backend, packet_tx, stop_clone) {
error!("Pipeline error: {e:#}"); error!("Pipeline error: {e:#}");
} }
}) })
@@ -62,6 +66,33 @@ impl Drop for Pipeline {
} }
fn run_pipeline( fn run_pipeline(
capture_config: CaptureConfig,
encoder_config: EncoderConfig,
backend: Backend,
packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>,
stop: Arc<AtomicBool>,
) -> Result<()> {
match backend {
Backend::Subprocess => {
info!("Backend: Subprocess (ffmpeg CLI)");
let config = SubprocessConfig {
device: capture_config.device,
fps: capture_config.fps,
width: encoder_config.width,
height: encoder_config.height,
qp: encoder_config.qp,
gop_size: encoder_config.gop_size,
};
crate::backends::subprocess::run(config, packet_tx, stop)
}
Backend::VaapiDirect => {
info!("Backend: VaapiDirect (av_hwframe_map, experimental)");
run_direct_pipeline(capture_config, encoder_config, packet_tx, stop)
}
}
}
fn run_direct_pipeline(
capture_config: CaptureConfig, capture_config: CaptureConfig,
encoder_config: EncoderConfig, encoder_config: EncoderConfig,
packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>, packet_tx: tokio::sync::mpsc::Sender<EncodedPacket>,
@@ -94,7 +125,6 @@ fn run_pipeline(
} }
} }
// Flush encoder
info!("Flushing encoder..."); info!("Flushing encoder...");
let flush_pkts = encoder.flush()?; let flush_pkts = encoder.flush()?;
for pkt in flush_pkts { for pkt in flush_pkts {