diff --git a/media/Cargo.lock b/media/Cargo.lock index fa6f3e8..4be98fa 100644 --- a/media/Cargo.lock +++ b/media/Cargo.lock @@ -17,6 +17,24 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "2.11.0" @@ -29,6 +47,25 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cc" +version = "1.2.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -41,6 +78,7 @@ version = "0.1.0" dependencies = [ "anyhow", "cht-common", + "ffmpeg-next", "tokio", "tracing", "tracing-subscriber", @@ -69,6 +107,23 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "errno" version = "0.3.14" @@ -79,6 +134,58 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ffmpeg-next" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c4bd5ab1ac61f29c634df1175d350ded29cf74c3c6d4f7030431a5ae3c7d5d" +dependencies = [ + "bitflags", + "ffmpeg-sys-next", + "libc", +] + +[[package]] +name = "ffmpeg-sys-next" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a314bc0e022a33a99567ed4bd2576bd58ffd8fcff7891c29194cfecc26a62547" +dependencies = [ + "bindgen", + "cc", + "libc", + "num_cpus", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -97,6 +204,16 @@ version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "lock_api" version = "0.4.14" @@ -127,6 +244,12 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "1.2.0" @@ -138,6 +261,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -147,6 +280,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -182,6 +325,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -209,6 +358,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -226,6 +387,12 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + [[package]] name = "scopeguard" version = "1.2.0" @@ -284,6 +451,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -431,6 +604,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/media/client/Cargo.toml b/media/client/Cargo.toml index 440a580..f0253b2 100644 --- a/media/client/Cargo.toml +++ b/media/client/Cargo.toml @@ -9,3 +9,4 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } anyhow = { workspace = true } +ffmpeg = { package = "ffmpeg-next", version = "8" } diff --git a/media/client/src/capture.rs b/media/client/src/capture.rs new file mode 100644 index 0000000..d4cb60b --- /dev/null +++ b/media/client/src/capture.rs @@ -0,0 +1,128 @@ +//! Screen capture via KMS/DRM using ffmpeg's kmsgrab device. +//! +//! Opens `/dev/dri/card0` via ffmpeg's kmsgrab input format, +//! reads DRM/DMA-BUF frames at the compositor's refresh rate, +//! and decimates to the target FPS. + +use std::ffi::CString; + +use anyhow::{Context, Result}; +use tracing::info; + +/// Configuration for KMS screen capture. +pub struct CaptureConfig { + /// DRM device path (e.g. "/dev/dri/card0") + pub device: String, + /// Target framerate (source frames are decimated to this) + pub fps: u32, + /// Output width (0 = native) + pub width: u32, + /// Output height (0 = native) + pub height: u32, +} + +impl Default for CaptureConfig { + fn default() -> Self { + Self { + device: "/dev/dri/card0".into(), + fps: 30, + width: 1920, + height: 1080, + } + } +} + +/// KMS screen capture source. +/// +/// Uses ffmpeg's kmsgrab input device. Requires DRM master or root. +/// Outputs DRM_PRIME frames that can be mapped to VAAPI for zero-copy encode. +pub struct KmsCapture { + input_ctx: ffmpeg::format::context::Input, + video_stream_idx: usize, + decoder: ffmpeg::decoder::Video, +} + +impl KmsCapture { + /// Open KMS capture with the given config. + /// + /// This is a blocking call (ffmpeg device open). Run on a dedicated thread. + pub fn open(config: &CaptureConfig) -> Result { + ffmpeg::init().context("ffmpeg init")?; + + // Set up the kmsgrab input device + let mut opts = ffmpeg::Dictionary::new(); + opts.set("device", &config.device); + opts.set("framerate", &config.fps.to_string()); + + let fmt_name = CString::new("kmsgrab").unwrap(); + let fmt_ptr = unsafe { ffmpeg::ffi::av_find_input_format(fmt_name.as_ptr()) }; + anyhow::ensure!( + !fmt_ptr.is_null(), + "kmsgrab input format not found — is ffmpeg built with --enable-libdrm?" + ); + let fmt = unsafe { ffmpeg::format::Input::wrap(fmt_ptr as *mut _) }; + + let input_ctx = ffmpeg::format::open_with("-", &ffmpeg::Format::Input(fmt), opts) + .context("failed to open kmsgrab device")? + .input(); + + let video_stream = input_ctx + .streams() + .best(ffmpeg::media::Type::Video) + .context("no video stream from kmsgrab")?; + + let video_stream_idx = video_stream.index(); + + // Set up decoder with DRM hardware context + let decoder_ctx = ffmpeg::codec::context::Context::from_parameters(video_stream.parameters()) + .context("decoder context from parameters")?; + + let decoder = decoder_ctx.decoder().video().context("open video decoder")?; + + info!( + "KMS capture opened: {}x{} @ {}fps, stream idx={}", + decoder.width(), + decoder.height(), + config.fps, + video_stream_idx, + ); + + Ok(Self { + input_ctx, + video_stream_idx, + decoder, + }) + } + + /// Read the next raw (DRM_PRIME) frame from the capture device. + /// + /// Blocking — call from a dedicated thread. + /// Returns None at EOF (shouldn't happen for live capture). + pub fn next_frame(&mut self) -> Result> { + loop { + match self.input_ctx.packets().next() { + Some((stream, packet)) => { + if stream.index() != self.video_stream_idx { + continue; + } + self.decoder.send_packet(&packet)?; + let mut frame = ffmpeg::frame::Video::empty(); + match self.decoder.receive_frame(&mut frame) { + Ok(()) => return Ok(Some(frame)), + Err(ffmpeg::Error::Other { errno: ffmpeg::error::EAGAIN }) => continue, + Err(e) => return Err(e.into()), + } + } + None => return Ok(None), + } + } + } + + pub fn width(&self) -> u32 { + self.decoder.width() + } + + pub fn height(&self) -> u32 { + self.decoder.height() + } +} diff --git a/media/client/src/encoder.rs b/media/client/src/encoder.rs new file mode 100644 index 0000000..073c299 --- /dev/null +++ b/media/client/src/encoder.rs @@ -0,0 +1,285 @@ +//! VAAPI H.264 hardware encoding. +//! +//! Two-phase initialization: +//! - Phase 1 (`VaapiEncoder::new`): store config, no ffmpeg calls. +//! - Phase 2 (first call to `encode`): build filter graph using `hw_frames_ctx` +//! from the first DRM_PRIME frame to wire up hwmap, then open the codec. +//! +//! The split exists because the filter graph needs the DRM hardware device +//! context that kmsgrab attaches to each frame — that context isn't available +//! until the first frame arrives, so the graph can't be validated at +//! construction time. + +use anyhow::{Context, Result}; +use tracing::info; + +/// Encoding configuration. +pub struct EncoderConfig { + pub width: u32, + pub height: u32, + pub fps: u32, + /// Quantization parameter (lower = higher quality). Default: 20 + pub qp: u32, + /// Keyframe interval in frames. Default: 30 (1 keyframe/sec at 30fps) + pub gop_size: u32, +} + +impl Default for EncoderConfig { + fn default() -> Self { + Self { + width: 1920, + height: 1080, + fps: 30, + qp: 20, + gop_size: 30, + } + } +} + +/// VAAPI H.264 encoder with lazy filter graph initialization. +pub struct VaapiEncoder { + config: EncoderConfig, + inner: Option, +} + +struct EncoderInner { + encoder: ffmpeg::encoder::Video, + filter_graph: ffmpeg::filter::Graph, + frame_count: u64, + time_base: ffmpeg::Rational, +} + +impl VaapiEncoder { + /// Create an encoder from config. No ffmpeg resources are allocated yet. + pub fn new(config: EncoderConfig) -> Self { + Self { config, inner: None } + } + + /// Encode a frame. On the first call, initializes the filter graph and + /// codec using the frame's `hw_frames_ctx`. Returns 0 or more packets. + pub fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result> { + if self.inner.is_none() { + self.inner = Some(EncoderInner::open(&self.config, frame).context("encoder init")?); + } + self.inner.as_mut().unwrap().encode(frame) + } + + /// Flush remaining packets out of the encoder. + pub fn flush(&mut self) -> Result> { + match self.inner.as_mut() { + Some(inner) => inner.flush(), + None => Ok(vec![]), + } + } +} + +impl EncoderInner { + fn open(config: &EncoderConfig, first_frame: &ffmpeg::frame::Video) -> Result { + let time_base = ffmpeg::Rational::new(1, config.fps as i32); + let filter_graph = Self::build_filter_graph(config, first_frame)?; + + let codec = ffmpeg::encoder::find_by_name("h264_vaapi") + .context("h264_vaapi encoder not found")?; + + let mut encoder_ctx = ffmpeg::codec::context::Context::new_with_codec(codec) + .encoder() + .video()?; + + encoder_ctx.set_width(config.width); + encoder_ctx.set_height(config.height); + encoder_ctx.set_time_base(time_base); + encoder_ctx.set_frame_rate(Some(ffmpeg::Rational::new(config.fps as i32, 1))); + encoder_ctx.set_gop(config.gop_size); + encoder_ctx.set_max_b_frames(0); + encoder_ctx.set_format(ffmpeg::format::Pixel::VAAPI); + + let mut opts = ffmpeg::Dictionary::new(); + opts.set("qp", &config.qp.to_string()); + + let encoder = encoder_ctx + .open_with(opts) + .context("open h264_vaapi encoder")?; + + info!( + "VAAPI encoder opened: {}x{} @ {}fps, qp={}, gop={}", + config.width, config.height, config.fps, config.qp, config.gop_size, + ); + + Ok(Self { encoder, filter_graph, frame_count: 0, time_base }) + } + + /// Build the filter graph: DRM_PRIME → hwmap(vaapi) → scale_vaapi → buffersink. + /// + /// Mirrors the ffmpeg CLI approach: + /// -init_hw_device drm=drm:/dev/dri/card0 + /// -init_hw_device vaapi=va@drm + /// -filter_hw_device va + /// + /// We create both devices explicitly and set the VAAPI device on the graph, + /// then attach the frame's hw_frames_ctx to the buffersrc so hwmap can map + /// DRM_PRIME frames to VAAPI surfaces. + fn build_filter_graph( + config: &EncoderConfig, + first_frame: &ffmpeg::frame::Video, + ) -> Result { + let input_width = first_frame.width(); + let input_height = first_frame.height(); + + // Create DRM device, then derive VAAPI from it (like -init_hw_device vaapi=va@drm) + let (drm_device, vaapi_device) = unsafe { + let mut drm_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); + let dev_path = std::ffi::CString::new("/dev/dri/card0").unwrap(); + let ret = ffmpeg::ffi::av_hwdevice_ctx_create( + &mut drm_ref, + ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_DRM, + dev_path.as_ptr(), + std::ptr::null_mut(), + 0, + ); + anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create(drm): {ret}"); + info!("DRM device created"); + + let mut vaapi_ref: *mut ffmpeg::ffi::AVBufferRef = std::ptr::null_mut(); + let ret = ffmpeg::ffi::av_hwdevice_ctx_create_derived( + &mut vaapi_ref, + ffmpeg::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI, + drm_ref, + 0, + ); + anyhow::ensure!(ret >= 0, "av_hwdevice_ctx_create_derived(vaapi): {ret}"); + info!("VAAPI device derived from DRM"); + + (drm_ref, vaapi_ref) + }; + + let mut graph = ffmpeg::filter::Graph::new(); + + let args = format!( + "video_size={}x{}:pix_fmt={}:time_base=1/{}:pixel_aspect=1/1", + input_width, + input_height, + ffmpeg::format::Pixel::DRM_PRIME as i32, + config.fps, + ); + + graph + .add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args) + .context("add buffersrc")?; + graph + .add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "") + .context("add buffersink")?; + + let filter_spec = format!( + "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12", + config.width, config.height, + ); + + graph.output("in", 0)?.input("out", 0)?.parse(&filter_spec)?; + + // Set the VAAPI device on filter contexts that need it + // (equivalent to -filter_hw_device va in the CLI) + unsafe { + let graph_ptr = graph.as_mut_ptr(); + for i in 0..(*graph_ptr).nb_filters { + let fctx = *(*graph_ptr).filters.add(i as usize); + (*fctx).hw_device_ctx = ffmpeg::ffi::av_buffer_ref(vaapi_device); + } + } + + // Attach hw_frames_ctx from the first DRM_PRIME frame to the buffersrc + unsafe { + let hw_frames_ctx = (*first_frame.as_ptr()).hw_frames_ctx; + anyhow::ensure!( + !hw_frames_ctx.is_null(), + "first KMS frame has no hw_frames_ctx" + ); + + let mut buffersrc = graph.get("in").unwrap(); + let par = ffmpeg::ffi::av_buffersrc_parameters_alloc(); + anyhow::ensure!(!par.is_null(), "av_buffersrc_parameters_alloc OOM"); + (*par).hw_frames_ctx = ffmpeg::ffi::av_buffer_ref(hw_frames_ctx); + let ret = ffmpeg::ffi::av_buffersrc_parameters_set(buffersrc.as_mut_ptr(), par); + ffmpeg::ffi::av_free(par as *mut _); + anyhow::ensure!(ret >= 0, "av_buffersrc_parameters_set: {ret}"); + } + + graph.validate().context("validate filter graph")?; + info!("Filter graph ready: {filter_spec}"); + + // Keep device refs alive — they're ref-counted by the graph now, + // but we drop our refs here. + unsafe { + ffmpeg::ffi::av_buffer_unref(&mut (drm_device as *mut _)); + ffmpeg::ffi::av_buffer_unref(&mut (vaapi_device as *mut _)); + } + + Ok(graph) + } + + fn encode(&mut self, frame: &ffmpeg::frame::Video) -> Result> { + self.filter_graph + .get("in") + .unwrap() + .source() + .add(frame) + .context("buffersrc add")?; + + let mut packets = Vec::new(); + let mut filtered = ffmpeg::frame::Video::empty(); + + while self + .filter_graph + .get("out") + .unwrap() + .sink() + .frame(&mut filtered) + .is_ok() + { + filtered.set_pts(Some(self.frame_count as i64)); + self.frame_count += 1; + + self.encoder.send_frame(&filtered)?; + + let mut encoded = ffmpeg::Packet::empty(); + while self.encoder.receive_packet(&mut encoded).is_ok() { + packets.push(EncodedPacket { + data: encoded.data().unwrap_or(&[]).to_vec(), + pts: encoded.pts().unwrap_or(0), + dts: encoded.dts().unwrap_or(0), + keyframe: encoded.is_key(), + time_base_num: self.time_base.numerator() as u32, + time_base_den: self.time_base.denominator() as u32, + }); + } + } + + Ok(packets) + } + + fn flush(&mut self) -> Result> { + self.encoder.send_eof()?; + let mut packets = Vec::new(); + let mut encoded = ffmpeg::Packet::empty(); + while self.encoder.receive_packet(&mut encoded).is_ok() { + packets.push(EncodedPacket { + data: encoded.data().unwrap_or(&[]).to_vec(), + pts: encoded.pts().unwrap_or(0), + dts: encoded.dts().unwrap_or(0), + keyframe: encoded.is_key(), + time_base_num: self.time_base.numerator() as u32, + time_base_den: self.time_base.denominator() as u32, + }); + } + Ok(packets) + } +} + +/// An encoded video packet ready for transport. +pub struct EncodedPacket { + pub data: Vec, + pub pts: i64, + pub dts: i64, + pub keyframe: bool, + pub time_base_num: u32, + pub time_base_den: u32, +} diff --git a/media/client/src/lib.rs b/media/client/src/lib.rs new file mode 100644 index 0000000..229f3e6 --- /dev/null +++ b/media/client/src/lib.rs @@ -0,0 +1,3 @@ +pub mod capture; +pub mod encoder; +pub mod pipeline; diff --git a/media/client/src/main.rs b/media/client/src/main.rs index 78201a7..06c7d0a 100644 --- a/media/client/src/main.rs +++ b/media/client/src/main.rs @@ -7,6 +7,10 @@ use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; use tracing::info; +use cht_client::capture::CaptureConfig; +use cht_client::encoder::EncoderConfig; +use cht_client::pipeline::Pipeline; + const DEFAULT_SERVER: &str = "mcrndeb:4444"; #[tokio::main] @@ -23,14 +27,17 @@ async fn main() -> Result<()> { let mut writer = BufWriter::new(stream); + let capture_config = CaptureConfig::default(); + let encoder_config = EncoderConfig::default(); + // Send session_start let session_start = ControlMessage::SessionStart { - id: chrono_session_id(), + id: session_id(), video: VideoParams { - width: 1920, - height: 1080, + width: encoder_config.width, + height: encoder_config.height, codec: "h264".into(), - fps: 30, + fps: encoder_config.fps, }, audio: AudioParams { sample_rate: 48000, @@ -39,60 +46,80 @@ async fn main() -> Result<()> { }, }; protocol::write_packet(&mut writer, &session_start.to_wire_packet()?).await?; + writer.flush().await?; info!("Sent session_start"); - // Send test packets (placeholder — will be replaced by real capture) - let frame_interval_ns = 33_333_333u64; // ~30fps - for i in 0u64..300 { - let ts = i * frame_interval_ns; - let keyframe = i % 30 == 0; + // Start capture pipeline on a dedicated thread + let (packet_tx, mut packet_rx) = tokio::sync::mpsc::channel(64); + let mut pipeline = Pipeline::start(capture_config, encoder_config, packet_tx); - // Fake video packet - let video = WirePacket { - header: PacketHeader { - packet_type: PacketType::Video, - flags: if keyframe { FLAG_KEYFRAME } else { 0 }, - length: 1024, - timestamp_ns: ts, - }, - payload: vec![0u8; 1024], - }; - protocol::write_packet(&mut writer, &video).await?; + // Forward encoded packets to the server + let mut video_count = 0u64; + let mut keepalive_interval = tokio::time::interval(std::time::Duration::from_secs(5)); - // Fake audio packet every 3 video frames - if i % 3 == 0 { - let audio = WirePacket { - header: PacketHeader { - packet_type: PacketType::Audio, - flags: 0, - length: 512, - timestamp_ns: ts, - }, - payload: vec![0u8; 512], - }; - protocol::write_packet(&mut writer, &audio).await?; + loop { + tokio::select! { + pkt = packet_rx.recv() => { + match pkt { + Some(encoded) => { + let wire = WirePacket { + header: PacketHeader { + packet_type: PacketType::Video, + flags: if encoded.keyframe { FLAG_KEYFRAME } else { 0 }, + length: encoded.data.len() as u32, + timestamp_ns: pts_to_ns( + encoded.pts, + encoded.time_base_num, + encoded.time_base_den, + ), + }, + payload: encoded.data, + }; + protocol::write_packet(&mut writer, &wire).await?; + video_count += 1; + + if video_count % 300 == 1 { + info!("Sent {video_count} video packets"); + writer.flush().await?; + } + } + None => { + info!("Pipeline channel closed"); + break; + } + } + } + _ = keepalive_interval.tick() => { + let keepalive = ControlMessage::Keepalive; + protocol::write_packet(&mut writer, &keepalive.to_wire_packet()?).await?; + writer.flush().await?; + } + _ = tokio::signal::ctrl_c() => { + info!("Ctrl+C received, stopping..."); + break; + } } - - // Keepalive every 150 frames (~5s) - if i % 150 == 0 && i > 0 { - let keepalive = ControlMessage::Keepalive; - protocol::write_packet(&mut writer, &keepalive.to_wire_packet()?).await?; - } - - tokio::time::sleep(std::time::Duration::from_nanos(frame_interval_ns)).await; } - // Send session_stop and flush + pipeline.stop(); + let stop = ControlMessage::SessionStop; protocol::write_packet(&mut writer, &stop.to_wire_packet()?).await?; writer.flush().await?; writer.shutdown().await?; - info!("Sent session_stop, done"); + info!("Sent session_stop, {video_count} video packets total"); Ok(()) } -fn chrono_session_id() -> String { +fn pts_to_ns(pts: i64, tb_num: u32, tb_den: u32) -> u64 { + if tb_den == 0 { + return 0; + } + ((pts as u128 * tb_num as u128 * 1_000_000_000) / tb_den as u128) as u64 +} + +fn session_id() -> String { use std::time::{SystemTime, UNIX_EPOCH}; let secs = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/media/client/src/pipeline.rs b/media/client/src/pipeline.rs new file mode 100644 index 0000000..ee0ec66 --- /dev/null +++ b/media/client/src/pipeline.rs @@ -0,0 +1,106 @@ +//! Capture pipeline: ties capture → encode → transport on a dedicated thread. +//! +//! This is the main loop that runs on a blocking thread, reading frames +//! from KMS, encoding with VAAPI, and sending encoded packets through a channel. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use tracing::{error, info, warn}; + +use crate::capture::{CaptureConfig, KmsCapture}; +use crate::encoder::{EncodedPacket, EncoderConfig, VaapiEncoder}; + +/// A running capture pipeline that produces encoded packets. +pub struct Pipeline { + thread: Option>, + stop: Arc, +} + +impl Pipeline { + /// Start the capture → encode pipeline on a dedicated thread. + /// + /// Encoded packets are sent through `packet_tx`. + /// The pipeline runs until `stop()` is called or an error occurs. + pub fn start( + capture_config: CaptureConfig, + encoder_config: EncoderConfig, + packet_tx: tokio::sync::mpsc::Sender, + ) -> Self { + let stop = Arc::new(AtomicBool::new(false)); + let stop_clone = stop.clone(); + + let thread = std::thread::Builder::new() + .name("capture-pipeline".into()) + .spawn(move || { + if let Err(e) = run_pipeline(capture_config, encoder_config, packet_tx, stop_clone) { + error!("Pipeline error: {e:#}"); + } + }) + .expect("spawn capture thread"); + + Self { + thread: Some(thread), + stop, + } + } + + /// Signal the pipeline to stop and wait for the thread to finish. + pub fn stop(&mut self) { + self.stop.store(true, Ordering::Relaxed); + if let Some(handle) = self.thread.take() { + let _ = handle.join(); + } + } +} + +impl Drop for Pipeline { + fn drop(&mut self) { + self.stop(); + } +} + +fn run_pipeline( + capture_config: CaptureConfig, + encoder_config: EncoderConfig, + packet_tx: tokio::sync::mpsc::Sender, + stop: Arc, +) -> Result<()> { + info!("Opening KMS capture..."); + let mut capture = KmsCapture::open(&capture_config).context("open capture")?; + + info!("Encoder ready (will initialize on first frame)"); + let mut encoder = VaapiEncoder::new(encoder_config); + + info!("Pipeline running"); + + while !stop.load(Ordering::Relaxed) { + let frame = match capture.next_frame()? { + Some(f) => f, + None => { + warn!("Capture returned EOF"); + break; + } + }; + + let packets = encoder.encode(&frame).context("encode frame")?; + + for pkt in packets { + if packet_tx.blocking_send(pkt).is_err() { + info!("Packet channel closed, stopping pipeline"); + return Ok(()); + } + } + } + + // Flush encoder + info!("Flushing encoder..."); + let flush_pkts = encoder.flush()?; + for pkt in flush_pkts { + let _ = packet_tx.blocking_send(pkt); + } + + info!("Pipeline stopped"); + Ok(()) +} diff --git a/media/ctrl/build.sh b/media/ctrl/build.sh new file mode 100755 index 0000000..357ccfe --- /dev/null +++ b/media/ctrl/build.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Build media transport targets +# Usage: ./build.sh [client|server|all] [--release] +# client build cht-client (default) +# server build cht-server +# all build both +set -euo pipefail + +MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)" +CARGO="${CARGO:-$HOME/.cargo/bin/cargo}" + +TARGET="${1:-client}" +shift || true + +CARGO_FLAGS=() +for arg in "$@"; do + CARGO_FLAGS+=("$arg") +done + +LOG_DIR="$MEDIA_DIR/logs" +mkdir -p "$LOG_DIR" + +build() { + local pkg="$1" + local log="$LOG_DIR/build-$pkg.log" + echo "==> building $pkg ${CARGO_FLAGS[*]+"${CARGO_FLAGS[@]}"}" + "$CARGO" build -p "cht-$pkg" "${CARGO_FLAGS[@]}" --manifest-path "$MEDIA_DIR/Cargo.toml" 2>&1 | tee "$log" + return "${PIPESTATUS[0]}" +} + +case "$TARGET" in + client) build client ;; + server) build server ;; + all) build client || true; build server ;; + *) echo "Usage: $0 [client|server|all] [--release]" >&2; exit 1 ;; +esac diff --git a/media/ctrl/client.sh b/media/ctrl/client.sh new file mode 100755 index 0000000..3209995 --- /dev/null +++ b/media/ctrl/client.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Build and run the media client (sender) +# Requires DRM master access — runs under sudo unless already root. +# Usage: ./client.sh [server_addr] e.g. ./client.sh mcrndeb:4444 +set -euo pipefail + +MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)" +CARGO="${CARGO:-$HOME/.cargo/bin/cargo}" + +LOG_DIR="$MEDIA_DIR/logs" +mkdir -p "$LOG_DIR" +"$CARGO" build -p cht-client --manifest-path "$MEDIA_DIR/Cargo.toml" 2>&1 | tee "$LOG_DIR/build-client.log" +if [ "${PIPESTATUS[0]}" -ne 0 ]; then exit 1; fi + +BIN="$MEDIA_DIR/target/debug/cht-client" + +if [ "$(id -u)" -ne 0 ]; then + exec sudo "$BIN" "$@" +else + exec "$BIN" "$@" +fi diff --git a/media/ctrl/docs.sh b/media/ctrl/docs.sh new file mode 100755 index 0000000..39162a6 --- /dev/null +++ b/media/ctrl/docs.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Re-render all Graphviz diagrams to SVG. +# Run this after each phase when .dot files are updated. +# Usage: ./docs.sh +set -euo pipefail + +DOCS_DIR="$(cd "$(dirname "$0")/../docs" && pwd)" + +if ! command -v dot &>/dev/null; then + echo "graphviz not found — install with: sudo apt install graphviz" >&2 + exit 1 +fi + +for f in "$DOCS_DIR"/*.dot; do + svg="${f%.dot}.svg" + echo "==> $(basename "$f") → $(basename "$svg")" + dot -Tsvg "$f" -o "$svg" +done + +echo "==> done. Serving at http://localhost:9099 (ctrl-c to stop)" +cd "$DOCS_DIR" && python3 -m http.server 9099 diff --git a/media/ctrl/server.sh b/media/ctrl/server.sh new file mode 100755 index 0000000..ad37b05 --- /dev/null +++ b/media/ctrl/server.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# Build and run the media server (receiver). +# Run this on mcrndeb directly. +# Usage: ./server.sh [port] +set -euo pipefail + +MEDIA_DIR="$(cd "$(dirname "$0")/.." && pwd)" +CARGO="${CARGO:-$HOME/.cargo/bin/cargo}" + +LOG_DIR="$MEDIA_DIR/logs" +mkdir -p "$LOG_DIR" +"$CARGO" build -p cht-server --manifest-path "$MEDIA_DIR/Cargo.toml" 2>&1 | tee "$LOG_DIR/build-server.log" +if [ "${PIPESTATUS[0]}" -ne 0 ]; then exit 1; fi + +exec "$MEDIA_DIR/target/debug/cht-server" "$@" diff --git a/media/docs/client-pipeline.dot b/media/docs/client-pipeline.dot new file mode 100644 index 0000000..d38f688 --- /dev/null +++ b/media/docs/client-pipeline.dot @@ -0,0 +1,51 @@ +// Client pipeline data flow — Phase 2 +// Sender machine (Wayland, VAAPI GPU) +digraph client_pipeline { + graph [fontname="monospace" bgcolor="#1e1e2e" rankdir=TB pad="0.6" splines=polyline] + node [fontname="monospace" fontcolor="#cdd6f4" style=filled shape=box + fillcolor="#313244" color="#585b70" margin="0.25,0.12"] + edge [color="#585b70" fontname="monospace" fontcolor="#a6adc8" labelfontname="monospace"] + + // Hardware + drm [label="/dev/dri/card0\n(KMS scanout)" shape=cylinder fillcolor="#1e3a2f" color="#a6e3a1"] + vaapi [label="/dev/dri/renderD128\n(VAAPI)" shape=cylinder fillcolor="#1e3a2f" color="#a6e3a1"] + net [label="TCP :4444\nmcrndeb" shape=parallelogram fillcolor="#1e2a3e" color="#89b4fa"] + + // Thread boundary + subgraph cluster_main { + label="main thread (tokio async)" fontcolor="#a6adc8" color="#45475a" fontname="monospace" + + session_start [label="session_start\ncontrol message" fillcolor="#2d2038" color="#cba6f7"] + mux [label="select!\npkt_rx | keepalive | ctrl-c" fillcolor="#2d2038" color="#cba6f7"] + keepalive [label="keepalive / 5s" fillcolor="#2d2038" color="#cba6f7"] + write [label="BufWriter\nwrite_packet()" fillcolor="#1e2d3e" color="#89b4fa"] + } + + subgraph cluster_pipeline { + label="capture-pipeline thread (blocking)" fontcolor="#a6adc8" color="#45475a" fontname="monospace" + + capture [label="KmsCapture\n─────────────────\nffmpeg kmsgrab device\ndecoder: passthrough\noutput: DRM_PRIME frames\n+ hw_frames_ctx (DRM device)" + fillcolor="#1e2d3e" color="#89b4fa"] + + encoder [label="VaapiEncoder\n─────────────────\n[lazy init on frame 1]\nbuffersrc ← hw_frames_ctx\nhwmap derive_device=vaapi\nscale_vaapi NV12 1920×1080\nh264_vaapi QP=20 GOP=30" + fillcolor="#1e2d3e" color="#89b4fa"] + + chan [label="mpsc::channel(64)\nEncodedPacket" shape=parallelogram fillcolor="#2d2038" color="#cba6f7"] + } + + // Flow + drm -> capture [label="DMA-BUF\n(zero copy)"] + vaapi -> encoder [label="hw device\n(derived)" style=dashed color="#a6e3a1"] + capture -> encoder [label="AVFrame\nDRM_PRIME"] + encoder -> chan [label="EncodedPacket\n{ data, pts, keyframe, … }"] + chan -> mux + session_start -> write + mux -> write [label="WirePacket"] + mux -> keepalive [style=dashed] + keepalive -> write + write -> net + + // Types note + types [label="EncodedPacket\n─────────────\ndata: Vec\ (H.264 NALUs)\npts / dts: i64\nkeyframe: bool\ntime_base: num/den" + shape=note fillcolor="#2a2a3e" color="#585b70"] +} diff --git a/media/docs/client-pipeline.svg b/media/docs/client-pipeline.svg new file mode 100644 index 0000000..a1f7014 --- /dev/null +++ b/media/docs/client-pipeline.svg @@ -0,0 +1,185 @@ + + + + + + +client_pipeline + + +cluster_main + +main thread  (tokio async) + + +cluster_pipeline + +capture-pipeline thread  (blocking) + + + +drm + + +/dev/dri/card0 +(KMS scanout) + + + +capture + +KmsCapture +───────────────── +ffmpeg kmsgrab device +decoder: passthrough +output: DRM_PRIME frames ++ hw_frames_ctx (DRM device) + + + +drm->capture + + +DMA-BUF +(zero copy) + + + +vaapi + + +/dev/dri/renderD128 +(VAAPI) + + + +encoder + +VaapiEncoder +───────────────── +[lazy init on frame 1] +buffersrc ← hw_frames_ctx +hwmap derive_device=vaapi +scale_vaapi NV12 1920×1080 +h264_vaapi QP=20 GOP=30 + + + +vaapi->encoder + + +hw device +(derived) + + + +net + +TCP :4444 +mcrndeb + + + +session_start + +session_start +control message + + + +write + +BufWriter +write_packet() + + + +session_start->write + + + + + +mux + +select! +pkt_rx  |  keepalive  |  ctrl-c + + + +keepalive + +keepalive / 5s + + + +mux->keepalive + + + + + +mux->write + + +WirePacket + + + +keepalive->write + + + + + +write->net + + + + + +capture->encoder + + +AVFrame +DRM_PRIME + + + +chan + +mpsc::channel(64) +EncodedPacket + + + +encoder->chan + + +EncodedPacket +{ data, pts, keyframe, … } + + + +chan->mux + + + + + +types + + + +EncodedPacket +───────────── +data: Vec<u8>  (H.264 NALUs) +pts / dts: i64 +keyframe: bool +time_base: num/den + + + diff --git a/media/docs/crates.dot b/media/docs/crates.dot new file mode 100644 index 0000000..cbb8e92 --- /dev/null +++ b/media/docs/crates.dot @@ -0,0 +1,51 @@ +// Cargo workspace crate dependency graph +// Phase 2: client capture + encode implemented; server is a stub +digraph crates { + graph [fontname="monospace" bgcolor="#1e1e2e" pad="0.5"] + node [fontname="monospace" fontcolor="#cdd6f4" style=filled shape=box + fillcolor="#313244" color="#585b70" margin="0.2,0.1"] + edge [color="#585b70" fontname="monospace" fontcolor="#a6adc8"] + + // External + ffmpeg_next [label="ffmpeg-next 8\n(ffmpeg-sys-next)" shape=component fillcolor="#1e3a2f" color="#a6e3a1"] + tokio [label="tokio 1\n(async runtime)" shape=component fillcolor="#1e2a3e" color="#89b4fa"] + serde [label="serde / serde_json" shape=component fillcolor="#2a2a3e" color="#cba6f7"] + tracing [label="tracing\ntracing-subscriber" shape=component fillcolor="#2a2a3e" color="#cba6f7"] + anyhow [label="anyhow" shape=component fillcolor="#2a2a3e" color="#cba6f7"] + + // Workspace crates + common [label="cht-common\n─────────────\nprotocol.rs (wire framing)\nframe.rs (Frame, AudioBuffer)\nlogging.rs" + fillcolor="#2d2038" color="#cba6f7"] + + client [label="cht-client [sender, Wayland]\n─────────────────────────────\ncapture.rs KMS/DRM → DRM_PRIME frames\nencoder.rs VAAPI H.264 (lazy init)\npipeline.rs capture→encode thread\nmain.rs TCP transport + keepalive" + fillcolor="#1e2d3e" color="#89b4fa"] + + server [label="cht-server [receiver, mcrn]\n─────────────────────────────\nmain.rs TCP listener (stub)\n counts packets, no decode yet" + fillcolor="#2d1e1e" color="#f38ba8"] + + // Deps + client -> common + server -> common + + common -> serde + common -> tokio + common -> tracing + common -> anyhow + + client -> ffmpeg_next + client -> tokio + client -> tracing + client -> anyhow + + server -> tokio + server -> tracing + server -> anyhow + + // Legend + subgraph cluster_legend { + label="Legend" fontcolor="#a6adc8" color="#585b70" fontname="monospace" + l1 [label="implemented" fillcolor="#1e2d3e" color="#89b4fa" shape=box] + l2 [label="stub / planned" fillcolor="#2d1e1e" color="#f38ba8" shape=box] + l3 [label="external crate" fillcolor="#1e3a2f" color="#a6e3a1" shape=component] + } +} diff --git a/media/docs/crates.svg b/media/docs/crates.svg new file mode 100644 index 0000000..d2188af --- /dev/null +++ b/media/docs/crates.svg @@ -0,0 +1,189 @@ + + + + + + +crates + + +cluster_legend + +Legend + + + +ffmpeg_next + + + +ffmpeg-next 8 +(ffmpeg-sys-next) + + + +tokio + + + +tokio 1 +(async runtime) + + + +serde + + + +serde / serde_json + + + +tracing + + + +tracing +tracing-subscriber + + + +anyhow + + + +anyhow + + + +common + +cht-common +───────────── +protocol.rs  (wire framing) +frame.rs     (Frame, AudioBuffer) +logging.rs + + + +common->tokio + + + + + +common->serde + + + + + +common->tracing + + + + + +common->anyhow + + + + + +client + +cht-client  [sender, Wayland] +───────────────────────────── +capture.rs   KMS/DRM → DRM_PRIME frames +encoder.rs   VAAPI H.264 (lazy init) +pipeline.rs  capture→encode thread +main.rs      TCP transport + keepalive + + + +client->ffmpeg_next + + + + + +client->tokio + + + + + +client->tracing + + + + + +client->anyhow + + + + + +client->common + + + + + +server + +cht-server  [receiver, mcrn] +───────────────────────────── +main.rs      TCP listener (stub) +             counts packets, no decode yet + + + +server->tokio + + + + + +server->tracing + + + + + +server->anyhow + + + + + +server->common + + + + + +l1 + +implemented + + + +l2 + +stub / planned + + + +l3 + + + +external crate + + + diff --git a/media/docs/index.html b/media/docs/index.html new file mode 100644 index 0000000..1750dfb --- /dev/null +++ b/media/docs/index.html @@ -0,0 +1,168 @@ + + + + +Media Transport — Architecture + + + + + + +
+
+

Select a diagram

+ +
+
+

← pick a diagram from the sidebar

+
+
+ + + + diff --git a/media/docs/server-pipeline.dot b/media/docs/server-pipeline.dot new file mode 100644 index 0000000..bbce3dd --- /dev/null +++ b/media/docs/server-pipeline.dot @@ -0,0 +1,54 @@ +// Server pipeline — Phase 2 (stub) + planned architecture +// Receiver machine (X11, RTX 3080, NVDEC) +digraph server_pipeline { + graph [fontname="monospace" bgcolor="#1e1e2e" rankdir=TB pad="0.6" splines=polyline] + node [fontname="monospace" fontcolor="#cdd6f4" style=filled shape=box + fillcolor="#313244" color="#585b70" margin="0.25,0.12"] + edge [color="#585b70" fontname="monospace" fontcolor="#a6adc8"] + + net [label="TCP :4444" shape=parallelogram fillcolor="#1e2a3e" color="#89b4fa"] + python [label="Python app\n(stream/manager.py)" shape=parallelogram fillcolor="#2a2a3e" color="#cba6f7"] + + subgraph cluster_implemented { + label="Implemented (Phase 2)" fontcolor="#a6e3a1" color="#a6e3a1" fontname="monospace" + + listener [label="Listener\n─────────────\nTCP accept loop\nspawns task per client\nreads WirePacket headers\ncounts video/audio pkts\nlogs keyframes + ts" + fillcolor="#1e2d3e" color="#89b4fa"] + } + + subgraph cluster_planned { + label="Planned" fontcolor="#f38ba8" color="#f38ba8" fontname="monospace" style=dashed + + decoder [label="Decoder (Phase 3)\n─────────────\nNVDEC H.264 → NV12\nGPU frames" fillcolor="#2d1e1e" color="#f38ba8"] + scene [label="Scene Detector (Phase 3)\n─────────────\nffmpeg select filter\nin-process (no subprocess)\nJPEG → frames/\nframes/index.json" fillcolor="#2d1e1e" color="#f38ba8"] + audio [label="Audio Extractor (Phase 4)\n─────────────\nAAC decode\nWAV chunks → audio/" fillcolor="#2d1e1e" color="#f38ba8"] + writer [label="Segment Writer (Phase 3)\n─────────────\nfMP4 segments → stream/\nkeyframe boundaries" fillcolor="#2d1e1e" color="#f38ba8"] + framebuf [label="Frame Buffer (Phase 6)\n─────────────\nGPU ring buffer ~300 frames\nscrub: GPU→CPU on demand\n→ /dev/shm/cht_scrub_frame" fillcolor="#2d1e1e" color="#f38ba8"] + ipc [label="IPC Server (Phase 5)\n─────────────\nUnix socket JSON-lines\ncommands: start/stop/get_frame\nevents: frame_detected/audio_chunk/…" fillcolor="#2d1e1e" color="#f38ba8"] + } + + // Flow — implemented + net -> listener [label="WirePacket"] + + // Flow — planned + listener -> decoder [style=dashed label="H.264 payload"] + decoder -> scene [style=dashed label="NV12 frame"] + decoder -> writer [style=dashed label="encoded pkt"] + decoder -> framebuf [style=dashed label="GPU frame"] + decoder -> audio [style=dashed label="audio pkt"] + scene -> ipc [style=dashed label="frame_detected"] + audio -> ipc [style=dashed label="audio_chunk"] + writer -> ipc [style=dashed label="segment_completed"] + ipc -> python [style=dashed label="JSON-lines\n(Unix socket)"] + + // Outputs + frames_dir [label="frames/\nindex.json + *.jpg" shape=folder fillcolor="#2a2a3e" color="#585b70"] + audio_dir [label="audio/\n*.wav chunks" shape=folder fillcolor="#2a2a3e" color="#585b70"] + stream_dir [label="stream/\n*.mp4 segments" shape=folder fillcolor="#2a2a3e" color="#585b70"] + shm [label="/dev/shm/cht_scrub_frame\nraw RGBA pixels" shape=folder fillcolor="#2a2a3e" color="#585b70"] + + scene -> frames_dir [style=dashed] + audio -> audio_dir [style=dashed] + writer -> stream_dir [style=dashed] + framebuf -> shm [style=dashed label="get_frame cmd"] +} diff --git a/media/docs/server-pipeline.svg b/media/docs/server-pipeline.svg new file mode 100644 index 0000000..8e4c8a0 --- /dev/null +++ b/media/docs/server-pipeline.svg @@ -0,0 +1,230 @@ + + + + + + +server_pipeline + + +cluster_implemented + +Implemented (Phase 2) + + +cluster_planned + +Planned + + + +net + +TCP :4444 + + + +listener + +Listener +───────────── +TCP accept loop +spawns task per client +reads WirePacket headers +counts video/audio pkts +logs keyframes + ts + + + +net->listener + + +WirePacket + + + +python + +Python app +(stream/manager.py) + + + +decoder + +Decoder  (Phase 3) +───────────── +NVDEC H.264 → NV12 +GPU frames + + + +listener->decoder + + +H.264 payload + + + +scene + +Scene Detector  (Phase 3) +───────────── +ffmpeg select filter +in-process (no subprocess) +JPEG → frames/ +frames/index.json + + + +decoder->scene + + +NV12 frame + + + +audio + +Audio Extractor  (Phase 4) +───────────── +AAC decode +WAV chunks → audio/ + + + +decoder->audio + + +audio pkt + + + +writer + +Segment Writer  (Phase 3) +───────────── +fMP4 segments → stream/ +keyframe boundaries + + + +decoder->writer + + +encoded pkt + + + +framebuf + +Frame Buffer  (Phase 6) +───────────── +GPU ring buffer ~300 frames +scrub: GPU→CPU on demand +→ /dev/shm/cht_scrub_frame + + + +decoder->framebuf + + +GPU frame + + + +ipc + +IPC Server  (Phase 5) +───────────── +Unix socket JSON-lines +commands: start/stop/get_frame +events: frame_detected/audio_chunk/… + + + +scene->ipc + + +frame_detected + + + +frames_dir + +frames/ +index.json + *.jpg + + + +scene->frames_dir + + + + + +audio->ipc + + +audio_chunk + + + +audio_dir + +audio/ +*.wav chunks + + + +audio->audio_dir + + + + + +writer->ipc + + +segment_completed + + + +stream_dir + +stream/ +*.mp4 segments + + + +writer->stream_dir + + + + + +shm + +/dev/shm/cht_scrub_frame +raw RGBA pixels + + + +framebuf->shm + + +get_frame cmd + + + +ipc->python + + +JSON-lines +(Unix socket) + + +