mod session; use std::path::PathBuf; use anyhow::Result; use cht_common::protocol::{self, ControlMessage, PacketType}; use session::Session; use tokio::io::BufReader; use tokio::net::TcpListener; use tracing::{error, info, warn}; const LISTEN_ADDR: &str = "0.0.0.0:4447"; const DEFAULT_SESSIONS_DIR: &str = "/home/mariano/wdir/cht/data/sessions"; fn sessions_dir() -> PathBuf { std::env::var("CHT_SESSIONS_DIR") .map(PathBuf::from) .unwrap_or_else(|_| PathBuf::from(DEFAULT_SESSIONS_DIR)) } #[tokio::main] async fn main() -> Result<()> { cht_common::logging::init("server"); let sessions_dir = sessions_dir(); info!("Sessions dir: {}", sessions_dir.display()); let listener = TcpListener::bind(LISTEN_ADDR).await?; info!("Server listening on {LISTEN_ADDR}"); loop { let (stream, addr) = listener.accept().await?; info!("Client connected from {addr}"); let sdir = sessions_dir.clone(); tokio::spawn(async move { if let Err(e) = handle_client(stream, sdir).await { error!("Client {addr} error: {e:#}"); } info!("Client {addr} disconnected"); }); } } async fn handle_client( stream: tokio::net::TcpStream, sessions_dir: PathBuf, ) -> Result<()> { let mut reader = BufReader::new(stream); let mut session: Option = None; let mut video_count = 0u64; let mut audio_count = 0u64; loop { let packet = match protocol::read_packet(&mut reader).await { Ok(p) => p, Err(e) => { let msg = format!("{e:#}"); if msg.contains("eof") || msg.contains("Eof") || msg.contains("connection reset") || msg.contains("broken pipe") { break; } return Err(e); } }; match packet.header.packet_type { PacketType::Video => { if let Some(s) = &mut session { // Blocking write — offload to blocking thread to avoid stalling tokio. let data = packet.payload; let keyframe = packet.header.is_keyframe(); tokio::task::block_in_place(|| s.write_video(&data, keyframe))?; video_count += 1; if video_count % 300 == 1 { info!("video: {video_count} packets, ts={}ms, keyframe={}", packet.header.timestamp_ns / 1_000_000, packet.header.is_keyframe()); } } else { warn!("Video packet before SessionStart — dropped"); } } PacketType::Audio => { if let Some(s) = &mut session { let data = packet.payload; tokio::task::block_in_place(|| s.write_audio(&data))?; audio_count += 1; if audio_count % 500 == 1 { info!("audio: {audio_count} packets"); } } } PacketType::Control => { let ctrl = ControlMessage::from_payload(&packet.payload)?; info!("control: {ctrl:?}"); match ctrl { ControlMessage::SessionStart { id, video, audio } => { let s = tokio::task::block_in_place(|| { Session::start(&id, &sessions_dir, video.fps, &audio) })?; session = Some(s); } ControlMessage::SessionStop => { if let Some(s) = session.take() { tokio::task::block_in_place(|| s.close()); } break; } ControlMessage::Keepalive | ControlMessage::Reconnect { .. } | ControlMessage::ParamChange { .. } => {} } } } } if let Some(s) = session.take() { tokio::task::block_in_place(|| s.close()); } info!("Session totals: {video_count} video, {audio_count} audio packets"); Ok(()) }