phase 1
This commit is contained in:
39
media/common/src/frame.rs
Normal file
39
media/common/src/frame.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Pixel format for decoded video frames.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum PixelFormat {
|
||||
NV12,
|
||||
I420,
|
||||
P010,
|
||||
RGBA,
|
||||
BGRA,
|
||||
}
|
||||
|
||||
/// Audio sample format.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum AudioFormat {
|
||||
S16,
|
||||
F32,
|
||||
F32Planar,
|
||||
}
|
||||
|
||||
/// Which track a packet belongs to.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[repr(u8)]
|
||||
pub enum Track {
|
||||
Video = 0,
|
||||
Audio = 1,
|
||||
}
|
||||
|
||||
/// Encoded packet ready for network transport or muxing.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Packet {
|
||||
pub data: Vec<u8>,
|
||||
pub pts: i64,
|
||||
pub dts: i64,
|
||||
pub timebase_num: u32,
|
||||
pub timebase_den: u32,
|
||||
pub keyframe: bool,
|
||||
pub track: Track,
|
||||
}
|
||||
3
media/common/src/lib.rs
Normal file
3
media/common/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod frame;
|
||||
pub mod logging;
|
||||
pub mod protocol;
|
||||
41
media/common/src/logging.rs
Normal file
41
media/common/src/logging.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
/// Initialize logging to both stderr and a file under `media/logs/`.
|
||||
/// Log file: `media/logs/{name}.log`
|
||||
pub fn init(name: &str) {
|
||||
let log_dir = log_dir();
|
||||
fs::create_dir_all(&log_dir).expect("create logs dir");
|
||||
|
||||
let log_path = log_dir.join(format!("{name}.log"));
|
||||
let file = fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path)
|
||||
.expect("open log file");
|
||||
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "info".into());
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_writer(std::io::stderr),
|
||||
)
|
||||
.with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::sync::Mutex::new(file)),
|
||||
)
|
||||
.init();
|
||||
|
||||
tracing::info!("Logging to {}", log_path.display());
|
||||
}
|
||||
|
||||
fn log_dir() -> PathBuf {
|
||||
// Use CWD/logs (expected to run from media/ workspace root)
|
||||
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
|
||||
cwd.join("logs")
|
||||
}
|
||||
337
media/common/src/protocol.rs
Normal file
337
media/common/src/protocol.rs
Normal file
@@ -0,0 +1,337 @@
|
||||
//! Framed TCP protocol for CHT media transport.
|
||||
//!
|
||||
//! Wire format:
|
||||
//! ```text
|
||||
//! ┌──────────────────────────────────────────────┐
|
||||
//! │ Header (16 bytes) │
|
||||
//! │ type: u8 (0=video, 1=audio, 2=ctrl) │
|
||||
//! │ flags: u8 (bit 0=keyframe) │
|
||||
//! │ reserved: u16 │
|
||||
//! │ length: u32 (payload bytes) │
|
||||
//! │ timestamp: u64 (nanoseconds) │
|
||||
//! ├──────────────────────────────────────────────┤
|
||||
//! │ Payload (length bytes) │
|
||||
//! └──────────────────────────────────────────────┘
|
||||
//! ```
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
pub const HEADER_SIZE: usize = 16;
|
||||
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024; // 16 MiB
|
||||
|
||||
/// Packet type on the wire.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[repr(u8)]
|
||||
pub enum PacketType {
|
||||
Video = 0,
|
||||
Audio = 1,
|
||||
Control = 2,
|
||||
}
|
||||
|
||||
impl PacketType {
|
||||
fn from_u8(v: u8) -> Result<Self> {
|
||||
match v {
|
||||
0 => Ok(Self::Video),
|
||||
1 => Ok(Self::Audio),
|
||||
2 => Ok(Self::Control),
|
||||
other => bail!("unknown packet type: {other}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flags in the packet header.
|
||||
pub const FLAG_KEYFRAME: u8 = 0x01;
|
||||
|
||||
/// Fixed-size header on the wire.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PacketHeader {
|
||||
pub packet_type: PacketType,
|
||||
pub flags: u8,
|
||||
pub length: u32,
|
||||
pub timestamp_ns: u64,
|
||||
}
|
||||
|
||||
impl PacketHeader {
|
||||
pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
|
||||
let mut buf = [0u8; HEADER_SIZE];
|
||||
buf[0] = self.packet_type as u8;
|
||||
buf[1] = self.flags;
|
||||
// buf[2..4] reserved
|
||||
buf[4..8].copy_from_slice(&self.length.to_le_bytes());
|
||||
buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Result<Self> {
|
||||
let packet_type = PacketType::from_u8(buf[0])?;
|
||||
let flags = buf[1];
|
||||
let length = u32::from_le_bytes(buf[4..8].try_into().unwrap());
|
||||
let timestamp_ns = u64::from_le_bytes(buf[8..16].try_into().unwrap());
|
||||
|
||||
if length > MAX_PAYLOAD_SIZE {
|
||||
bail!("payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
packet_type,
|
||||
flags,
|
||||
length,
|
||||
timestamp_ns,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_keyframe(&self) -> bool {
|
||||
self.flags & FLAG_KEYFRAME != 0
|
||||
}
|
||||
}
|
||||
|
||||
/// A complete packet: header + payload.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WirePacket {
|
||||
pub header: PacketHeader,
|
||||
pub payload: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Read one packet from an async reader.
|
||||
pub async fn read_packet<R: AsyncReadExt + Unpin>(reader: &mut R) -> Result<WirePacket> {
|
||||
let mut header_buf = [0u8; HEADER_SIZE];
|
||||
reader
|
||||
.read_exact(&mut header_buf)
|
||||
.await
|
||||
.context("reading packet header")?;
|
||||
|
||||
let header = PacketHeader::from_bytes(&header_buf)?;
|
||||
|
||||
let mut payload = vec![0u8; header.length as usize];
|
||||
if !payload.is_empty() {
|
||||
reader
|
||||
.read_exact(&mut payload)
|
||||
.await
|
||||
.context("reading packet payload")?;
|
||||
}
|
||||
|
||||
Ok(WirePacket { header, payload })
|
||||
}
|
||||
|
||||
/// Write one packet to an async writer.
|
||||
pub async fn write_packet<W: AsyncWriteExt + Unpin>(
|
||||
writer: &mut W,
|
||||
packet: &WirePacket,
|
||||
) -> Result<()> {
|
||||
writer
|
||||
.write_all(&packet.header.to_bytes())
|
||||
.await
|
||||
.context("writing packet header")?;
|
||||
if !packet.payload.is_empty() {
|
||||
writer
|
||||
.write_all(&packet.payload)
|
||||
.await
|
||||
.context("writing packet payload")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Control messages exchanged as JSON in control packets.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "ctrl")]
|
||||
pub enum ControlMessage {
|
||||
#[serde(rename = "session_start")]
|
||||
SessionStart {
|
||||
id: String,
|
||||
video: VideoParams,
|
||||
audio: AudioParams,
|
||||
},
|
||||
#[serde(rename = "session_stop")]
|
||||
SessionStop,
|
||||
#[serde(rename = "keepalive")]
|
||||
Keepalive,
|
||||
#[serde(rename = "reconnect")]
|
||||
Reconnect { last_pts: i64 },
|
||||
#[serde(rename = "param_change")]
|
||||
ParamChange { scene_threshold: f64 },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct VideoParams {
|
||||
pub width: u32,
|
||||
pub height: u32,
|
||||
pub codec: String,
|
||||
pub fps: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AudioParams {
|
||||
pub sample_rate: u32,
|
||||
pub channels: u16,
|
||||
pub codec: String,
|
||||
}
|
||||
|
||||
impl ControlMessage {
|
||||
/// Encode to a WirePacket for sending.
|
||||
pub fn to_wire_packet(&self) -> Result<WirePacket> {
|
||||
let json = serde_json::to_vec(self)?;
|
||||
Ok(WirePacket {
|
||||
header: PacketHeader {
|
||||
packet_type: PacketType::Control,
|
||||
flags: 0,
|
||||
length: json.len() as u32,
|
||||
timestamp_ns: 0,
|
||||
},
|
||||
payload: json,
|
||||
})
|
||||
}
|
||||
|
||||
/// Decode from a WirePacket payload.
|
||||
pub fn from_payload(payload: &[u8]) -> Result<Self> {
|
||||
serde_json::from_slice(payload).context("parsing control message JSON")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn header_round_trip() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Video,
|
||||
flags: FLAG_KEYFRAME,
|
||||
length: 4096,
|
||||
timestamp_ns: 1_000_000_000,
|
||||
};
|
||||
let bytes = header.to_bytes();
|
||||
let decoded = PacketHeader::from_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.packet_type, PacketType::Video);
|
||||
assert!(decoded.is_keyframe());
|
||||
assert_eq!(decoded.length, 4096);
|
||||
assert_eq!(decoded.timestamp_ns, 1_000_000_000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn header_rejects_oversized_payload() {
|
||||
let mut bytes = [0u8; HEADER_SIZE];
|
||||
bytes[4..8].copy_from_slice(&(MAX_PAYLOAD_SIZE + 1).to_le_bytes());
|
||||
assert!(PacketHeader::from_bytes(&bytes).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn control_message_round_trip() {
|
||||
let msg = ControlMessage::SessionStart {
|
||||
id: "20260404_120000".into(),
|
||||
video: VideoParams {
|
||||
width: 1920,
|
||||
height: 1080,
|
||||
codec: "h264".into(),
|
||||
fps: 30,
|
||||
},
|
||||
audio: AudioParams {
|
||||
sample_rate: 48000,
|
||||
channels: 2,
|
||||
codec: "aac".into(),
|
||||
},
|
||||
};
|
||||
let wire = msg.to_wire_packet().unwrap();
|
||||
assert_eq!(wire.header.packet_type, PacketType::Control);
|
||||
|
||||
let decoded = ControlMessage::from_payload(&wire.payload).unwrap();
|
||||
match decoded {
|
||||
ControlMessage::SessionStart { id, video, audio } => {
|
||||
assert_eq!(id, "20260404_120000");
|
||||
assert_eq!(video.width, 1920);
|
||||
assert_eq!(audio.channels, 2);
|
||||
}
|
||||
_ => panic!("wrong variant"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_control_variants_serialize() {
|
||||
let messages = vec![
|
||||
ControlMessage::SessionStop,
|
||||
ControlMessage::Keepalive,
|
||||
ControlMessage::Reconnect { last_pts: 12345 },
|
||||
ControlMessage::ParamChange {
|
||||
scene_threshold: 0.15,
|
||||
},
|
||||
];
|
||||
for msg in &messages {
|
||||
let wire = msg.to_wire_packet().unwrap();
|
||||
let decoded = ControlMessage::from_payload(&wire.payload).unwrap();
|
||||
// Just verify it doesn't panic
|
||||
let _ = format!("{decoded:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wire_packet_read_write_round_trip() {
|
||||
let packet = WirePacket {
|
||||
header: PacketHeader {
|
||||
packet_type: PacketType::Audio,
|
||||
flags: 0,
|
||||
length: 5,
|
||||
timestamp_ns: 42_000_000,
|
||||
},
|
||||
payload: b"hello".to_vec(),
|
||||
};
|
||||
|
||||
let mut buf = Vec::new();
|
||||
write_packet(&mut buf, &packet).await.unwrap();
|
||||
|
||||
let mut cursor = std::io::Cursor::new(buf);
|
||||
let decoded = read_packet(&mut cursor).await.unwrap();
|
||||
|
||||
assert_eq!(decoded.header.packet_type, PacketType::Audio);
|
||||
assert_eq!(decoded.header.length, 5);
|
||||
assert_eq!(decoded.header.timestamp_ns, 42_000_000);
|
||||
assert_eq!(decoded.payload, b"hello");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_packets_round_trip() {
|
||||
let packets: Vec<WirePacket> = (0..100)
|
||||
.map(|i| {
|
||||
let data = vec![i as u8; (i * 10 + 1) as usize];
|
||||
WirePacket {
|
||||
header: PacketHeader {
|
||||
packet_type: if i % 3 == 0 {
|
||||
PacketType::Video
|
||||
} else if i % 3 == 1 {
|
||||
PacketType::Audio
|
||||
} else {
|
||||
PacketType::Control
|
||||
},
|
||||
flags: if i % 5 == 0 { FLAG_KEYFRAME } else { 0 },
|
||||
length: data.len() as u32,
|
||||
timestamp_ns: i as u64 * 33_333_333,
|
||||
},
|
||||
payload: data,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut buf = Vec::new();
|
||||
for p in &packets {
|
||||
write_packet(&mut buf, p).await.unwrap();
|
||||
}
|
||||
|
||||
let mut cursor = std::io::Cursor::new(buf);
|
||||
for (i, original) in packets.iter().enumerate() {
|
||||
let decoded = read_packet(&mut cursor).await.unwrap();
|
||||
assert_eq!(
|
||||
decoded.header.packet_type, original.header.packet_type,
|
||||
"packet {i} type mismatch"
|
||||
);
|
||||
assert_eq!(
|
||||
decoded.header.flags, original.header.flags,
|
||||
"packet {i} flags mismatch"
|
||||
);
|
||||
assert_eq!(
|
||||
decoded.payload, original.payload,
|
||||
"packet {i} payload mismatch"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user