almost back to working state with rust transport

This commit is contained in:
2026-04-09 22:15:16 -03:00
parent ff96dcb4f7
commit 512d8ecef8
13 changed files with 1504 additions and 488 deletions

1
media/Cargo.lock generated
View File

@@ -85,6 +85,7 @@ dependencies = [
"anyhow",
"cht-common",
"ffmpeg-next",
"libc",
"nix",
"tokio",
"tracing",

View File

@@ -11,3 +11,4 @@ tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
ffmpeg = { package = "ffmpeg-next", version = "8" }
nix = { version = "0.29", features = ["signal", "process"] }
libc = "0.2"

View File

@@ -1,16 +1,13 @@
//! Subprocess backend: spawn ffmpeg CLI for capture+encode.
//!
//! Spawns ffmpeg with the same hardware pipeline as `stream_av.py`:
//! Spawns ffmpeg with the same hardware pipeline as `stream_av.sh`:
//! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi
//! + PulseAudio desktop audio + mic → amix → AAC
//!
//! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next
//! to get proper AVPackets (keyframe flags, timestamps) without parsing
//! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet
//! metadata in the container layer.
//!
//! This approach works where the direct VAAPI API path fails: hwmap uses
//! fftools' internal AVFilterGraph.hw_device_ctx (removed from public API
//! in ffmpeg 7+), so X2RGB10LE format negotiation succeeds.
use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd;
@@ -21,7 +18,7 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use tracing::{error, info, warn};
use crate::encoder::EncodedPacket;
use crate::encoder::{EncodedPacket, MediaType};
pub struct SubprocessConfig {
pub device: String,
@@ -63,8 +60,6 @@ pub fn run(
.expect("spawn stderr thread");
// Get the raw fd from stdout before handing it to ffmpeg-next.
// ffmpeg-next takes ownership of the input context but we keep the Child
// alive so the fd stays valid.
let stdout = child.stdout.take().expect("stdout piped");
let fd: RawFd = stdout.as_raw_fd();
@@ -79,32 +74,141 @@ pub fn run(
result
}
/// Detected PulseAudio audio sources for capture.
struct AudioSources {
    // Desktop audio: the default sink's monitor source (speaker tap), if found.
    monitor: Option<String>,
    // Microphone: the default input source, if found and distinct from monitor.
    mic: Option<String>,
    // PULSE_SERVER value pointing at the real user's daemon (needed when root).
    pulse_server: String,
}
/// Detect PulseAudio sources for capture: desktop-audio monitor and microphone.
///
/// Queries the real user's PulseAudio daemon via `pactl`; both sources are
/// best-effort and may be `None` if detection fails.
fn detect_audio_sources() -> AudioSources {
    // When running as root (sudo for kmsgrab), we need the real user's PulseAudio
    // daemon; SUDO_UID identifies the invoking user. Fall back to our own uid.
    // SAFETY: getuid() has no preconditions and cannot fail.
    let real_uid = std::env::var("SUDO_UID")
        .unwrap_or_else(|_| unsafe { libc::getuid() }.to_string());
    let pulse_server = format!("unix:/run/user/{real_uid}/pulse/native");

    let monitor = detect_monitor_source(&pulse_server);
    let mic = detect_default_source(&pulse_server);

    // Don't use mic if it's the same as monitor (some systems set the monitor
    // as the default source, which would double-capture desktop audio).
    let mic = match (&monitor, &mic) {
        (Some(m), Some(d)) if m == d => None,
        _ => mic,
    };

    info!("Audio sources — monitor: {:?}, mic: {:?}", monitor, mic);
    AudioSources { monitor, mic, pulse_server }
}
/// Find the monitor source of the default PulseAudio sink (desktop audio tap).
///
/// Runs `pactl info` against `pulse_server` and derives `<sink>.monitor` from
/// its "Default Sink:" line. Returns `None` if pactl can't be run, the daemon
/// is unreachable, or no default sink is reported.
fn detect_monitor_source(pulse_server: &str) -> Option<String> {
    let output = Command::new("pactl")
        .arg("info")
        .env("PULSE_SERVER", pulse_server)
        .output()
        .ok()?;
    let stdout = String::from_utf8_lossy(&output.stdout);
    stdout.lines().find_map(|line| {
        // split_once keeps the full sink name intact even if it contains ':',
        // and matching the key exactly avoids false hits on other lines.
        let (key, value) = line.split_once(':')?;
        if key.trim() == "Default Sink" {
            Some(format!("{}.monitor", value.trim()))
        } else {
            None
        }
    })
}
/// Ask PulseAudio for the default input source (microphone) name.
///
/// Returns `None` if `pactl` can't be run or reports nothing.
fn detect_default_source(pulse_server: &str) -> Option<String> {
    let out = Command::new("pactl")
        .arg("get-default-source")
        .env("PULSE_SERVER", pulse_server)
        .output()
        .ok()?;
    let name = String::from_utf8_lossy(&out.stdout).trim().to_string();
    // An empty reply means no default source is configured.
    (!name.is_empty()).then_some(name)
}
fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> {
let audio = detect_audio_sources();
let filter = format!(
"hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}",
cfg.width, cfg.height, cfg.fps,
);
let mut args: Vec<String> = vec![
// Hardware init
"-init_hw_device".into(), format!("drm=drm:{}", cfg.device),
"-init_hw_device".into(), "vaapi=va@drm".into(),
// Video input (kmsgrab)
"-thread_queue_size".into(), "64".into(),
"-device".into(), cfg.device.clone(),
"-f".into(), "kmsgrab".into(),
"-framerate".into(), cfg.fps.to_string(),
"-i".into(), "-".into(),
];
// Audio inputs
let has_monitor = audio.monitor.is_some();
let has_mic = audio.mic.is_some();
if let Some(ref monitor) = audio.monitor {
args.extend([
"-f".into(), "pulse".into(),
"-thread_queue_size".into(), "1024".into(),
"-i".into(), monitor.clone(),
]);
}
if let Some(ref mic) = audio.mic {
args.extend([
"-f".into(), "pulse".into(),
"-thread_queue_size".into(), "1024".into(),
"-i".into(), mic.clone(),
]);
}
// Audio filter: mix monitor + mic if both present
if has_monitor && has_mic {
args.extend([
"-filter_complex".into(),
"[1:a][2:a]amix=inputs=2:duration=longest[aout]".into(),
"-map".into(), "0:v".into(),
"-map".into(), "[aout]".into(),
]);
} else if has_monitor {
args.extend(["-map".into(), "0:v".into(), "-map".into(), "1:a".into()]);
}
// If no audio: no -map needed, only video output
// Video encoding
args.extend([
"-vf".into(), filter,
"-c:v".into(), "h264_vaapi".into(),
"-qp".into(), cfg.qp.to_string(),
"-g".into(), cfg.gop_size.to_string(),
"-bf".into(), "0".into(),
]);
// Audio encoding (if any audio source)
if has_monitor || has_mic {
args.extend([
"-c:a".into(), "aac".into(),
"-b:a".into(), "128k".into(),
]);
}
// Output
args.extend([
"-flush_packets".into(), "1".into(),
"-fflags".into(), "nobuffer".into(),
"-f".into(), "nut".into(),
"pipe:1".into(),
"-hide_banner".into(),
]);
info!("ffmpeg args: {:?}", args);
let child = Command::new("ffmpeg")
.args([
"-init_hw_device", &format!("drm=drm:{}", cfg.device),
"-init_hw_device", "vaapi=va@drm",
"-thread_queue_size", "64",
"-device", &cfg.device,
"-f", "kmsgrab",
"-framerate", &cfg.fps.to_string(),
"-i", "-",
"-vf", &filter,
"-c:v", "h264_vaapi",
"-qp", &cfg.qp.to_string(),
"-g", &cfg.gop_size.to_string(),
"-bf", "0",
"-flush_packets", "1",
"-fflags", "nobuffer",
"-f", "nut",
"pipe:1",
"-hide_banner",
])
.args(&args)
.env("PULSE_SERVER", &audio.pulse_server)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
@@ -127,22 +231,34 @@ fn demux_and_send(
let mut input_ctx = ffmpeg::format::input(&pipe_url)
.context("open ffmpeg input from pipe")?;
// Find video stream
let video_stream = input_ctx
.streams()
.best(ffmpeg::media::Type::Video)
.context("no video stream in NUT output")?;
let video_idx = video_stream.index();
let video_tb = video_stream.time_base();
let video_tb_num = video_tb.numerator() as u32;
let video_tb_den = video_tb.denominator() as u32;
let stream_idx = video_stream.index();
let time_base = video_stream.time_base();
let tb_num = time_base.numerator() as u32;
let tb_den = time_base.denominator() as u32;
// Find audio stream (may not exist if no PulseAudio sources found)
let audio_info = input_ctx
.streams()
.best(ffmpeg::media::Type::Audio)
.map(|s| {
let tb = s.time_base();
(s.index(), tb.numerator() as u32, tb.denominator() as u32)
});
info!(
"Subprocess demux ready: stream_idx={}, time_base={}/{}",
stream_idx, tb_num, tb_den
);
if let Some((idx, num, den)) = audio_info {
info!("Demux: video_idx={video_idx} tb={video_tb_num}/{video_tb_den}, \
audio_idx={idx} tb={num}/{den}");
} else {
info!("Demux: video_idx={video_idx} tb={video_tb_num}/{video_tb_den}, no audio");
}
let mut packet_count = 0u64;
let mut video_count = 0u64;
let mut audio_count = 0u64;
for (stream, packet) in input_ctx.packets() {
if stop.load(Ordering::Relaxed) {
@@ -155,36 +271,52 @@ fn demux_and_send(
break;
}
if stream.index() != stream_idx {
continue;
}
let data = match packet.data() {
Some(d) => d.to_vec(),
None => continue,
};
let encoded = EncodedPacket {
data,
pts: packet.pts().unwrap_or(0),
dts: packet.dts().unwrap_or(0),
keyframe: packet.is_key(),
time_base_num: tb_num,
time_base_den: tb_den,
};
let stream_idx = stream.index();
packet_count += 1;
if packet_count % 300 == 1 {
info!("Subprocess: {packet_count} packets encoded");
}
if packet_tx.blocking_send(encoded).is_err() {
info!("Packet channel closed, stopping subprocess pipeline");
break;
if stream_idx == video_idx {
let encoded = EncodedPacket {
media_type: MediaType::Video,
data,
pts: packet.pts().unwrap_or(0),
dts: packet.dts().unwrap_or(0),
keyframe: packet.is_key(),
time_base_num: video_tb_num,
time_base_den: video_tb_den,
};
video_count += 1;
if video_count % 300 == 1 {
info!("Subprocess: {video_count} video, {audio_count} audio packets");
}
if packet_tx.blocking_send(encoded).is_err() {
info!("Packet channel closed");
break;
}
} else if let Some((audio_idx, audio_tb_num, audio_tb_den)) = audio_info {
if stream_idx == audio_idx {
let encoded = EncodedPacket {
media_type: MediaType::Audio,
data,
pts: packet.pts().unwrap_or(0),
dts: packet.dts().unwrap_or(0),
keyframe: packet.is_key(),
time_base_num: audio_tb_num,
time_base_den: audio_tb_den,
};
audio_count += 1;
if packet_tx.blocking_send(encoded).is_err() {
info!("Packet channel closed");
break;
}
}
}
}
info!("Subprocess pipeline stopped ({packet_count} packets)");
info!("Subprocess pipeline stopped ({video_count} video, {audio_count} audio packets)");
Ok(())
}
@@ -224,8 +356,15 @@ fn kill_child(child: &mut Child) {
child.kill().ok();
}
match child.wait() {
Ok(s) => info!("ffmpeg exited: {s}"),
Err(e) => warn!("ffmpeg wait error: {e}"),
// Wait up to 3 seconds, then SIGKILL.
for _ in 0..30 {
if child.try_wait().ok().flatten().is_some() {
info!("ffmpeg exited cleanly");
return;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
warn!("ffmpeg didn't exit after SIGINT, killing");
child.kill().ok();
let _ = child.wait();
}

View File

@@ -310,6 +310,7 @@ impl EncoderInner {
let mut encoded = ffmpeg::Packet::empty();
while self.encoder.receive_packet(&mut encoded).is_ok() {
packets.push(EncodedPacket {
media_type: MediaType::Video,
data: encoded.data().unwrap_or(&[]).to_vec(),
pts: encoded.pts().unwrap_or(0),
dts: encoded.dts().unwrap_or(0),
@@ -327,8 +328,16 @@ impl EncoderInner {
}
}
/// An encoded video packet ready for transport.
/// Type of media stream in an encoded packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MediaType {
    /// Encoded video packet (H.264 in this pipeline).
    Video,
    /// Encoded audio packet (AAC in this pipeline).
    Audio,
}
/// An encoded media packet ready for transport.
pub struct EncodedPacket {
pub media_type: MediaType,
pub data: Vec<u8>,
pub pts: i64,
pub dts: i64,

View File

@@ -1,3 +1,5 @@
use std::time::Duration;
use anyhow::Result;
use cht_common::protocol::{
self, AudioParams, ControlMessage, PacketHeader, PacketType, VideoParams, WirePacket,
@@ -5,14 +7,14 @@ use cht_common::protocol::{
};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::info;
use tracing::{info, warn};
use cht_client::backends::Backend;
use cht_client::capture::CaptureConfig;
use cht_client::encoder::EncoderConfig;
use cht_client::encoder::{EncoderConfig, MediaType};
use cht_client::pipeline::Pipeline;
const DEFAULT_SERVER: &str = "mcrndeb:4444";
const DEFAULT_SERVER: &str = "mcrndeb:4447";
#[tokio::main]
async fn main() -> Result<()> {
@@ -35,9 +37,8 @@ async fn main() -> Result<()> {
Backend::Subprocess
};
info!("Connecting to {server_addr}...");
let stream = TcpStream::connect(&server_addr).await?;
info!("Connected");
// Wait for the server to become available.
let stream = wait_for_server(&server_addr).await?;
let mut writer = BufWriter::new(stream);
@@ -69,6 +70,7 @@ async fn main() -> Result<()> {
// Forward encoded packets to the server
let mut video_count = 0u64;
let mut audio_count = 0u64;
let mut keepalive_interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
@@ -76,9 +78,13 @@ async fn main() -> Result<()> {
pkt = packet_rx.recv() => {
match pkt {
Some(encoded) => {
let pkt_type = match encoded.media_type {
MediaType::Video => PacketType::Video,
MediaType::Audio => PacketType::Audio,
};
let wire = WirePacket {
header: PacketHeader {
packet_type: PacketType::Video,
packet_type: pkt_type,
flags: if encoded.keyframe { FLAG_KEYFRAME } else { 0 },
length: encoded.data.len() as u32,
timestamp_ns: pts_to_ns(
@@ -90,11 +96,18 @@ async fn main() -> Result<()> {
payload: encoded.data,
};
protocol::write_packet(&mut writer, &wire).await?;
video_count += 1;
if video_count % 300 == 1 {
info!("Sent {video_count} video packets");
writer.flush().await?;
match encoded.media_type {
MediaType::Video => {
video_count += 1;
if video_count % 300 == 1 {
info!("Sent {video_count} video, {audio_count} audio packets");
writer.flush().await?;
}
}
MediaType::Audio => {
audio_count += 1;
}
}
}
None => {
@@ -115,17 +128,56 @@ async fn main() -> Result<()> {
}
}
pipeline.stop();
// Stop pipeline first (signals ffmpeg, joins thread).
// Give it a few seconds — if ffmpeg hangs, don't block forever.
info!("Stopping pipeline...");
let stop_handle = tokio::task::spawn_blocking(move || {
pipeline.stop();
});
let _ = tokio::time::timeout(Duration::from_secs(5), stop_handle).await;
let stop = ControlMessage::SessionStop;
protocol::write_packet(&mut writer, &stop.to_wire_packet()?).await?;
writer.flush().await?;
writer.shutdown().await?;
info!("Sent session_stop, {video_count} video packets total");
// Try to send SessionStop so the server closes cleanly.
let stop_msg = ControlMessage::SessionStop;
match tokio::time::timeout(
Duration::from_secs(2),
async {
protocol::write_packet(&mut writer, &stop_msg.to_wire_packet()?).await?;
writer.flush().await?;
writer.shutdown().await?;
Ok::<_, anyhow::Error>(())
}
).await {
Ok(Ok(())) => {}
Ok(Err(e)) => warn!("Error sending session_stop: {e}"),
Err(_) => warn!("Timeout sending session_stop"),
}
info!("Done — {video_count} video + {audio_count} audio packets");
Ok(())
}
/// Retry connecting to `addr` until the server accepts, pacing attempts with
/// a 2-second interval.
///
/// Bails out with an error if Ctrl-C arrives while still waiting, so the
/// client can be aborted before a server ever comes up.
async fn wait_for_server(addr: &str) -> Result<TcpStream> {
    info!("Waiting for server at {addr}...");
    let mut retry = tokio::time::interval(Duration::from_secs(2));
    loop {
        // Pace the connection attempts, but react to Ctrl-C immediately.
        tokio::select! {
            _ = retry.tick() => {}
            _ = tokio::signal::ctrl_c() => {
                anyhow::bail!("interrupted while waiting for server");
            }
        }
        match TcpStream::connect(addr).await {
            Ok(stream) => {
                info!("Connected to {addr}");
                return Ok(stream);
            }
            Err(e) => info!("Server not ready ({e}), retrying..."),
        }
    }
}
fn pts_to_ns(pts: i64, tb_num: u32, tb_den: u32) -> u64 {
if tb_den == 0 {
return 0;
@@ -134,10 +186,23 @@ fn pts_to_ns(pts: i64, tb_num: u32, tb_den: u32) -> u64 {
}
fn session_id() -> String {
// Match Python's time.strftime("%Y%m%d_%H%M%S") format
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
format!("{secs}")
.as_secs() as libc::time_t;
let mut tm: libc::tm = unsafe { std::mem::zeroed() };
unsafe { libc::localtime_r(&secs, &mut tm) };
let mut buf = [0u8; 20];
let fmt = b"%Y%m%d_%H%M%S\0";
let len = unsafe {
libc::strftime(
buf.as_mut_ptr() as *mut libc::c_char,
buf.len(),
fmt.as_ptr() as *const libc::c_char,
&tm,
)
};
String::from_utf8_lossy(&buf[..len]).to_string()
}

View File

@@ -1,24 +1,40 @@
mod session;
use std::path::PathBuf;
use anyhow::Result;
use cht_common::protocol::{self, ControlMessage, PacketType};
use session::Session;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tracing::{error, info};
use tracing::{error, info, warn};
const LISTEN_ADDR: &str = "0.0.0.0:4444";
const LISTEN_ADDR: &str = "0.0.0.0:4447";
const DEFAULT_SESSIONS_DIR: &str = "/home/mariano/wdir/cht/data/sessions";
fn sessions_dir() -> PathBuf {
std::env::var("CHT_SESSIONS_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from(DEFAULT_SESSIONS_DIR))
}
#[tokio::main]
async fn main() -> Result<()> {
cht_common::logging::init("server");
let sessions_dir = sessions_dir();
info!("Sessions dir: {}", sessions_dir.display());
let listener = TcpListener::bind(LISTEN_ADDR).await?;
info!("Server listening on {LISTEN_ADDR}");
loop {
let (stream, addr) = listener.accept().await?;
info!("Client connected from {addr}");
let sdir = sessions_dir.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(stream).await {
if let Err(e) = handle_client(stream, sdir).await {
error!("Client {addr} error: {e:#}");
}
info!("Client {addr} disconnected");
@@ -26,17 +42,19 @@ async fn main() -> Result<()> {
}
}
async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> {
async fn handle_client(
stream: tokio::net::TcpStream,
sessions_dir: PathBuf,
) -> Result<()> {
let mut reader = BufReader::new(stream);
let mut video_packets = 0u64;
let mut audio_packets = 0u64;
let mut session: Option<Session> = None;
let mut video_count = 0u64;
let mut audio_count = 0u64;
loop {
let packet = match protocol::read_packet(&mut reader).await {
Ok(p) => p,
Err(e) => {
// Any read error at the header boundary is a clean disconnect
// (includes EOF from flush + shutdown)
let msg = format!("{e:#}");
if msg.contains("eof") || msg.contains("Eof")
|| msg.contains("connection reset")
@@ -50,25 +68,60 @@ async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> {
match packet.header.packet_type {
PacketType::Video => {
video_packets += 1;
if video_packets % 300 == 1 {
info!(
"video: {video_packets} packets, ts={}ms, keyframe={}",
packet.header.timestamp_ns / 1_000_000,
packet.header.is_keyframe(),
);
if let Some(s) = &mut session {
// Blocking write — offload to blocking thread to avoid stalling tokio.
let data = packet.payload;
let keyframe = packet.header.is_keyframe();
tokio::task::block_in_place(|| s.write_video(&data, keyframe))?;
video_count += 1;
if video_count % 300 == 1 {
info!("video: {video_count} packets, ts={}ms, keyframe={}",
packet.header.timestamp_ns / 1_000_000,
packet.header.is_keyframe());
}
} else {
warn!("Video packet before SessionStart — dropped");
}
}
PacketType::Audio => {
audio_packets += 1;
if let Some(s) = &mut session {
let data = packet.payload;
tokio::task::block_in_place(|| s.write_audio(&data))?;
audio_count += 1;
if audio_count % 500 == 1 {
info!("audio: {audio_count} packets");
}
}
}
PacketType::Control => {
let ctrl = ControlMessage::from_payload(&packet.payload)?;
info!("control: {ctrl:?}");
match ctrl {
ControlMessage::SessionStart { id, video, .. } => {
let s = tokio::task::block_in_place(|| {
Session::start(&id, &sessions_dir, video.fps)
})?;
session = Some(s);
}
ControlMessage::SessionStop => {
if let Some(s) = session.take() {
tokio::task::block_in_place(|| s.close());
}
break;
}
ControlMessage::Keepalive
| ControlMessage::Reconnect { .. }
| ControlMessage::ParamChange { .. } => {}
}
}
}
}
info!("Session totals: {video_packets} video, {audio_packets} audio packets");
if let Some(s) = session.take() {
tokio::task::block_in_place(|| s.close());
}
info!("Session totals: {video_count} video, {audio_count} audio packets");
Ok(())
}

306
media/server/src/session.rs Normal file
View File

@@ -0,0 +1,306 @@
//! Session: manages the ffmpeg recording subprocess for one client connection.
//!
//! Receives raw H.264 NAL units and AAC audio from the transport:
//! - Video: piped into ffmpeg → fragmented MP4 + UDP relay for live display
//! - Audio: written to raw AAC file for Python post-processing
//!
//! Also provides a Unix domain socket at `stream/scene.sock` carrying a copy
//! of the raw H.264 stream for Python's GPU scene detection. The socket is
//! fire-and-forget: if nobody connects, data is silently dropped; if the
//! reader is slow, old frames are dropped rather than stalling recording.
//!
//! Creates the session directory and writes its path to `data/active-session`
//! so the Python app can pick it up for SessionProcessor (audio extraction, etc).
use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::thread;
use anyhow::{Context, Result};
use tokio::io::AsyncWriteExt;
use tracing::{debug, info, warn};
// Written next to the sessions/ directory so everything stays under data/.
// Python reads this to discover the session dir created by cht-server.
const ACTIVE_SESSION_FILENAME: &str = "active-session";
const RELAY_URL: &str = "udp://127.0.0.1:4445";
const SCENE_SOCKET_NAME: &str = "scene.sock";
/// One H.264 packet forwarded to the scene-detection relay.
struct ScenePacket {
    // Raw H.264 bytes as received from the client.
    data: Vec<u8>,
    // Keyframe flag as reported by the client; cached so new relay readers
    // can be primed with a decodable starting point.
    keyframe: bool,
}

/// State for one recording session: the ffmpeg recorder process, the raw
/// audio sink, and the scene-relay channel.
pub struct Session {
    // Session directory under sessions_dir (exposed via session_dir()).
    #[allow(dead_code)]
    session_dir: PathBuf,
    // Marker file telling Python which session is live; removed on close().
    active_session_file: PathBuf,
    // ffmpeg recorder child (fMP4 file + UDP relay outputs).
    ffmpeg: Child,
    // ffmpeg's stdin; Option so close()/Drop can take it to signal EOF.
    video_stdin: Option<ChildStdin>,
    // Raw AAC output file (ADTS-framed); None if creation failed at start.
    audio_file: Option<File>,
    // Sender into the scene relay task; dropping it shuts the relay down.
    scene_tx: Option<tokio::sync::mpsc::Sender<ScenePacket>>,
    // Configured capture fps; currently unused after spawn (dead_code).
    #[allow(dead_code)]
    fps: u32,
}
impl Session {
pub fn start(session_id: &str, sessions_dir: &Path, fps: u32) -> Result<Self> {
let active_session_file = sessions_dir
.parent()
.unwrap_or(sessions_dir)
.join(ACTIVE_SESSION_FILENAME);
let session_dir = sessions_dir.join(session_id);
let stream_dir = session_dir.join("stream");
fs::create_dir_all(&stream_dir)
.with_context(|| format!("create session dir: {}", stream_dir.display()))?;
let recording_path = stream_dir.join("recording_000.mp4");
let audio_path = stream_dir.join("audio.aac");
info!("Session {session_id}: recording → {}", recording_path.display());
let mut child = Command::new("ffmpeg")
.args([
"-f", "h264",
"-framerate", &fps.to_string(),
"-i", "pipe:0",
// fMP4 — same flags as Python StreamRecorder
"-c:v", "copy",
"-f", "mp4",
"-movflags", "frag_keyframe+empty_moov+default_base_moof",
"-flush_packets", "1",
recording_path.to_str().unwrap(),
// UDP relay for live display
"-c:v", "copy",
"-f", "mpegts",
RELAY_URL,
"-hide_banner", "-loglevel", "warning",
])
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
.context("spawn ffmpeg recorder")?;
let video_stdin = child.stdin.take().expect("stdin piped");
// Drain stderr so ffmpeg never blocks on a full pipe.
let stderr = child.stderr.take().expect("stderr piped");
let sid = session_id.to_string();
thread::Builder::new()
.name("ffmpeg-recorder-stderr".into())
.spawn(move || {
use std::io::{BufRead, BufReader};
for line in BufReader::new(stderr).lines().map_while(Result::ok) {
if !line.is_empty() {
debug!("[recorder/{sid}] {line}");
}
}
})
.expect("spawn stderr thread");
// Open audio file for raw AAC frames from client
let audio_file = File::create(&audio_path)
.map(Some)
.unwrap_or_else(|e| {
warn!("Could not create audio file: {e}");
None
});
// Scene relay: Unix socket for Python scene detection.
let socket_path = stream_dir.join(SCENE_SOCKET_NAME);
let (scene_tx, scene_rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(scene_relay_task(socket_path, scene_rx));
// Tell Python which session dir to watch.
if let Err(e) = fs::write(&active_session_file, session_dir.to_str().unwrap_or("")) {
warn!("Could not write {}: {e}", active_session_file.display());
}
info!("Session {session_id}: ffmpeg pid={}, audio → {}",
child.id(), audio_path.display());
Ok(Self {
session_dir,
active_session_file,
ffmpeg: child,
video_stdin: Some(video_stdin),
audio_file,
scene_tx: Some(scene_tx),
fps,
})
}
pub fn write_video(&mut self, data: &[u8], keyframe: bool) -> Result<()> {
if let Some(stdin) = &mut self.video_stdin {
stdin.write_all(data).context("write H.264 to ffmpeg")?;
}
// Best-effort relay to scene detector — drop if channel full.
if let Some(tx) = &self.scene_tx {
let _ = tx.try_send(ScenePacket { data: data.to_vec(), keyframe });
}
Ok(())
}
pub fn write_audio(&mut self, data: &[u8]) -> Result<()> {
if let Some(f) = &mut self.audio_file {
// Wrap raw AAC frame with ADTS header so the file is playable/parseable.
// Assumes AAC-LC, 48kHz, stereo (matches client's encoder config).
write_adts_frame(f, data)?;
}
Ok(())
}
#[allow(dead_code)]
pub fn session_dir(&self) -> &Path {
&self.session_dir
}
pub fn close(mut self) {
// Drop stdin → ffmpeg gets EOF → flushes and exits cleanly.
drop(self.video_stdin.take());
drop(self.audio_file.take());
// Drop scene_tx → relay task sees channel closed → exits.
drop(self.scene_tx.take());
match self.ffmpeg.wait() {
Ok(s) => info!("ffmpeg recorder exited: {s}"),
Err(e) => warn!("ffmpeg recorder wait error: {e}"),
}
// Clear the active session marker.
let _ = fs::remove_file(&self.active_session_file);
}
}
impl Drop for Session {
    /// Safety net: if the session is dropped without `close()` having run,
    /// kill ffmpeg outright instead of waiting for a clean flush.
    fn drop(&mut self) {
        // close() takes video_stdin; if it's still here, close() never ran.
        if self.video_stdin.is_none() {
            return;
        }
        self.video_stdin = None;
        self.audio_file = None;
        self.scene_tx = None;
        let _ = self.ffmpeg.kill();
    }
}
// ---------------------------------------------------------------------------
// Scene relay: serves raw H.264 over a Unix domain socket
// ---------------------------------------------------------------------------
/// Serve a copy of the raw H.264 stream over a Unix domain socket for the
/// Python scene detector.
///
/// Fire-and-forget by design: at most one reader is served at a time; while
/// no reader is connected, packets are drained and discarded (only the latest
/// keyframe is remembered so a late-joining reader can initialize its
/// decoder). The task exits when the packet channel closes, removing the
/// socket file on the way out.
///
/// NOTE(review): while a reader is connected, `listener.accept()` is not
/// polled — a second connector will block in connect until the first reader
/// drops.
async fn scene_relay_task(
    socket_path: PathBuf,
    mut rx: tokio::sync::mpsc::Receiver<ScenePacket>,
) {
    // Remove stale socket from a previous session.
    let _ = fs::remove_file(&socket_path);
    let listener = match tokio::net::UnixListener::bind(&socket_path) {
        Ok(l) => l,
        Err(e) => {
            warn!("Scene relay: bind failed on {}: {e}", socket_path.display());
            return;
        }
    };
    info!("Scene relay: listening on {}", socket_path.display());

    // The single currently-connected reader, if any.
    let mut client: Option<tokio::net::UnixStream> = None;
    // Buffer the latest keyframe so new clients start with a valid decoder state.
    let mut last_keyframe: Option<Vec<u8>> = None;

    loop {
        if client.is_some() {
            // We have a connected reader — forward data.
            match rx.recv().await {
                Some(pkt) => {
                    if pkt.keyframe {
                        last_keyframe = Some(pkt.data.clone());
                    }
                    let stream = client.as_mut().unwrap();
                    // A write error means the reader went away; drop it and go
                    // back to accepting connections.
                    if stream.write_all(&pkt.data).await.is_err() {
                        info!("Scene relay: client disconnected");
                        client = None;
                    }
                }
                None => break, // Channel closed, session ending.
            }
        } else {
            // No reader — accept connections while draining the channel.
            // `biased` polls accept first so a waiting reader is picked up
            // before more packets are discarded.
            tokio::select! {
                biased;
                result = listener.accept() => {
                    match result {
                        Ok((mut stream, _)) => {
                            info!("Scene relay: client connected");
                            // Send the last keyframe so the decoder can initialize.
                            if let Some(ref kf) = last_keyframe {
                                if stream.write_all(kf).await.is_err() {
                                    warn!("Scene relay: failed to send keyframe");
                                    continue;
                                }
                                info!("Scene relay: sent keyframe ({} bytes)", kf.len());
                            }
                            client = Some(stream);
                        }
                        Err(e) => warn!("Scene relay: accept error: {e}"),
                    }
                }
                pkt = rx.recv() => {
                    match pkt {
                        Some(pkt) => {
                            if pkt.keyframe {
                                last_keyframe = Some(pkt.data);
                            }
                            // Discard — no reader connected.
                        }
                        None => break, // Channel closed.
                    }
                }
            }
        }
    }

    drop(client);
    let _ = fs::remove_file(&socket_path);
    info!("Scene relay: stopped");
}
// ---------------------------------------------------------------------------
// ADTS header for raw AAC framing
// ---------------------------------------------------------------------------
/// Write a raw AAC frame wrapped in a 7-byte ADTS header.
///
/// Fixed params: AAC-LC profile, 48 kHz sample rate, 2 channels (stereo).
/// These match the client's `-c:a aac -b:a 128k` default config.
fn write_adts_frame(w: &mut impl Write, aac_data: &[u8]) -> Result<()> {
// ADTS fixed header fields:
// profile: AAC-LC = 1 (stored as profile-1 = 0 in MPEG-4 ID mode)
// sample_rate: 48000 → index 3
// channels: 2 → channel_configuration 2
const PROFILE_MINUS1: u8 = 1; // AAC-LC
const SR_IDX: u8 = 3; // 48 kHz
const CH_CFG: u8 = 2; // stereo
let frame_len = (aac_data.len() + 7) as u16; // total ADTS frame = header + payload
let header: [u8; 7] = [
// byte 0-1: syncword(12) | ID(1)=0(MPEG4) | layer(2)=0 | protection(1)=1(no CRC)
0xFF,
0xF1,
// byte 2: profile(2) | sr_idx(4) | private(1)=0 | ch_cfg[2](1)
(PROFILE_MINUS1 << 6) | (SR_IDX << 2) | ((CH_CFG >> 2) & 1),
// byte 3: ch_cfg[1:0](2) | orig(1)=0 | home(1)=0 | copyright_id(1)=0 | copyright_start(1)=0 | frame_len[12:11](2)
((CH_CFG & 3) << 6) | ((frame_len >> 11) as u8 & 0x03),
// byte 4: frame_len[10:3](8)
((frame_len >> 3) & 0xFF) as u8,
// byte 5: frame_len[2:0](3) | buffer_fullness[10:6](5)
((frame_len & 0x07) << 5) as u8 | 0x1F,
// byte 6: buffer_fullness[5:0](6) | num_aac_frames_minus1(2)=0
0xFC,
];
w.write_all(&header).context("ADTS header")?;
w.write_all(aac_data).context("AAC frame")?;
Ok(())
}