From 5921cd656271b0eca4896af8ce0a68b2a986d165 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 9 Apr 2026 16:00:07 -0300 Subject: [PATCH] phase 1 --- .gitignore | 2 + ctrl/sync.sh | 2 + media/Cargo.lock | 459 +++++++++++++++++++++++++++++++++++ media/Cargo.toml | 11 + media/client/Cargo.toml | 11 + media/client/src/main.rs | 102 ++++++++ media/common/Cargo.toml | 12 + media/common/src/frame.rs | 39 +++ media/common/src/lib.rs | 3 + media/common/src/logging.rs | 41 ++++ media/common/src/protocol.rs | 337 +++++++++++++++++++++++++ media/server/Cargo.toml | 11 + media/server/src/main.rs | 74 ++++++ 13 files changed, 1104 insertions(+) create mode 100644 media/Cargo.lock create mode 100644 media/Cargo.toml create mode 100644 media/client/Cargo.toml create mode 100644 media/client/src/main.rs create mode 100644 media/common/Cargo.toml create mode 100644 media/common/src/frame.rs create mode 100644 media/common/src/lib.rs create mode 100644 media/common/src/logging.rs create mode 100644 media/common/src/protocol.rs create mode 100644 media/server/Cargo.toml create mode 100644 media/server/src/main.rs diff --git a/.gitignore b/.gitignore index b5f011e..ad1af65 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ bin/ __pycache__/ .venv/ .pytest_cache/ +media/target/ +media/logs/ diff --git a/ctrl/sync.sh b/ctrl/sync.sh index 7a458d7..6e05bd3 100755 --- a/ctrl/sync.sh +++ b/ctrl/sync.sh @@ -9,6 +9,8 @@ PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" rsync -avz --delete \ --exclude='.git/' \ + --exclude='media/target/' \ + --exclude='media/logs/' \ --filter=':- .gitignore' \ "$PROJECT_DIR/" \ "${REMOTE}:${REMOTE_PATH}" diff --git a/media/Cargo.lock b/media/Cargo.lock new file mode 100644 index 0000000..fa6f3e8 --- /dev/null +++ b/media/Cargo.lock @@ -0,0 +1,459 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "cht-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "cht-common", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cht-common" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cht-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "cht-common", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.184" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "mio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tokio" +version = "1.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/media/Cargo.toml b/media/Cargo.toml new file mode 100644 index 0000000..f486bde --- /dev/null +++ b/media/Cargo.toml @@ -0,0 +1,11 @@ +[workspace] +members = ["common", "server", "client"] +resolver = "2" + +[workspace.dependencies] +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +anyhow = "1" diff --git a/media/client/Cargo.toml b/media/client/Cargo.toml new file mode 100644 index 0000000..440a580 --- /dev/null +++ b/media/client/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "cht-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +cht-common = { path = "../common" } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +anyhow = { workspace = true } diff --git a/media/client/src/main.rs b/media/client/src/main.rs new file mode 100644 index 0000000..78201a7 --- /dev/null +++ b/media/client/src/main.rs @@ -0,0 +1,102 @@ +use anyhow::Result; +use cht_common::protocol::{ + self, AudioParams, ControlMessage, PacketHeader, PacketType, VideoParams, WirePacket, + FLAG_KEYFRAME, +}; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::net::TcpStream; +use tracing::info; + +const DEFAULT_SERVER: &str = "mcrndeb:4444"; + +#[tokio::main] +async fn main() -> Result<()> { + cht_common::logging::init("client"); + + let server_addr = std::env::args() + .nth(1) + .unwrap_or_else(|| DEFAULT_SERVER.to_string()); + + info!("Connecting to {server_addr}..."); + let stream = TcpStream::connect(&server_addr).await?; + info!("Connected"); + + let mut writer = BufWriter::new(stream); + + // Send session_start + let session_start = ControlMessage::SessionStart { + id: chrono_session_id(), + video: VideoParams { + width: 1920, + height: 1080, + codec: "h264".into(), + fps: 30, + }, + audio: AudioParams { + sample_rate: 48000, + channels: 2, + codec: "aac".into(), + }, + }; + protocol::write_packet(&mut writer, &session_start.to_wire_packet()?).await?; + info!("Sent session_start"); + + // Send test packets (placeholder — will be replaced by real capture) + let frame_interval_ns = 33_333_333u64; // ~30fps + for i in 0u64..300 { + let ts = i * frame_interval_ns; + let keyframe = i % 30 == 0; + + // Fake video packet + let video = WirePacket { + header: PacketHeader { + packet_type: PacketType::Video, + flags: if keyframe { FLAG_KEYFRAME } else { 0 }, + length: 1024, + timestamp_ns: ts, + }, + payload: vec![0u8; 1024], + }; + protocol::write_packet(&mut writer, &video).await?; + + // Fake audio packet every 3 video frames + if i % 3 == 0 { + let audio = WirePacket { + header: PacketHeader { + packet_type: PacketType::Audio, + flags: 0, + length: 512, + timestamp_ns: ts, + }, + payload: vec![0u8; 512], + }; + protocol::write_packet(&mut writer, &audio).await?; + } + + // Keepalive every 150 frames (~5s) + if i % 150 == 0 && i > 0 { + let keepalive = ControlMessage::Keepalive; + protocol::write_packet(&mut writer, &keepalive.to_wire_packet()?).await?; + } + + tokio::time::sleep(std::time::Duration::from_nanos(frame_interval_ns)).await; + } + + // Send session_stop and flush + let stop = ControlMessage::SessionStop; + protocol::write_packet(&mut writer, &stop.to_wire_packet()?).await?; + writer.flush().await?; + writer.shutdown().await?; + info!("Sent session_stop, done"); + + Ok(()) +} + +fn chrono_session_id() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + format!("{secs}") +} diff --git a/media/common/Cargo.toml b/media/common/Cargo.toml new file mode 100644 index 0000000..d798a2c --- /dev/null +++ b/media/common/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "cht-common" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +anyhow = { workspace = true } diff --git a/media/common/src/frame.rs b/media/common/src/frame.rs new file mode 100644 index 0000000..04be78c --- /dev/null +++ b/media/common/src/frame.rs @@ -0,0 +1,39 @@ +use serde::{Deserialize, Serialize}; + +/// Pixel format for decoded video frames. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum PixelFormat { + NV12, + I420, + P010, + RGBA, + BGRA, +} + +/// Audio sample format. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum AudioFormat { + S16, + F32, + F32Planar, +} + +/// Which track a packet belongs to. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum Track { + Video = 0, + Audio = 1, +} + +/// Encoded packet ready for network transport or muxing. +#[derive(Debug, Clone)] +pub struct Packet { + pub data: Vec, + pub pts: i64, + pub dts: i64, + pub timebase_num: u32, + pub timebase_den: u32, + pub keyframe: bool, + pub track: Track, +} diff --git a/media/common/src/lib.rs b/media/common/src/lib.rs new file mode 100644 index 0000000..add7952 --- /dev/null +++ b/media/common/src/lib.rs @@ -0,0 +1,3 @@ +pub mod frame; +pub mod logging; +pub mod protocol; diff --git a/media/common/src/logging.rs b/media/common/src/logging.rs new file mode 100644 index 0000000..f1b63f8 --- /dev/null +++ b/media/common/src/logging.rs @@ -0,0 +1,41 @@ +use std::fs; +use std::path::PathBuf; +use tracing_subscriber::prelude::*; + +/// Initialize logging to both stderr and a file under `media/logs/`. +/// Log file: `media/logs/{name}.log` +pub fn init(name: &str) { + let log_dir = log_dir(); + fs::create_dir_all(&log_dir).expect("create logs dir"); + + let log_path = log_dir.join(format!("{name}.log")); + let file = fs::OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .expect("open log file"); + + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info".into()); + + tracing_subscriber::registry() + .with(env_filter) + .with( + tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr), + ) + .with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(std::sync::Mutex::new(file)), + ) + .init(); + + tracing::info!("Logging to {}", log_path.display()); +} + +fn log_dir() -> PathBuf { + // Use CWD/logs (expected to run from media/ workspace root) + let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); + cwd.join("logs") +} diff --git a/media/common/src/protocol.rs b/media/common/src/protocol.rs new file mode 100644 index 0000000..17c9b10 --- /dev/null +++ b/media/common/src/protocol.rs @@ -0,0 +1,337 @@ +//! Framed TCP protocol for CHT media transport. +//! +//! Wire format: +//! ```text +//! ┌──────────────────────────────────────────────┐ +//! │ Header (16 bytes) │ +//! │ type: u8 (0=video, 1=audio, 2=ctrl) │ +//! │ flags: u8 (bit 0=keyframe) │ +//! │ reserved: u16 │ +//! │ length: u32 (payload bytes) │ +//! │ timestamp: u64 (nanoseconds) │ +//! ├──────────────────────────────────────────────┤ +//! │ Payload (length bytes) │ +//! └──────────────────────────────────────────────┘ +//! ``` + +use anyhow::{bail, Context, Result}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub const HEADER_SIZE: usize = 16; +pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024; // 16 MiB + +/// Packet type on the wire. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum PacketType { + Video = 0, + Audio = 1, + Control = 2, +} + +impl PacketType { + fn from_u8(v: u8) -> Result { + match v { + 0 => Ok(Self::Video), + 1 => Ok(Self::Audio), + 2 => Ok(Self::Control), + other => bail!("unknown packet type: {other}"), + } + } +} + +/// Flags in the packet header. +pub const FLAG_KEYFRAME: u8 = 0x01; + +/// Fixed-size header on the wire. +#[derive(Debug, Clone, Copy)] +pub struct PacketHeader { + pub packet_type: PacketType, + pub flags: u8, + pub length: u32, + pub timestamp_ns: u64, +} + +impl PacketHeader { + pub fn to_bytes(&self) -> [u8; HEADER_SIZE] { + let mut buf = [0u8; HEADER_SIZE]; + buf[0] = self.packet_type as u8; + buf[1] = self.flags; + // buf[2..4] reserved + buf[4..8].copy_from_slice(&self.length.to_le_bytes()); + buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes()); + buf + } + + pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Result { + let packet_type = PacketType::from_u8(buf[0])?; + let flags = buf[1]; + let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()); + let timestamp_ns = u64::from_le_bytes(buf[8..16].try_into().unwrap()); + + if length > MAX_PAYLOAD_SIZE { + bail!("payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})"); + } + + Ok(Self { + packet_type, + flags, + length, + timestamp_ns, + }) + } + + pub fn is_keyframe(&self) -> bool { + self.flags & FLAG_KEYFRAME != 0 + } +} + +/// A complete packet: header + payload. +#[derive(Debug, Clone)] +pub struct WirePacket { + pub header: PacketHeader, + pub payload: Vec, +} + +/// Read one packet from an async reader. +pub async fn read_packet(reader: &mut R) -> Result { + let mut header_buf = [0u8; HEADER_SIZE]; + reader + .read_exact(&mut header_buf) + .await + .context("reading packet header")?; + + let header = PacketHeader::from_bytes(&header_buf)?; + + let mut payload = vec![0u8; header.length as usize]; + if !payload.is_empty() { + reader + .read_exact(&mut payload) + .await + .context("reading packet payload")?; + } + + Ok(WirePacket { header, payload }) +} + +/// Write one packet to an async writer. +pub async fn write_packet( + writer: &mut W, + packet: &WirePacket, +) -> Result<()> { + writer + .write_all(&packet.header.to_bytes()) + .await + .context("writing packet header")?; + if !packet.payload.is_empty() { + writer + .write_all(&packet.payload) + .await + .context("writing packet payload")?; + } + Ok(()) +} + +/// Control messages exchanged as JSON in control packets. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "ctrl")] +pub enum ControlMessage { + #[serde(rename = "session_start")] + SessionStart { + id: String, + video: VideoParams, + audio: AudioParams, + }, + #[serde(rename = "session_stop")] + SessionStop, + #[serde(rename = "keepalive")] + Keepalive, + #[serde(rename = "reconnect")] + Reconnect { last_pts: i64 }, + #[serde(rename = "param_change")] + ParamChange { scene_threshold: f64 }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VideoParams { + pub width: u32, + pub height: u32, + pub codec: String, + pub fps: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AudioParams { + pub sample_rate: u32, + pub channels: u16, + pub codec: String, +} + +impl ControlMessage { + /// Encode to a WirePacket for sending. + pub fn to_wire_packet(&self) -> Result { + let json = serde_json::to_vec(self)?; + Ok(WirePacket { + header: PacketHeader { + packet_type: PacketType::Control, + flags: 0, + length: json.len() as u32, + timestamp_ns: 0, + }, + payload: json, + }) + } + + /// Decode from a WirePacket payload. + pub fn from_payload(payload: &[u8]) -> Result { + serde_json::from_slice(payload).context("parsing control message JSON") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn header_round_trip() { + let header = PacketHeader { + packet_type: PacketType::Video, + flags: FLAG_KEYFRAME, + length: 4096, + timestamp_ns: 1_000_000_000, + }; + let bytes = header.to_bytes(); + let decoded = PacketHeader::from_bytes(&bytes).unwrap(); + + assert_eq!(decoded.packet_type, PacketType::Video); + assert!(decoded.is_keyframe()); + assert_eq!(decoded.length, 4096); + assert_eq!(decoded.timestamp_ns, 1_000_000_000); + } + + #[test] + fn header_rejects_oversized_payload() { + let mut bytes = [0u8; HEADER_SIZE]; + bytes[4..8].copy_from_slice(&(MAX_PAYLOAD_SIZE + 1).to_le_bytes()); + assert!(PacketHeader::from_bytes(&bytes).is_err()); + } + + #[test] + fn control_message_round_trip() { + let msg = ControlMessage::SessionStart { + id: "20260404_120000".into(), + video: VideoParams { + width: 1920, + height: 1080, + codec: "h264".into(), + fps: 30, + }, + audio: AudioParams { + sample_rate: 48000, + channels: 2, + codec: "aac".into(), + }, + }; + let wire = msg.to_wire_packet().unwrap(); + assert_eq!(wire.header.packet_type, PacketType::Control); + + let decoded = ControlMessage::from_payload(&wire.payload).unwrap(); + match decoded { + ControlMessage::SessionStart { id, video, audio } => { + assert_eq!(id, "20260404_120000"); + assert_eq!(video.width, 1920); + assert_eq!(audio.channels, 2); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn all_control_variants_serialize() { + let messages = vec![ + ControlMessage::SessionStop, + ControlMessage::Keepalive, + ControlMessage::Reconnect { last_pts: 12345 }, + ControlMessage::ParamChange { + scene_threshold: 0.15, + }, + ]; + for msg in &messages { + let wire = msg.to_wire_packet().unwrap(); + let decoded = ControlMessage::from_payload(&wire.payload).unwrap(); + // Just verify it doesn't panic + let _ = format!("{decoded:?}"); + } + } + + #[tokio::test] + async fn wire_packet_read_write_round_trip() { + let packet = WirePacket { + header: PacketHeader { + packet_type: PacketType::Audio, + flags: 0, + length: 5, + timestamp_ns: 42_000_000, + }, + payload: b"hello".to_vec(), + }; + + let mut buf = Vec::new(); + write_packet(&mut buf, &packet).await.unwrap(); + + let mut cursor = std::io::Cursor::new(buf); + let decoded = read_packet(&mut cursor).await.unwrap(); + + assert_eq!(decoded.header.packet_type, PacketType::Audio); + assert_eq!(decoded.header.length, 5); + assert_eq!(decoded.header.timestamp_ns, 42_000_000); + assert_eq!(decoded.payload, b"hello"); + } + + #[tokio::test] + async fn multiple_packets_round_trip() { + let packets: Vec = (0..100) + .map(|i| { + let data = vec![i as u8; (i * 10 + 1) as usize]; + WirePacket { + header: PacketHeader { + packet_type: if i % 3 == 0 { + PacketType::Video + } else if i % 3 == 1 { + PacketType::Audio + } else { + PacketType::Control + }, + flags: if i % 5 == 0 { FLAG_KEYFRAME } else { 0 }, + length: data.len() as u32, + timestamp_ns: i as u64 * 33_333_333, + }, + payload: data, + } + }) + .collect(); + + let mut buf = Vec::new(); + for p in &packets { + write_packet(&mut buf, p).await.unwrap(); + } + + let mut cursor = std::io::Cursor::new(buf); + for (i, original) in packets.iter().enumerate() { + let decoded = read_packet(&mut cursor).await.unwrap(); + assert_eq!( + decoded.header.packet_type, original.header.packet_type, + "packet {i} type mismatch" + ); + assert_eq!( + decoded.header.flags, original.header.flags, + "packet {i} flags mismatch" + ); + assert_eq!( + decoded.payload, original.payload, + "packet {i} payload mismatch" + ); + } + } +} diff --git a/media/server/Cargo.toml b/media/server/Cargo.toml new file mode 100644 index 0000000..ec3ba3c --- /dev/null +++ b/media/server/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "cht-server" +version = "0.1.0" +edition = "2021" + +[dependencies] +cht-common = { path = "../common" } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +anyhow = { workspace = true } diff --git a/media/server/src/main.rs b/media/server/src/main.rs new file mode 100644 index 0000000..11fdbea --- /dev/null +++ b/media/server/src/main.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use cht_common::protocol::{self, ControlMessage, PacketType}; +use tokio::io::BufReader; +use tokio::net::TcpListener; +use tracing::{error, info}; + +const LISTEN_ADDR: &str = "0.0.0.0:4444"; + +#[tokio::main] +async fn main() -> Result<()> { + cht_common::logging::init("server"); + + let listener = TcpListener::bind(LISTEN_ADDR).await?; + info!("Server listening on {LISTEN_ADDR}"); + + loop { + let (stream, addr) = listener.accept().await?; + info!("Client connected from {addr}"); + + tokio::spawn(async move { + if let Err(e) = handle_client(stream).await { + error!("Client {addr} error: {e:#}"); + } + info!("Client {addr} disconnected"); + }); + } +} + +async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> { + let mut reader = BufReader::new(stream); + let mut video_packets = 0u64; + let mut audio_packets = 0u64; + + loop { + let packet = match protocol::read_packet(&mut reader).await { + Ok(p) => p, + Err(e) => { + // Any read error at the header boundary is a clean disconnect + // (includes EOF from flush + shutdown) + let msg = format!("{e:#}"); + if msg.contains("eof") || msg.contains("Eof") + || msg.contains("connection reset") + || msg.contains("broken pipe") + { + break; + } + return Err(e); + } + }; + + match packet.header.packet_type { + PacketType::Video => { + video_packets += 1; + if video_packets % 300 == 1 { + info!( + "video: {video_packets} packets, ts={}ms, keyframe={}", + packet.header.timestamp_ns / 1_000_000, + packet.header.is_keyframe(), + ); + } + } + PacketType::Audio => { + audio_packets += 1; + } + PacketType::Control => { + let ctrl = ControlMessage::from_payload(&packet.payload)?; + info!("control: {ctrl:?}"); + } + } + } + + info!("Session totals: {video_packets} video, {audio_packets} audio packets"); + Ok(()) +}