From 512d8ecef80b8297f92a04e3ad85ae6ee6067aea Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 9 Apr 2026 22:15:16 -0300 Subject: [PATCH] almost back to working state with rust transport --- cht/stream/ffmpeg.py | 33 +- cht/stream/lifecycle.py | 45 ++- cht/stream/manager.py | 517 ++++++------------------ cht/stream/processor.py | 365 +++++++++++++++++ cht/stream/recorder.py | 256 ++++++++++++ cht/window.py | 8 +- media/Cargo.lock | 1 + media/client/Cargo.toml | 1 + media/client/src/backends/subprocess.rs | 259 +++++++++--- media/client/src/encoder.rs | 11 +- media/client/src/main.rs | 103 ++++- media/server/src/main.rs | 87 +++- media/server/src/session.rs | 306 ++++++++++++++ 13 files changed, 1504 insertions(+), 488 deletions(-) create mode 100644 cht/stream/processor.py create mode 100644 cht/stream/recorder.py create mode 100644 media/server/src/session.rs diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index 091a2bb..641e39b 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -178,15 +178,45 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") +def detect_scenes_from_pipe(scene_threshold=0.10, flush_frames=2, fps=30): + """Scene-detect from piped raw H.264 on stdin. Returns a node for run_async. + + Used when Rust server provides a live H.264 stream via Unix socket. + Caller bridges the socket to ffmpeg's stdin and reads stdout/stderr: + - stdin: raw H.264 from the socket + - stdout: MJPEG pipe (JPEG frames on scene change) + - stderr: showinfo lines with pts_time timestamps + """ + stream = ffmpeg.input("pipe:0", f="h264", framerate=fps, hwaccel="cuda") + scene_expr = f"gt(scene,{scene_threshold})" + if flush_frames > 0: + mod_val = 1 + flush_frames + flush_expr = f"eq(n,prev_selected_n+1)*mod(selected_n,{mod_val})" + select_expr = f"{scene_expr}+{flush_expr}" + else: + select_expr = scene_expr + scene_stream = stream.filter("select", select_expr).filter("showinfo") + return ffmpeg.output( + scene_stream, "pipe:1", + f="image2pipe", vcodec="mjpeg", + flush_packets=1, strict="unofficial", + **{"q:v": "2", "fps_mode": "passthrough"}, + ).global_args(*GLOBAL_ARGS) + + def extract_audio_chunk(input_path, output_path, start_time=0.0, duration=None): """Extract audio from recording as 16kHz mono WAV (optimal for Whisper). Uses input-level seeking (-ss before -i) for fast keyframe-based seek. + Supports fMP4 (auto-detect) and raw AAC files (explicit format hint). Returns (stdout, stderr) as decoded strings. 
""" kwargs = {"ss": start_time} if duration is not None: kwargs["t"] = duration + # Raw AAC files need explicit format hint + if str(input_path).endswith(".aac"): + kwargs["f"] = "aac" stream = ffmpeg.input(str(input_path), **kwargs) output = ( ffmpeg.output( @@ -219,10 +249,11 @@ def extract_frame_at(input_path, output_path, timestamp): output.run(capture_stdout=True, capture_stderr=True) -def run_async(output_node, pipe_stdout=False, pipe_stderr=False): +def run_async(output_node, pipe_stdin=False, pipe_stdout=False, pipe_stderr=False): """Start an ffmpeg pipeline asynchronously via ffmpeg-python's run_async.""" log.info("run_async: %s", " ".join(output_node.compile())) return output_node.run_async( + pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr, ) diff --git a/cht/stream/lifecycle.py b/cht/stream/lifecycle.py index 19bc07c..6e6c13c 100644 --- a/cht/stream/lifecycle.py +++ b/cht/stream/lifecycle.py @@ -57,14 +57,29 @@ class StreamLifecycle: def tracker(self) -> RecordingTracker | None: return self._tracker - def start(self, session_id=None) -> StreamManager: - """Start recording and all background processes. Returns the StreamManager.""" + def start(self, session_id=None, rust_transport=False) -> StreamManager: + """Start recording and all background processes. Returns the StreamManager. + + rust_transport=True: skip StreamRecorder (Rust cht-server handles TCP + + fMP4 + UDP relay). Session dir is discovered from data/active-session + written by cht-server on first client connection. + """ self._streaming = True self._gone_live = False + self._rust_transport = rust_transport - self._stream_mgr = StreamManager(session_id=session_id) - self._stream_mgr.setup_dirs() - self._stream_mgr.start_recorder() + if rust_transport: + # Wait for cht-server to write the active session path. + session_dir = self._wait_for_rust_session() + if session_dir is None: + log.error("Timed out waiting for cht-server session") + self._streaming = False + return None + self._stream_mgr = StreamManager.from_rust_session(session_dir) + else: + self._stream_mgr = StreamManager(session_id=session_id) + self._stream_mgr.setup_dirs() + self._stream_mgr.start_recorder() self._tracker = RecordingTracker( get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [], @@ -76,10 +91,28 @@ class StreamLifecycle: self._stream_mgr.start_audio_extractor(on_new_audio=self._handle_new_audio) GLib.timeout_add(1000, self._tick_live) - GLib.timeout_add(2000, self._check_recorder) + if not rust_transport: + GLib.timeout_add(2000, self._check_recorder) return self._stream_mgr + def _wait_for_rust_session(self, timeout=30, poll_interval=0.5): + """Poll data/active-session until cht-server writes it.""" + import time + from pathlib import Path + from cht.config import DATA_DIR + marker = DATA_DIR / "active-session" + elapsed = 0.0 + while elapsed < timeout: + if marker.exists(): + session_dir = Path(marker.read_text().strip()) + if session_dir.exists(): + log.info("Rust session dir: %s", session_dir) + return session_dir + time.sleep(poll_interval) + elapsed += poll_interval + return None + def stop(self): """Stop all processes and reset state. Does NOT touch UI — caller handles that.""" if self._tracker: diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 07e1a63..724a2ff 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -1,30 +1,25 @@ -""" -StreamManager: orchestrates ffmpeg for recording and scene detection. 
+"""StreamManager: coordinates StreamRecorder and SessionProcessor. -Architecture: - sender → TCP:4444 → single ffmpeg process: - 1. writes fMP4 to disk (c=copy) - 2. relays UDP for live display (c=copy) - 3. CUDA decode → scene filter → JPEG frames (real-time) +Thin facade that keeps the existing public API intact while delegating +to two focused classes: + + StreamRecorder — ffmpeg network receiver + fMP4 recorder + scene detection + (to be replaced by cht-server in Rust in a future phase) + + SessionProcessor — audio extraction from fMP4 + (stays Python; reads files regardless of how they were written) + +Callers (lifecycle.py, window.py) use StreamManager as before — no changes +needed there. """ -import json import logging -import re import time -from threading import Thread +from pathlib import Path -from cht.config import ( - STREAM_HOST, - STREAM_PORT, - RELAY_PORT, - SCENE_THRESHOLD, - SCENE_FLUSH_FRAMES, - SESSIONS_DIR, - AUDIO_EXTRACT_INTERVAL, - AUDIO_SAFETY_MARGIN, -) -from cht.stream import ffmpeg as ff +from cht.config import SCENE_THRESHOLD, SESSIONS_DIR +from cht.stream.recorder import StreamRecorder +from cht.stream.processor import SessionProcessor log = logging.getLogger(__name__) @@ -62,16 +57,55 @@ class StreamManager: self.audio_dir = self.session_dir / "audio" self.agent_dir = self.session_dir / "agent" - self._procs = {} - self._threads = {} - self._stop_flags = set() - self._segment = 0 - self._segment_offsets = {0: 0.0} # segment_index → global_offset - self.scene_threshold = SCENE_THRESHOLD - self.readonly = False # True when loaded from existing session - self.telemetry = None # set by window after start + self.readonly = False + self.telemetry = None + + self.recorder = StreamRecorder(self.session_dir) + self.processor = SessionProcessor(self.session_dir) + self.processor.attach( + get_recording_path=lambda: self.recorder.recording_path, + get_current_global_offset=lambda: self.recorder.current_global_offset, + ) + # Wire recorder pipe output → processor frame handling + self.recorder.set_on_raw_frame(self.processor.on_raw_frame) + log.info("Session: %s", session_id) + @classmethod + def from_rust_session(cls, session_dir: Path): + """Attach to a live session being recorded by cht-server (Rust). + + No StreamRecorder is started — Rust owns the TCP + fMP4 + UDP relay. + SessionProcessor handles audio extraction from the growing fMP4. + Scene detection pipe is also skipped (Rust will handle it eventually). + """ + mgr = cls.__new__(cls) + mgr.session_id = session_dir.name + mgr.session_dir = session_dir + mgr.stream_dir = session_dir / "stream" + mgr.frames_dir = session_dir / "frames" + mgr.transcript_dir = session_dir / "transcript" + mgr.audio_dir = session_dir / "audio" + mgr.agent_dir = session_dir / "agent" + mgr.readonly = False + mgr.telemetry = None + + # No recorder — Rust server owns transport + recording. 
+ mgr.recorder = None + mgr.processor = SessionProcessor(session_dir) + mgr.processor.attach( + get_recording_path=lambda: next(iter(sorted(mgr.stream_dir.glob("recording_*.mp4"))), None) + if mgr.stream_dir.exists() else None, + get_current_global_offset=lambda: 0.0, + ) + + for d in (mgr.stream_dir, mgr.frames_dir, mgr.transcript_dir, + mgr.audio_dir, mgr.agent_dir): + d.mkdir(parents=True, exist_ok=True) + + log.info("Attached to Rust session: %s", mgr.session_id) + return mgr + @classmethod def from_existing(cls, session_id): """Load an existing session without starting any ffmpeg processes.""" @@ -80,51 +114,83 @@ class StreamManager: if not mgr.session_dir.exists(): raise FileNotFoundError(f"Session not found: {session_id}") mgr.readonly = True - # Point _segment to last recording segment - segments = mgr.recording_segments - if segments: - mgr._segment = len(segments) - 1 - mgr._rebuild_offsets() + mgr.recorder._segment = max(0, len(mgr.recorder.recording_segments) - 1) + mgr.recorder._rebuild_offsets() rebuild_manifest(mgr.session_dir) log.info("Loaded existing session: %s (%d segments, %d frames)", - session_id, len(segments), mgr.frame_count) + session_id, len(mgr.recorder.recording_segments), mgr.frame_count) return mgr + # -- Recorder delegation -- + + @property + def scene_threshold(self) -> float: + return self.recorder.scene_threshold if self.recorder else 0.10 + + @property + def relay_url(self) -> str: + return self.recorder.relay_url if self.recorder else "udp://127.0.0.1:4445" + + @property + def recording_path(self) -> Path: + if self.recorder: + return self.recorder.recording_path + return next(iter(sorted(self.stream_dir.glob("recording_*.mp4"))), None) + + @property + def recording_segments(self) -> list[Path]: + if self.recorder: + return self.recorder.recording_segments + return sorted(self.stream_dir.glob("recording_*.mp4")) + @property def current_global_offset(self) -> float: - """Global time offset for the current recording segment.""" - return self._segment_offsets.get(self._segment, 0.0) - - def _rebuild_offsets(self): - """Compute global offsets from all segments on disk.""" - from cht.session import probe_duration - offset = 0.0 - self._segment_offsets = {} - for i, seg in enumerate(self.recording_segments): - self._segment_offsets[i] = offset - offset += probe_duration(seg) - - def _advance_segment_offset(self, completed_segment_path): - """Update offsets after a segment completes and a new one begins.""" - from cht.session import probe_duration - dur = probe_duration(completed_segment_path) - prev_offset = self._segment_offsets.get(self._segment, 0.0) - self._segment_offsets[self._segment + 1] = prev_offset + dur - log.info("Segment %d completed (%.1fs), next offset: %.1fs", - self._segment, dur, prev_offset + dur) + return self.recorder.current_global_offset if self.recorder else 0.0 @property - def frame_count(self): - index_path = self.frames_dir / "index.json" - if index_path.exists(): - try: - return len(json.loads(index_path.read_text())) - except Exception: - pass - return 0 + def frame_count(self) -> int: + return self.processor.frame_count - def total_duration(self): - """Probe total duration across all segments (for completed sessions).""" + def setup_dirs(self): + for d in (self.stream_dir, self.frames_dir, self.transcript_dir, + self.audio_dir, self.agent_dir): + d.mkdir(parents=True, exist_ok=True) + + def start_recorder(self): + if self.recorder: + self.recorder.start() + + def restart_recorder(self): + if self.recorder: + 
self.recorder.restart() + + def recorder_alive(self) -> bool: + return self.recorder.alive() if self.recorder else True # Rust owns it + + def start_scene_detector(self, on_new_frames=None): + if self.recorder: + self.recorder.set_on_new_scene_frames(on_new_frames) + else: + self.processor.set_on_new_frames(on_new_frames) + self.processor.start_scene_detector(threshold=SCENE_THRESHOLD) + + def capture_now(self, on_new_frames=None): + self.processor.set_on_new_frames(on_new_frames) + if self.recorder: + self.recorder.capture_now(on_raw_frame=self.processor.on_captured_frame) + + def update_scene_threshold(self, new_threshold: float): + if self.recorder: + self.recorder.update_scene_threshold(new_threshold) + + # -- Processor delegation -- + + def start_audio_extractor(self, on_new_audio=None): + self.processor.start_audio_extractor(on_new_audio=on_new_audio) + + # -- Session-level -- + + def total_duration(self) -> float: total = 0.0 for seg in self.recording_segments: try: @@ -144,323 +210,8 @@ class StreamManager: total += seg.stat().st_size / 65_000 return total - def setup_dirs(self): - for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.audio_dir, self.agent_dir): - d.mkdir(parents=True, exist_ok=True) - - @property - def stream_url(self): - return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" - - @property - def relay_url(self): - return f"udp://127.0.0.1:{RELAY_PORT}" - - @property - def recording_path(self): - """Current recording segment path.""" - return self.stream_dir / f"recording_{self._segment:03d}.mp4" - - @property - def recording_segments(self): - """All recording segments in order.""" - return sorted(self.stream_dir.glob("recording_*.mp4")) - - # -- Recording -- - - def start_recorder(self): - """Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP.""" - # Start after existing segments (for resumed sessions) - existing = self.recording_segments - self._segment = len(existing) - self._rebuild_offsets() - self._launch_recorder() - - def restart_recorder(self): - """Restart recorder into a new segment. Session stays alive.""" - old = self._procs.pop("recorder", None) - if old: - ff.stop_proc(old) - completed_path = self.recording_path - self._advance_segment_offset(completed_path) - self._segment += 1 - log.info("Restarting recorder → segment %d (offset %.1fs)", - self._segment, self.current_global_offset) - self._launch_recorder() - - def recorder_alive(self): - """Check if the recorder process is still running.""" - proc = self._procs.get("recorder") - return proc is not None and proc.poll() is None - - def _launch_recorder(self): - start_number = self._next_frame_number() - node = ff.receive_record_relay_and_detect( - self.stream_url, self.recording_path, self.relay_url, - scene_threshold=self.scene_threshold, - flush_frames=SCENE_FLUSH_FRAMES, - ) - proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True) - self._procs["recorder"] = proc - log.info("Recorder+scene: pid=%s → %s (threshold=%.2f, start_number=%d)", - proc.pid, self.recording_path, self.scene_threshold, start_number) - self._start_scene_readers(proc, start_number) - - # -- Scene Detection -- - - def start_scene_detector(self, on_new_frames=None): - """Register callback for new scene frames. - - Scene detection runs inside the recorder process (single ffmpeg). - The stderr reader thread parses showinfo lines and fires this callback. - """ - self._on_new_frames = on_new_frames - - def update_scene_threshold(self, new_threshold): - """Update scene threshold. 
Restarts the recorder to apply new filter.""" - self.scene_threshold = new_threshold - log.info("Threshold changed → %.2f, restarting recorder", new_threshold) - self.restart_recorder() - - def _next_frame_number(self): - """Determine next frame number from the index (source of truth).""" - index_path = self.frames_dir / "index.json" - if index_path.exists(): - index = json.loads(index_path.read_text()) - return len(index) + 1 - return 1 - - def _append_frame_index(self, entry): - """Append a frame entry to index.json.""" - index_path = self.frames_dir / "index.json" - index = json.loads(index_path.read_text()) if index_path.exists() else [] - index.append(entry) - index_path.write_text(json.dumps(index, indent=2)) - - def _start_scene_readers(self, proc, start_number): - """Read scene frames from stdout (MJPEG pipe) and timestamps from stderr. - - Two threads: - - stderr: parses showinfo lines, queues pts_time values - - stdout: reads JPEG frames from pipe, pairs with queued timestamps, - writes files to disk, fires callbacks immediately - """ - from queue import Queue, Empty - import os - ts_queue = Queue() - - def _read_stderr(): - for raw in proc.stderr: - line = raw.decode("utf-8", errors="replace").rstrip() - if not line: - continue - if "showinfo" not in line: - log.debug("[recorder] %s", line) - continue - pts_match = re.search(r"pts_time:\s*([\d.]+)", line) - if pts_match: - ts_queue.put(float(pts_match.group(1))) - log.info("[recorder] stderr closed, exit=%s", proc.poll()) - - def _read_stdout(): - frame_num = start_number - offset = self.current_global_offset - last_pts = -1.0 - buf = b"" - raw_fd = proc.stdout.fileno() - while True: - chunk = os.read(raw_fd, 65536) - if not chunk: - break - buf += chunk - while True: - soi = buf.find(b"\xff\xd8") - if soi < 0: - buf = b"" - break - eoi = buf.find(b"\xff\xd9", soi + 2) - if eoi < 0: - buf = buf[soi:] - break - jpeg_data = buf[soi:eoi + 2] - buf = buf[eoi + 2:] - - try: - pts_time = ts_queue.get(timeout=2.0) - except Empty: - log.warning("No timestamp for scene frame %d", frame_num) - pts_time = 0.0 - - # Skip flush frames (within 100ms of previous = duplicate) - if pts_time - last_pts < 0.1: - log.debug("Skipping flush frame at pts=%.3f", pts_time) - continue - last_pts = pts_time - - frame_id = f"F{frame_num:04d}" - frame_path = self.frames_dir / f"{frame_id}.jpg" - frame_path.write_bytes(jpeg_data) - - entry = { - "id": frame_id, - "timestamp": pts_time + offset, - "path": str(frame_path), - "sent_to_agent": False, - } - self._append_frame_index(entry) - log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)", - frame_id, entry["timestamp"], pts_time, offset) - if self._on_new_frames: - self._on_new_frames([entry]) - frame_num += 1 - log.info("[recorder] stdout closed") - - Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start() - Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start() - - def _probe_safe_duration(self): - """Probe current recording duration via ffprobe. 
Returns seconds or None.""" - try: - import ffmpeg as ffmpeg_lib - info = ffmpeg_lib.probe(str(self.recording_path)) - dur = float(info.get("format", {}).get("duration", 0)) - if dur > 0: - return dur - for stream in info.get("streams", []): - sdur = float(stream.get("duration", 0)) - if sdur > 0: - return sdur - except Exception: - pass - try: - return self.recording_path.stat().st_size / 65_000 - except Exception: - return None - - def capture_now(self, on_new_frames=None): - """Capture a single frame from the current recording position. - - Grabs the latest available frame (safe_duration - 1s) and adds it - to the index. Runs in a thread to avoid blocking the UI. - """ - def _capture(): - safe_duration = self._probe_safe_duration() - if not safe_duration or safe_duration < 1: - log.warning("capture_now: recording too short") - return - - local_timestamp = safe_duration - 1 - timestamp = local_timestamp + self.current_global_offset - frame_num = self._next_frame_number() - frame_id = f"F{frame_num:04d}" - frame_path = self.frames_dir / f"{frame_id}.jpg" - - try: - ff.extract_frame_at(self.recording_path, frame_path, local_timestamp) - except Exception as e: - log.error("capture_now failed: %s", e) - return - - if not frame_path.exists(): - log.warning("capture_now: frame not written") - return - - entry = { - "id": frame_id, - "timestamp": timestamp, - "path": str(frame_path), - "sent_to_agent": False, - } - self._append_frame_index(entry) - log.info("Manual capture: %s at %.1fs", frame_id, timestamp) - - if on_new_frames: - on_new_frames([entry]) - - Thread(target=_capture, daemon=True, name="capture_now").start() - - # -- Audio Extraction -- - - def start_audio_extractor(self, on_new_audio=None): - """Periodically extract audio from the growing recording as WAV chunks. - - Same incremental pattern as scene detector: polls recording, extracts - new time range, calls back with (wav_path, start_time, duration). 
- - Args: - on_new_audio: callback(wav_path, start_time, duration) - """ - self._on_new_audio = on_new_audio - self.audio_dir.mkdir(parents=True, exist_ok=True) - - def _extract(): - processed_time = 0.0 - chunk_num = 0 - current_segment = None - - while "stop" not in self._stop_flags: - time.sleep(AUDIO_EXTRACT_INTERVAL) - - seg = self.recording_path - if not seg.exists(): - continue - - if seg != current_segment: - current_segment = seg - processed_time = 0.0 - chunk_num = 0 - log.info("Audio extractor: switched to %s", seg.name) - - if seg.stat().st_size < 100_000: - continue - - safe_duration = self._probe_safe_duration() - if safe_duration is None or safe_duration <= 0: - continue - - process_to = safe_duration - AUDIO_SAFETY_MARGIN - if process_to <= processed_time + 1.0: - continue - - chunk_duration = process_to - processed_time - wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav" - - try: - ff.extract_audio_chunk( - seg, wav_path, - start_time=processed_time, - duration=chunk_duration, - ) - except Exception as e: - log.error("Audio extraction failed: %s", e) - continue - - if wav_path.exists() and wav_path.stat().st_size > 100: - global_start = processed_time + self.current_global_offset - log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)", - wav_path.name, processed_time, process_to, global_start) - if self._on_new_audio: - self._on_new_audio( - wav_path, global_start, chunk_duration, - segment_path=seg, local_start=processed_time, - ) - chunk_num += 1 - - processed_time = process_to - - log.info("Audio extractor stopped") - - t = Thread(target=_extract, daemon=True, name="audio_extractor") - t.start() - self._threads["audio_extractor"] = t - - # -- Lifecycle -- - def stop_all(self): log.info("Stopping all...") - self._stop_flags.add("stop") - for name, proc in self._procs.items(): - log.info("Stopping %s", name) - ff.stop_proc(proc) - self._procs.clear() - + self.processor.stop() + if self.recorder: + self.recorder.stop() diff --git a/cht/stream/processor.py b/cht/stream/processor.py new file mode 100644 index 0000000..46d5b45 --- /dev/null +++ b/cht/stream/processor.py @@ -0,0 +1,365 @@ +"""SessionProcessor: processes raw frame data and audio from recordings. + +Receives raw JPEG frames from StreamRecorder (via on_raw_frame callback) and +handles all frame processing: file writing, frame index, GUI callbacks. + +Also extracts audio from fMP4 files by polling (latency-insensitive). + +The boundary with StreamRecorder: + Recorder: reads pipe → fires on_raw_frame(jpeg_bytes, global_ts) + Processor: writes JPEG to disk, updates index, fires on_new_frames to GUI + +When Rust owns transport, SessionProcessor connects to the server's Unix +domain socket (scene.sock) for a live H.264 stream, pipes it to ffmpeg +for GPU scene detection. Continuous stream — no polling, no restarts. 
+""" + +import json +import logging +import os +import re +import socket +import time +from pathlib import Path +from queue import Queue, Empty +from threading import Thread + +from cht.config import ( + AUDIO_EXTRACT_INTERVAL, + AUDIO_SAFETY_MARGIN, + SCENE_THRESHOLD, + SCENE_FLUSH_FRAMES, +) +from cht.stream import ffmpeg as ff + +log = logging.getLogger(__name__) + + +class SessionProcessor: + """Writes scene frames to disk and extracts audio from fMP4.""" + + def __init__(self, session_dir: Path): + self.session_dir = session_dir + self.frames_dir = session_dir / "frames" + self.audio_dir = session_dir / "audio" + + self._stop_flags: set[str] = set() + self._threads: dict[str, Thread] = {} + self._on_new_frames = None + self._on_new_audio = None + + self._get_recording_path = None + self._get_current_global_offset = None + + def attach(self, get_recording_path, get_current_global_offset): + """Wire up callbacks to query the recorder's current state.""" + self._get_recording_path = get_recording_path + self._get_current_global_offset = get_current_global_offset + + # -- Scene frame handling (called from recorder's pipe thread) -- + + def on_raw_frame(self, jpeg_bytes: bytes, global_ts: float): + """Receive a raw JPEG frame from the recorder pipe. Write and index it.""" + frame_num = self._next_frame_number() + frame_id = f"F{frame_num:04d}" + frame_path = self.frames_dir / f"{frame_id}.jpg" + frame_path.write_bytes(jpeg_bytes) + + entry = { + "id": frame_id, + "timestamp": global_ts, + "path": str(frame_path), + "sent_to_agent": False, + } + self._append_frame_index(entry) + log.info("Scene frame: %s at %.1fs", frame_id, global_ts) + + if self._on_new_frames: + self._on_new_frames([entry]) + + def set_on_new_frames(self, cb): + self._on_new_frames = cb + + # -- On-demand capture (recorder extracts bytes, processor indexes) -- + + def on_captured_frame(self, jpeg_bytes: bytes, global_ts: float): + """Receive a manually captured frame. Write and index it.""" + self.on_raw_frame(jpeg_bytes, global_ts) + + # -- Frame index -- + + @property + def frame_count(self) -> int: + index_path = self.frames_dir / "index.json" + if index_path.exists(): + try: + return len(json.loads(index_path.read_text())) + except Exception: + pass + return 0 + + def _next_frame_number(self) -> int: + index_path = self.frames_dir / "index.json" + if index_path.exists(): + try: + return len(json.loads(index_path.read_text())) + 1 + except Exception: + pass + return 1 + + def _append_frame_index(self, entry: dict): + index_path = self.frames_dir / "index.json" + index = json.loads(index_path.read_text()) if index_path.exists() else [] + index.append(entry) + index_path.write_text(json.dumps(index, indent=2)) + + # -- Scene detection via Unix socket (Rust transport mode) -- + + def start_scene_detector(self, threshold=None): + """Connect to Rust server's scene socket and run GPU scene detection. + + The server provides a live H.264 stream via a Unix domain socket at + stream/scene.sock. We pipe it to ffmpeg for CUDA scene detection — + continuous stream, no polling, no restarts. + """ + threshold = threshold or SCENE_THRESHOLD + t = Thread(target=self._scene_detect_loop, daemon=True, + name="scene_detector", args=(threshold,)) + t.start() + self._threads["scene_detector"] = t + + def _scene_detect_loop(self, threshold): + """Connect to scene socket, pipe H.264 to ffmpeg, read scene frames. + + Retries on failure (e.g. ffmpeg dies from bad initial frames). 
+ The server buffers the latest keyframe so reconnects start clean. + """ + socket_path = self.session_dir / "stream" / "scene.sock" + + # Wait for the socket to appear (server creates it on session start). + while "stop" not in self._stop_flags: + if socket_path.exists(): + break + time.sleep(0.5) + if "stop" in self._stop_flags: + return + + while "stop" not in self._stop_flags: + try: + self._run_scene_session(socket_path, threshold) + except Exception: + log.exception("Scene detector error") + if "stop" in self._stop_flags: + break + log.info("Scene detector: reconnecting in 2s...") + time.sleep(2.0) + + log.info("Scene detector stopped") + + def _run_scene_session(self, socket_path, threshold): + """Single scene detection session: connect, run ffmpeg, read frames.""" + log.info("Scene detector: connecting to %s", socket_path) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.connect(str(socket_path)) + except OSError as e: + log.error("Scene detector: connect failed: %s", e) + return + + log.info("Scene detector: connected, starting ffmpeg") + node = ff.detect_scenes_from_pipe( + scene_threshold=threshold, flush_frames=SCENE_FLUSH_FRAMES, + ) + proc = ff.run_async(node, pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + self._procs = getattr(self, "_procs", {}) + self._procs["scene_detector"] = proc + + # Thread: socket → ffmpeg stdin + def _feed_stdin(): + try: + while "stop" not in self._stop_flags: + data = sock.recv(65536) + if not data: + break + try: + proc.stdin.write(data) + proc.stdin.flush() + except (BrokenPipeError, OSError): + break + finally: + try: + proc.stdin.close() + except OSError: + pass + sock.close() + log.debug("Scene detector: stdin feeder stopped") + + stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin") + stdin_t.start() + + # Thread: ffmpeg stderr → parse showinfo timestamps + ts_queue = Queue() + offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + + def _read_stderr(): + for raw in proc.stderr: + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + if "showinfo" in line: + pts_match = re.search(r"pts_time:\s*([\d.]+)", line) + if pts_match: + ts_queue.put(float(pts_match.group(1))) + elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower(): + log.debug("[scene] %s", line) + log.debug("[scene] stderr closed") + + stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr") + stderr_t.start() + + # Main: ffmpeg stdout → extract JPEG frames + last_pts = 0.0 + buf = b"" + raw_fd = proc.stdout.fileno() + while True: + chunk = os.read(raw_fd, 65536) + if not chunk: + break + buf += chunk + while True: + soi = buf.find(b"\xff\xd8") + if soi < 0: + buf = b"" + break + eoi = buf.find(b"\xff\xd9", soi + 2) + if eoi < 0: + buf = buf[soi:] + break + jpeg_data = buf[soi:eoi + 2] + buf = buf[eoi + 2:] + + try: + pts_time = ts_queue.get(timeout=2.0) + except Empty: + log.warning("No timestamp for scene frame") + pts_time = 0.0 + + # Skip flush frames (within 100ms of previous = duplicate) + if pts_time - last_pts < 0.1: + log.debug("Skipping flush frame at pts=%.3f", pts_time) + continue + last_pts = pts_time + + global_ts = pts_time + offset + self.on_raw_frame(jpeg_data, global_ts) + + ff.stop_proc(proc, timeout=3) + log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts) + + def start_audio_extractor(self, on_new_audio=None): + """Periodically extract audio from the growing fMP4 as WAV chunks.""" + 
self._on_new_audio = on_new_audio + self.audio_dir.mkdir(parents=True, exist_ok=True) + t = Thread(target=self._audio_loop, daemon=True, name="audio_extractor") + t.start() + self._threads["audio_extractor"] = t + + def stop(self): + self._stop_flags.add("stop") + for name, proc in getattr(self, "_procs", {}).items(): + ff.stop_proc(proc, timeout=3) + self._procs = {} if hasattr(self, "_procs") else {} + + def _has_audio_stream(self, seg: Path) -> bool: + try: + import ffmpeg as ffmpeg_lib + info = ffmpeg_lib.probe(str(seg)) + return any(s.get("codec_type") == "audio" for s in info.get("streams", [])) + except Exception: + return False + + def _find_audio_source(self): + """Find audio source: fMP4 with audio track, or standalone audio.aac from Rust server.""" + seg = self._get_recording_path() if self._get_recording_path else None + if seg and seg.exists() and self._has_audio_stream(seg): + return seg + # Rust server writes raw AAC alongside the fMP4 + stream_dir = self.session_dir / "stream" + aac_path = stream_dir / "audio.aac" + if aac_path.exists() and aac_path.stat().st_size > 100: + return aac_path + return None + + def _audio_loop(self): + processed_time = 0.0 + chunk_num = 0 + current_source = None + + while "stop" not in self._stop_flags: + time.sleep(AUDIO_EXTRACT_INTERVAL) + + source = self._find_audio_source() + if not source: + continue + + if source != current_source: + current_source = source + processed_time = 0.0 + chunk_num = 0 + log.info("Audio extractor: using %s", source.name) + + if source.stat().st_size < 100_000: + continue + + safe_duration = self._probe_safe_duration(source) + if safe_duration is None or safe_duration <= 0: + continue + + process_to = safe_duration - AUDIO_SAFETY_MARGIN + if process_to <= processed_time + 1.0: + continue + + chunk_duration = process_to - processed_time + wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav" + + try: + ff.extract_audio_chunk(source, wav_path, + start_time=processed_time, + duration=chunk_duration) + except Exception as e: + log.error("Audio extraction failed: %s", e) + continue + + if wav_path.exists() and wav_path.stat().st_size > 100: + offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + global_start = processed_time + offset + log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)", + wav_path.name, processed_time, process_to, global_start) + if self._on_new_audio: + self._on_new_audio( + wav_path, global_start, chunk_duration, + segment_path=source, local_start=processed_time, + ) + chunk_num += 1 + + processed_time = process_to + + log.info("Audio extractor stopped") + + def _probe_safe_duration(self, seg: Path): + try: + import ffmpeg as ffmpeg_lib + info = ffmpeg_lib.probe(str(seg)) + dur = float(info.get("format", {}).get("duration", 0)) + if dur > 0: + return dur + for stream in info.get("streams", []): + sdur = float(stream.get("duration", 0)) + if sdur > 0: + return sdur + except Exception: + pass + try: + return seg.stat().st_size / 65_000 + except Exception: + return None diff --git a/cht/stream/recorder.py b/cht/stream/recorder.py new file mode 100644 index 0000000..d4a2596 --- /dev/null +++ b/cht/stream/recorder.py @@ -0,0 +1,256 @@ +"""StreamRecorder: ffmpeg-based network receiver and session recorder. 
+ +Responsible for transport + real-time scene detection: +- TCP listen (receives mpegts from sender) +- Writing fragmented MP4 to disk +- UDP relay for live display +- Scene frame detection via ffmpeg stdout pipe (low-latency, ~same-second) +- Segment rotation + +Scene detection lives here — not in SessionProcessor — because it reads from +the recorder's ffmpeg stdout pipe directly. Moving it to poll fMP4 would add +3-5s latency (disk IPC vs kernel pipe). When Rust replaces this class, scene +detection moves in-process (zero IPC, even faster). + +SessionProcessor reads from the fMP4 files this class produces for +non-latency-sensitive work (audio extraction). +""" + +import logging +import re +from pathlib import Path +from threading import Thread + +from cht.config import ( + STREAM_HOST, + STREAM_PORT, + RELAY_PORT, + SCENE_THRESHOLD, + SCENE_FLUSH_FRAMES, +) +from cht.stream import ffmpeg as ff + +log = logging.getLogger(__name__) + + +class StreamRecorder: + """Owns the ffmpeg recording process, relay, and real-time scene detection.""" + + def __init__(self, session_dir: Path, scene_threshold: float = SCENE_THRESHOLD): + self.session_dir = session_dir + self.stream_dir = session_dir / "stream" + self.scene_threshold = scene_threshold + + self._procs: dict = {} + self._segment = 0 + self._segment_offsets: dict[int, float] = {0: 0.0} + self._on_raw_frame = None # cb(jpeg_bytes: bytes, pts_time: float) + self._on_segment_complete = None + + def set_on_raw_frame(self, cb): + """Called with (jpeg_bytes, pts_time) for each scene-change frame.""" + self._on_raw_frame = cb + + def set_on_segment_complete(self, cb): + self._on_segment_complete = cb + + # -- Lifecycle -- + + def start(self): + existing = self.recording_segments + self._segment = len(existing) + self._rebuild_offsets() + self._launch() + + def stop(self): + for proc in self._procs.values(): + ff.stop_proc(proc) + self._procs.clear() + + def restart(self): + """Rotate to a new segment and relaunch.""" + old = self._procs.pop("recorder", None) + if old: + ff.stop_proc(old) + completed_path = self.recording_path + self._advance_segment_offset(completed_path) + self._segment += 1 + log.info("Restarting recorder → segment %d (offset %.1fs)", + self._segment, self.current_global_offset) + if self._on_segment_complete: + self._on_segment_complete(completed_path) + self._launch() + + def alive(self) -> bool: + proc = self._procs.get("recorder") + return proc is not None and proc.poll() is None + + def update_scene_threshold(self, new_threshold: float): + """Update threshold and restart recorder (restarts ffmpeg filter).""" + self.scene_threshold = new_threshold + log.info("Scene threshold → %.2f, restarting recorder", new_threshold) + self.restart() + + # -- Properties -- + + @property + def stream_url(self) -> str: + return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" + + @property + def relay_url(self) -> str: + return f"udp://127.0.0.1:{RELAY_PORT}" + + @property + def recording_path(self) -> Path: + return self.stream_dir / f"recording_{self._segment:03d}.mp4" + + @property + def recording_segments(self) -> list[Path]: + return sorted(self.stream_dir.glob("recording_*.mp4")) + + @property + def current_global_offset(self) -> float: + return self._segment_offsets.get(self._segment, 0.0) + + # -- Internal -- + + def _launch(self): + node = ff.receive_record_relay_and_detect( + self.stream_url, self.recording_path, self.relay_url, + scene_threshold=self.scene_threshold, + flush_frames=SCENE_FLUSH_FRAMES, + ) + proc = ff.run_async(node, 
pipe_stdout=True, pipe_stderr=True) + self._procs["recorder"] = proc + log.info("Recorder+scene: pid=%s → %s (threshold=%.2f)", + proc.pid, self.recording_path, self.scene_threshold) + self._start_scene_readers(proc) + + def _rebuild_offsets(self): + from cht.session import probe_duration + offset = 0.0 + self._segment_offsets = {} + for i, seg in enumerate(self.recording_segments): + self._segment_offsets[i] = offset + offset += probe_duration(seg) + + def _advance_segment_offset(self, completed_path: Path): + from cht.session import probe_duration + dur = probe_duration(completed_path) + prev = self._segment_offsets.get(self._segment, 0.0) + self._segment_offsets[self._segment + 1] = prev + dur + log.info("Segment %d completed (%.1fs), next offset: %.1fs", + self._segment, dur, prev + dur) + + def _probe_safe_duration(self): + try: + import ffmpeg as ffmpeg_lib + info = ffmpeg_lib.probe(str(self.recording_path)) + dur = float(info.get("format", {}).get("duration", 0)) + if dur > 0: + return dur + for stream in info.get("streams", []): + sdur = float(stream.get("duration", 0)) + if sdur > 0: + return sdur + except Exception: + pass + try: + return self.recording_path.stat().st_size / 65_000 + except Exception: + return None + + def capture_now(self, on_raw_frame=None): + """Extract a single frame from the current recording position. + + Calls on_raw_frame(jpeg_bytes, pts_time) — SessionProcessor handles + file writing and index updates. + """ + def _capture(): + safe_duration = self._probe_safe_duration() + if not safe_duration or safe_duration < 1: + log.warning("capture_now: recording too short") + return + local_timestamp = safe_duration - 1 + import tempfile, os + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: + tmp_path = Path(tmp.name) + try: + ff.extract_frame_at(self.recording_path, tmp_path, local_timestamp) + if not tmp_path.exists(): + log.warning("capture_now: frame not written") + return + jpeg_bytes = tmp_path.read_bytes() + except Exception as e: + log.error("capture_now failed: %s", e) + return + finally: + try: + os.unlink(tmp_path) + except Exception: + pass + if on_raw_frame: + on_raw_frame(jpeg_bytes, local_timestamp + self.current_global_offset) + + Thread(target=_capture, daemon=True, name="capture_now").start() + + def _start_scene_readers(self, proc): + from queue import Queue, Empty + import os + ts_queue = Queue() + + def _read_stderr(): + for raw in proc.stderr: + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + if "showinfo" not in line: + log.debug("[recorder] %s", line) + continue + m = re.search(r"pts_time:\s*([\d.]+)", line) + if m: + ts_queue.put(float(m.group(1))) + log.info("[recorder] stderr closed, exit=%s", proc.poll()) + + def _read_stdout(): + offset = self.current_global_offset + last_pts = -1.0 + buf = b"" + raw_fd = proc.stdout.fileno() + while True: + chunk = os.read(raw_fd, 65536) + if not chunk: + break + buf += chunk + while True: + soi = buf.find(b"\xff\xd8") + if soi < 0: + buf = b"" + break + eoi = buf.find(b"\xff\xd9", soi + 2) + if eoi < 0: + buf = buf[soi:] + break + jpeg_data = buf[soi:eoi + 2] + buf = buf[eoi + 2:] + + try: + pts_time = ts_queue.get(timeout=2.0) + except Empty: + log.warning("No timestamp for scene frame, using 0") + pts_time = 0.0 + + if pts_time - last_pts < 0.1: + log.debug("Skipping flush frame at pts=%.3f", pts_time) + continue + last_pts = pts_time + + global_ts = pts_time + offset + log.debug("Raw scene frame at pts=%.3f (global=%.3f)", pts_time, 
global_ts) + if self._on_raw_frame: + self._on_raw_frame(jpeg_data, global_ts) + log.info("[recorder] stdout closed") + + Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start() + Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start() diff --git a/cht/window.py b/cht/window.py index 4bf5508..0282f02 100644 --- a/cht/window.py +++ b/cht/window.py @@ -282,7 +282,13 @@ class ChtWindow(Adw.ApplicationWindow): self._connect_btn.remove_css_class("suggested-action") self._connect_btn.add_css_class("destructive-action") - mgr = self._lifecycle.start(session_id=session_id) + mgr = self._lifecycle.start(session_id=session_id, rust_transport=True) + if mgr is None: + log.error("Failed to start stream — no cht-server session found") + self._connect_btn.set_label("Connect") + self._connect_btn.remove_css_class("destructive-action") + self._connect_btn.add_css_class("suggested-action") + return self._telemetry = Telemetry(mgr.session_dir) mgr.telemetry = self._telemetry diff --git a/media/Cargo.lock b/media/Cargo.lock index 09e979e..21c1b01 100644 --- a/media/Cargo.lock +++ b/media/Cargo.lock @@ -85,6 +85,7 @@ dependencies = [ "anyhow", "cht-common", "ffmpeg-next", + "libc", "nix", "tokio", "tracing", diff --git a/media/client/Cargo.toml b/media/client/Cargo.toml index efbbe53..d03c185 100644 --- a/media/client/Cargo.toml +++ b/media/client/Cargo.toml @@ -11,3 +11,4 @@ tracing-subscriber = { workspace = true } anyhow = { workspace = true } ffmpeg = { package = "ffmpeg-next", version = "8" } nix = { version = "0.29", features = ["signal", "process"] } +libc = "0.2" diff --git a/media/client/src/backends/subprocess.rs b/media/client/src/backends/subprocess.rs index 835cc7a..ff839f4 100644 --- a/media/client/src/backends/subprocess.rs +++ b/media/client/src/backends/subprocess.rs @@ -1,16 +1,13 @@ //! Subprocess backend: spawn ffmpeg CLI for capture+encode. //! -//! Spawns ffmpeg with the same hardware pipeline as `stream_av.py`: +//! Spawns ffmpeg with the same hardware pipeline as `stream_av.sh`: //! kmsgrab → hwmap=derive_device=vaapi → scale_vaapi → h264_vaapi +//! + PulseAudio desktop audio + mic → amix → AAC //! //! ffmpeg outputs NUT format to stdout. We demux that pipe with ffmpeg-next //! to get proper AVPackets (keyframe flags, timestamps) without parsing //! bytestreams. NUT is lighter than mpegts — no TS overhead, exact packet //! metadata in the container layer. -//! -//! This approach works where the direct VAAPI API path fails: hwmap uses -//! fftools' internal AVFilterGraph.hw_device_ctx (removed from public API -//! in ffmpeg 7+), so X2RGB10LE format negotiation succeeds. use std::os::fd::AsRawFd; use std::os::unix::io::RawFd; @@ -21,7 +18,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use tracing::{error, info, warn}; -use crate::encoder::EncodedPacket; +use crate::encoder::{EncodedPacket, MediaType}; pub struct SubprocessConfig { pub device: String, @@ -63,8 +60,6 @@ pub fn run( .expect("spawn stderr thread"); // Get the raw fd from stdout before handing it to ffmpeg-next. - // ffmpeg-next takes ownership of the input context but we keep the Child - // alive so the fd stays valid. let stdout = child.stdout.take().expect("stdout piped"); let fd: RawFd = stdout.as_raw_fd(); @@ -79,32 +74,141 @@ pub fn run( result } +/// Detect PulseAudio audio sources for capture. 
+struct AudioSources {
+    monitor: Option<String>,   // desktop audio (speaker tap)
+    mic: Option<String>,       // microphone
+    pulse_server: String,      // PULSE_SERVER env for root
+}
+
+fn detect_audio_sources() -> AudioSources {
+    // When running as root (sudo for kmsgrab), we need the real user's PulseAudio
+    let real_uid = std::env::var("SUDO_UID")
+        .unwrap_or_else(|_| unsafe { libc::getuid() }.to_string());
+    let pulse_server = format!("unix:/run/user/{real_uid}/pulse/native");
+
+    let monitor = detect_monitor_source(&pulse_server);
+    let mic = detect_default_source(&pulse_server);
+
+    // Don't use mic if it's the same as monitor (some systems set monitor as default)
+    let mic = match (&monitor, &mic) {
+        (Some(m), Some(d)) if m == d => None,
+        _ => mic,
+    };
+
+    info!("Audio sources — monitor: {:?}, mic: {:?}", monitor, mic);
+    AudioSources { monitor, mic, pulse_server }
+}
+
+fn detect_monitor_source(pulse_server: &str) -> Option<String> {
+    let output = Command::new("pactl")
+        .arg("info")
+        .env("PULSE_SERVER", pulse_server)
+        .output()
+        .ok()?;
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    for line in stdout.lines() {
+        if line.contains("Default Sink:") {
+            let sink = line.split(':').nth(1)?.trim();
+            return Some(format!("{sink}.monitor"));
+        }
+    }
+    None
+}
+
+fn detect_default_source(pulse_server: &str) -> Option<String> {
+    let output = Command::new("pactl")
+        .args(["get-default-source"])
+        .env("PULSE_SERVER", pulse_server)
+        .output()
+        .ok()?;
+    let source = String::from_utf8_lossy(&output.stdout).trim().to_string();
+    if source.is_empty() { None } else { Some(source) }
+}
+
 fn spawn_ffmpeg(cfg: &SubprocessConfig) -> Result<Child> {
+    let audio = detect_audio_sources();
+
     let filter = format!(
         "hwmap=derive_device=vaapi,scale_vaapi=w={}:h={}:format=nv12,fps={}",
         cfg.width, cfg.height, cfg.fps,
     );
 
+    let mut args: Vec<String> = vec![
+        // Hardware init
+        "-init_hw_device".into(), format!("drm=drm:{}", cfg.device),
+        "-init_hw_device".into(), "vaapi=va@drm".into(),
+        // Video input (kmsgrab)
+        "-thread_queue_size".into(), "64".into(),
+        "-device".into(), cfg.device.clone(),
+        "-f".into(), "kmsgrab".into(),
+        "-framerate".into(), cfg.fps.to_string(),
+        "-i".into(), "-".into(),
+    ];
+
+    // Audio inputs
+    let has_monitor = audio.monitor.is_some();
+    let has_mic = audio.mic.is_some();
+
+    if let Some(ref monitor) = audio.monitor {
+        args.extend([
+            "-f".into(), "pulse".into(),
+            "-thread_queue_size".into(), "1024".into(),
+            "-i".into(), monitor.clone(),
+        ]);
+    }
+    if let Some(ref mic) = audio.mic {
+        args.extend([
+            "-f".into(), "pulse".into(),
+            "-thread_queue_size".into(), "1024".into(),
+            "-i".into(), mic.clone(),
+        ]);
+    }
+
+    // Audio filter: mix monitor + mic if both present
+    if has_monitor && has_mic {
+        args.extend([
+            "-filter_complex".into(),
+            "[1:a][2:a]amix=inputs=2:duration=longest[aout]".into(),
+            "-map".into(), "0:v".into(),
+            "-map".into(), "[aout]".into(),
+        ]);
+    } else if has_monitor {
+        args.extend(["-map".into(), "0:v".into(), "-map".into(), "1:a".into()]);
+    }
+    // If no audio: no -map needed, only video output
+
+    // Video encoding
+    args.extend([
+        "-vf".into(), filter,
+        "-c:v".into(), "h264_vaapi".into(),
+        "-qp".into(), cfg.qp.to_string(),
+        "-g".into(), cfg.gop_size.to_string(),
+        "-bf".into(), "0".into(),
+    ]);
+
+    // Audio encoding (if any audio source)
+    if has_monitor || has_mic {
+        args.extend([
+            "-c:a".into(), "aac".into(),
+            "-b:a".into(), "128k".into(),
+        ]);
+    }
+
+    // Output
+    args.extend([
+        "-flush_packets".into(), "1".into(),
+        "-fflags".into(), "nobuffer".into(),
+        
"-f".into(), "nut".into(), + "pipe:1".into(), + "-hide_banner".into(), + ]); + + info!("ffmpeg args: {:?}", args); + let child = Command::new("ffmpeg") - .args([ - "-init_hw_device", &format!("drm=drm:{}", cfg.device), - "-init_hw_device", "vaapi=va@drm", - "-thread_queue_size", "64", - "-device", &cfg.device, - "-f", "kmsgrab", - "-framerate", &cfg.fps.to_string(), - "-i", "-", - "-vf", &filter, - "-c:v", "h264_vaapi", - "-qp", &cfg.qp.to_string(), - "-g", &cfg.gop_size.to_string(), - "-bf", "0", - "-flush_packets", "1", - "-fflags", "nobuffer", - "-f", "nut", - "pipe:1", - "-hide_banner", - ]) + .args(&args) + .env("PULSE_SERVER", &audio.pulse_server) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -127,22 +231,34 @@ fn demux_and_send( let mut input_ctx = ffmpeg::format::input(&pipe_url) .context("open ffmpeg input from pipe")?; + // Find video stream let video_stream = input_ctx .streams() .best(ffmpeg::media::Type::Video) .context("no video stream in NUT output")?; + let video_idx = video_stream.index(); + let video_tb = video_stream.time_base(); + let video_tb_num = video_tb.numerator() as u32; + let video_tb_den = video_tb.denominator() as u32; - let stream_idx = video_stream.index(); - let time_base = video_stream.time_base(); - let tb_num = time_base.numerator() as u32; - let tb_den = time_base.denominator() as u32; + // Find audio stream (may not exist if no PulseAudio sources found) + let audio_info = input_ctx + .streams() + .best(ffmpeg::media::Type::Audio) + .map(|s| { + let tb = s.time_base(); + (s.index(), tb.numerator() as u32, tb.denominator() as u32) + }); - info!( - "Subprocess demux ready: stream_idx={}, time_base={}/{}", - stream_idx, tb_num, tb_den - ); + if let Some((idx, num, den)) = audio_info { + info!("Demux: video_idx={video_idx} tb={video_tb_num}/{video_tb_den}, \ + audio_idx={idx} tb={num}/{den}"); + } else { + info!("Demux: video_idx={video_idx} tb={video_tb_num}/{video_tb_den}, no audio"); + } - let mut packet_count = 0u64; + let mut video_count = 0u64; + let mut audio_count = 0u64; for (stream, packet) in input_ctx.packets() { if stop.load(Ordering::Relaxed) { @@ -155,36 +271,52 @@ fn demux_and_send( break; } - if stream.index() != stream_idx { - continue; - } - let data = match packet.data() { Some(d) => d.to_vec(), None => continue, }; - let encoded = EncodedPacket { - data, - pts: packet.pts().unwrap_or(0), - dts: packet.dts().unwrap_or(0), - keyframe: packet.is_key(), - time_base_num: tb_num, - time_base_den: tb_den, - }; + let stream_idx = stream.index(); - packet_count += 1; - if packet_count % 300 == 1 { - info!("Subprocess: {packet_count} packets encoded"); - } - - if packet_tx.blocking_send(encoded).is_err() { - info!("Packet channel closed, stopping subprocess pipeline"); - break; + if stream_idx == video_idx { + let encoded = EncodedPacket { + media_type: MediaType::Video, + data, + pts: packet.pts().unwrap_or(0), + dts: packet.dts().unwrap_or(0), + keyframe: packet.is_key(), + time_base_num: video_tb_num, + time_base_den: video_tb_den, + }; + video_count += 1; + if video_count % 300 == 1 { + info!("Subprocess: {video_count} video, {audio_count} audio packets"); + } + if packet_tx.blocking_send(encoded).is_err() { + info!("Packet channel closed"); + break; + } + } else if let Some((audio_idx, audio_tb_num, audio_tb_den)) = audio_info { + if stream_idx == audio_idx { + let encoded = EncodedPacket { + media_type: MediaType::Audio, + data, + pts: packet.pts().unwrap_or(0), + dts: packet.dts().unwrap_or(0), + keyframe: 
packet.is_key(),
+                    time_base_num: audio_tb_num,
+                    time_base_den: audio_tb_den,
+                };
+                audio_count += 1;
+                if packet_tx.blocking_send(encoded).is_err() {
+                    info!("Packet channel closed");
+                    break;
+                }
+            }
         }
     }
 
-    info!("Subprocess pipeline stopped ({packet_count} packets)");
+    info!("Subprocess pipeline stopped ({video_count} video, {audio_count} audio packets)");
 
     Ok(())
 }
 
@@ -224,8 +356,15 @@ fn kill_child(child: &mut Child) {
         child.kill().ok();
     }
 
-    match child.wait() {
-        Ok(s) => info!("ffmpeg exited: {s}"),
-        Err(e) => warn!("ffmpeg wait error: {e}"),
+    // Wait up to 3 seconds, then SIGKILL.
+    for _ in 0..30 {
+        if child.try_wait().ok().flatten().is_some() {
+            info!("ffmpeg exited cleanly");
+            return;
+        }
+        std::thread::sleep(std::time::Duration::from_millis(100));
     }
+    warn!("ffmpeg didn't exit after SIGINT, killing");
+    child.kill().ok();
+    let _ = child.wait();
 }
diff --git a/media/client/src/encoder.rs b/media/client/src/encoder.rs
index 3cc0949..0ce6eda 100644
--- a/media/client/src/encoder.rs
+++ b/media/client/src/encoder.rs
@@ -310,6 +310,7 @@ impl EncoderInner {
         let mut encoded = ffmpeg::Packet::empty();
         while self.encoder.receive_packet(&mut encoded).is_ok() {
             packets.push(EncodedPacket {
+                media_type: MediaType::Video,
                 data: encoded.data().unwrap_or(&[]).to_vec(),
                 pts: encoded.pts().unwrap_or(0),
                 dts: encoded.dts().unwrap_or(0),
@@ -327,8 +328,16 @@ impl EncoderInner {
     }
 }
 
-/// An encoded video packet ready for transport.
+/// Type of media stream in an encoded packet.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MediaType {
+    Video,
+    Audio,
+}
+
+/// An encoded media packet ready for transport.
 pub struct EncodedPacket {
+    pub media_type: MediaType,
     pub data: Vec<u8>,
     pub pts: i64,
     pub dts: i64,
diff --git a/media/client/src/main.rs b/media/client/src/main.rs
index 18dd855..8033dd8 100644
--- a/media/client/src/main.rs
+++ b/media/client/src/main.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use anyhow::Result;
 use cht_common::protocol::{
     self, AudioParams, ControlMessage, PacketHeader, PacketType, VideoParams, WirePacket,
@@ -5,14 +7,14 @@ use cht_common::protocol::{
 };
 use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::net::TcpStream;
-use tracing::info;
+use tracing::{info, warn};
 
 use cht_client::backends::Backend;
 use cht_client::capture::CaptureConfig;
-use cht_client::encoder::EncoderConfig;
+use cht_client::encoder::{EncoderConfig, MediaType};
 use cht_client::pipeline::Pipeline;
 
-const DEFAULT_SERVER: &str = "mcrndeb:4444";
+const DEFAULT_SERVER: &str = "mcrndeb:4447";
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -35,9 +37,8 @@ async fn main() -> Result<()> {
         Backend::Subprocess
     };
 
-    info!("Connecting to {server_addr}...");
-    let stream = TcpStream::connect(&server_addr).await?;
-    info!("Connected");
+    // Wait for the server to become available.
+    let stream = wait_for_server(&server_addr).await?;
 
     let mut writer = BufWriter::new(stream);
 
@@ -69,6 +70,7 @@ async fn main() -> Result<()> {
 
     // Forward encoded packets to the server
     let mut video_count = 0u64;
+    let mut audio_count = 0u64;
     let mut keepalive_interval = tokio::time::interval(std::time::Duration::from_secs(5));
 
     loop {
@@ -76,9 +78,13 @@ async fn main() -> Result<()> {
             pkt = packet_rx.recv() => {
                 match pkt {
                     Some(encoded) => {
+                        let pkt_type = match encoded.media_type {
+                            MediaType::Video => PacketType::Video,
+                            MediaType::Audio => PacketType::Audio,
+                        };
                         let wire = WirePacket {
                             header: PacketHeader {
-                                packet_type: PacketType::Video,
+                                packet_type: pkt_type,
                                 flags: if encoded.keyframe { FLAG_KEYFRAME } else { 0 },
                                 length: encoded.data.len() as u32,
                                 timestamp_ns: pts_to_ns(
@@ -90,11 +96,18 @@ async fn main() -> Result<()> {
                             payload: encoded.data,
                         };
                         protocol::write_packet(&mut writer, &wire).await?;
-                        video_count += 1;
-                        if video_count % 300 == 1 {
-                            info!("Sent {video_count} video packets");
-                            writer.flush().await?;
+                        match encoded.media_type {
+                            MediaType::Video => {
+                                video_count += 1;
+                                if video_count % 300 == 1 {
+                                    info!("Sent {video_count} video, {audio_count} audio packets");
+                                    writer.flush().await?;
+                                }
+                            }
+                            MediaType::Audio => {
+                                audio_count += 1;
+                            }
                         }
                     }
                     None => {
@@ -115,17 +128,56 @@ async fn main() -> Result<()> {
         }
     }
 
-    pipeline.stop();
+    // Stop pipeline first (signals ffmpeg, joins thread).
+    // Give it a few seconds — if ffmpeg hangs, don't block forever.
+    info!("Stopping pipeline...");
+    let stop_handle = tokio::task::spawn_blocking(move || {
+        pipeline.stop();
+    });
+    let _ = tokio::time::timeout(Duration::from_secs(5), stop_handle).await;
 
-    let stop = ControlMessage::SessionStop;
-    protocol::write_packet(&mut writer, &stop.to_wire_packet()?).await?;
-    writer.flush().await?;
-    writer.shutdown().await?;
-    info!("Sent session_stop, {video_count} video packets total");
+    // Try to send SessionStop so the server closes cleanly.
+    let stop_msg = ControlMessage::SessionStop;
+    match tokio::time::timeout(
+        Duration::from_secs(2),
+        async {
+            protocol::write_packet(&mut writer, &stop_msg.to_wire_packet()?).await?;
+            writer.flush().await?;
+            writer.shutdown().await?;
+            Ok::<_, anyhow::Error>(())
+        }
+    ).await {
+        Ok(Ok(())) => {}
+        Ok(Err(e)) => warn!("Error sending session_stop: {e}"),
+        Err(_) => warn!("Timeout sending session_stop"),
+    }
+    info!("Done — {video_count} video + {audio_count} audio packets");
 
     Ok(())
 }
 
+async fn wait_for_server(addr: &str) -> Result<TcpStream> {
+    info!("Waiting for server at {addr}...");
+    let mut interval = tokio::time::interval(Duration::from_secs(2));
+    loop {
+        tokio::select! {
+            _ = interval.tick() => {}
+            _ = tokio::signal::ctrl_c() => {
+                anyhow::bail!("interrupted while waiting for server");
+            }
+        }
+        match TcpStream::connect(addr).await {
+            Ok(stream) => {
+                info!("Connected to {addr}");
+                return Ok(stream);
+            }
+            Err(e) => {
+                info!("Server not ready ({e}), retrying...");
+            }
+        }
+    }
+}
+
 fn pts_to_ns(pts: i64, tb_num: u32, tb_den: u32) -> u64 {
     if tb_den == 0 {
         return 0;
@@ -134,10 +186,23 @@ fn pts_to_ns(pts: i64, tb_num: u32, tb_den: u32) -> u64 {
 }
 
 fn session_id() -> String {
+    // Match Python's time.strftime("%Y%m%d_%H%M%S") format
     use std::time::{SystemTime, UNIX_EPOCH};
     let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
-       .as_secs();
-    format!("{secs}")
+       .as_secs() as libc::time_t;
+    let mut tm: libc::tm = unsafe { std::mem::zeroed() };
+    unsafe { libc::localtime_r(&secs, &mut tm) };
+    let mut buf = [0u8; 20];
+    let fmt = b"%Y%m%d_%H%M%S\0";
+    let len = unsafe {
+        libc::strftime(
+            buf.as_mut_ptr() as *mut libc::c_char,
+            buf.len(),
+            fmt.as_ptr() as *const libc::c_char,
+            &tm,
+        )
+    };
+    String::from_utf8_lossy(&buf[..len]).to_string()
 }
diff --git a/media/server/src/main.rs b/media/server/src/main.rs
index 11fdbea..56fce18 100644
--- a/media/server/src/main.rs
+++ b/media/server/src/main.rs
@@ -1,24 +1,40 @@
+mod session;
+
+use std::path::PathBuf;
+
 use anyhow::Result;
 use cht_common::protocol::{self, ControlMessage, PacketType};
+use session::Session;
 use tokio::io::BufReader;
 use tokio::net::TcpListener;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 
-const LISTEN_ADDR: &str = "0.0.0.0:4444";
+const LISTEN_ADDR: &str = "0.0.0.0:4447";
+const DEFAULT_SESSIONS_DIR: &str = "/home/mariano/wdir/cht/data/sessions";
+
+fn sessions_dir() -> PathBuf {
+    std::env::var("CHT_SESSIONS_DIR")
+        .map(PathBuf::from)
+        .unwrap_or_else(|_| PathBuf::from(DEFAULT_SESSIONS_DIR))
+}
 
 #[tokio::main]
 async fn main() -> Result<()> {
     cht_common::logging::init("server");
 
+    let sessions_dir = sessions_dir();
+    info!("Sessions dir: {}", sessions_dir.display());
+
     let listener = TcpListener::bind(LISTEN_ADDR).await?;
     info!("Server listening on {LISTEN_ADDR}");
 
     loop {
         let (stream, addr) = listener.accept().await?;
         info!("Client connected from {addr}");
+        let sdir = sessions_dir.clone();
         tokio::spawn(async move {
-            if let Err(e) = handle_client(stream).await {
+            if let Err(e) = handle_client(stream, sdir).await {
                 error!("Client {addr} error: {e:#}");
             }
             info!("Client {addr} disconnected");
@@ -26,17 +42,19 @@ async fn main() -> Result<()> {
     }
 }
 
-async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> {
+async fn handle_client(
+    stream: tokio::net::TcpStream,
+    sessions_dir: PathBuf,
+) -> Result<()> {
     let mut reader = BufReader::new(stream);
 
-    let mut video_packets = 0u64;
-    let mut audio_packets = 0u64;
+    let mut session: Option<Session> = None;
+    let mut video_count = 0u64;
+    let mut audio_count = 0u64;
 
     loop {
         let packet = match protocol::read_packet(&mut reader).await {
             Ok(p) => p,
             Err(e) => {
-                // Any read error at the header boundary is a clean disconnect
-                // (includes EOF from flush + shutdown)
                 let msg = format!("{e:#}");
                 if msg.contains("eof") || msg.contains("Eof")
                     || msg.contains("connection reset")
@@ -50,25 +68,60 @@ async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> {
         match packet.header.packet_type {
             PacketType::Video => {
-                video_packets += 1;
-                if video_packets % 300 == 1 {
-                    info!(
-                        "video: {video_packets} packets, ts={}ms, keyframe={}",
-                        packet.header.timestamp_ns / 1_000_000,
-                        packet.header.is_keyframe(),
-                    );
+                if let Some(s) = &mut session {
+                    // Blocking write — offload to blocking thread to avoid stalling tokio.
+                    let data = packet.payload;
+                    let keyframe = packet.header.is_keyframe();
+                    tokio::task::block_in_place(|| s.write_video(&data, keyframe))?;
+                    video_count += 1;
+                    if video_count % 300 == 1 {
+                        info!("video: {video_count} packets, ts={}ms, keyframe={}",
+                            packet.header.timestamp_ns / 1_000_000,
+                            packet.header.is_keyframe());
+                    }
+                } else {
+                    warn!("Video packet before SessionStart — dropped");
+                }
             }
             PacketType::Audio => {
-                audio_packets += 1;
+                if let Some(s) = &mut session {
+                    let data = packet.payload;
+                    tokio::task::block_in_place(|| s.write_audio(&data))?;
+                    audio_count += 1;
+                    if audio_count % 500 == 1 {
+                        info!("audio: {audio_count} packets");
+                    }
+                }
             }
             PacketType::Control => {
                 let ctrl = ControlMessage::from_payload(&packet.payload)?;
                 info!("control: {ctrl:?}");
+
+                match ctrl {
+                    ControlMessage::SessionStart { id, video, .. } => {
+                        let s = tokio::task::block_in_place(|| {
+                            Session::start(&id, &sessions_dir, video.fps)
+                        })?;
+                        session = Some(s);
+                    }
+                    ControlMessage::SessionStop => {
+                        if let Some(s) = session.take() {
+                            tokio::task::block_in_place(|| s.close());
+                        }
+                        break;
+                    }
+                    ControlMessage::Keepalive
+                    | ControlMessage::Reconnect { .. }
+                    | ControlMessage::ParamChange { .. } => {}
+                }
             }
         }
     }
 
-    info!("Session totals: {video_packets} video, {audio_packets} audio packets");
+    if let Some(s) = session.take() {
+        tokio::task::block_in_place(|| s.close());
+    }
+
+    info!("Session totals: {video_count} video, {audio_count} audio packets");
     Ok(())
 }
diff --git a/media/server/src/session.rs b/media/server/src/session.rs
new file mode 100644
index 0000000..12b8098
--- /dev/null
+++ b/media/server/src/session.rs
@@ -0,0 +1,306 @@
+//! Session: manages the ffmpeg recording subprocess for one client connection.
+//!
+//! Receives raw H.264 NAL units and AAC audio from the transport:
+//! - Video: piped into ffmpeg → fragmented MP4 + UDP relay for live display
+//! - Audio: written to raw AAC file for Python post-processing
+//!
+//! Also provides a Unix domain socket at `stream/scene.sock` carrying a copy
+//! of the raw H.264 stream for Python's GPU scene detection. The socket is
+//! fire-and-forget: if nobody connects, data is silently dropped; if the
+//! reader is slow, old frames are dropped rather than stalling recording.
+//!
+//! Creates the session directory and writes its path to `data/active-session`
+//! so the Python app can pick it up for SessionProcessor (audio extraction, etc).
+
+use std::fs::{self, File};
+use std::io::Write;
+use std::path::{Path, PathBuf};
+use std::process::{Child, ChildStdin, Command, Stdio};
+use std::thread;
+
+use anyhow::{Context, Result};
+use tokio::io::AsyncWriteExt;
+use tracing::{debug, info, warn};
+
+// Written next to the sessions/ directory so everything stays under data/.
+// Python reads this to discover the session dir created by cht-server.
+const ACTIVE_SESSION_FILENAME: &str = "active-session";
+const RELAY_URL: &str = "udp://127.0.0.1:4445";
+const SCENE_SOCKET_NAME: &str = "scene.sock";
+
+struct ScenePacket {
+    data: Vec<u8>,
+    keyframe: bool,
+}
+
+pub struct Session {
+    #[allow(dead_code)]
+    session_dir: PathBuf,
+    active_session_file: PathBuf,
+    ffmpeg: Child,
+    video_stdin: Option<ChildStdin>,
+    audio_file: Option<File>,
+    scene_tx: Option<tokio::sync::mpsc::Sender<ScenePacket>>,
+    #[allow(dead_code)]
+    fps: u32,
+}
+
+impl Session {
+    pub fn start(session_id: &str, sessions_dir: &Path, fps: u32) -> Result<Self> {
+        let active_session_file = sessions_dir
+            .parent()
+            .unwrap_or(sessions_dir)
+            .join(ACTIVE_SESSION_FILENAME);
+        let session_dir = sessions_dir.join(session_id);
+        let stream_dir = session_dir.join("stream");
+        fs::create_dir_all(&stream_dir)
+            .with_context(|| format!("create session dir: {}", stream_dir.display()))?;
+
+        let recording_path = stream_dir.join("recording_000.mp4");
+        let audio_path = stream_dir.join("audio.aac");
+
+        info!("Session {session_id}: recording → {}", recording_path.display());
+
+        let mut child = Command::new("ffmpeg")
+            .args([
+                "-f", "h264",
+                "-framerate", &fps.to_string(),
+                "-i", "pipe:0",
+                // fMP4 — same flags as Python StreamRecorder
+                "-c:v", "copy",
+                "-f", "mp4",
+                "-movflags", "frag_keyframe+empty_moov+default_base_moof",
+                "-flush_packets", "1",
+                recording_path.to_str().unwrap(),
+                // UDP relay for live display
+                "-c:v", "copy",
+                "-f", "mpegts",
+                RELAY_URL,
+                "-hide_banner", "-loglevel", "warning",
+            ])
+            .stdin(Stdio::piped())
+            .stdout(Stdio::null())
+            .stderr(Stdio::piped())
+            .spawn()
+            .context("spawn ffmpeg recorder")?;
+
+        let video_stdin = child.stdin.take().expect("stdin piped");
+
+        // Drain stderr so ffmpeg never blocks on a full pipe.
+        let stderr = child.stderr.take().expect("stderr piped");
+        let sid = session_id.to_string();
+        thread::Builder::new()
+            .name("ffmpeg-recorder-stderr".into())
+            .spawn(move || {
+                use std::io::{BufRead, BufReader};
+                for line in BufReader::new(stderr).lines().map_while(Result::ok) {
+                    if !line.is_empty() {
+                        debug!("[recorder/{sid}] {line}");
+                    }
+                }
+            })
+            .expect("spawn stderr thread");
+
+        // Open audio file for raw AAC frames from client
+        let audio_file = File::create(&audio_path)
+            .map(Some)
+            .unwrap_or_else(|e| {
+                warn!("Could not create audio file: {e}");
+                None
+            });
+
+        // Scene relay: Unix socket for Python scene detection.
+        let socket_path = stream_dir.join(SCENE_SOCKET_NAME);
+        let (scene_tx, scene_rx) = tokio::sync::mpsc::channel(32);
+        tokio::spawn(scene_relay_task(socket_path, scene_rx));
+
+        // Tell Python which session dir to watch.
+        if let Err(e) = fs::write(&active_session_file, session_dir.to_str().unwrap_or("")) {
+            warn!("Could not write {}: {e}", active_session_file.display());
+        }
+
+        info!("Session {session_id}: ffmpeg pid={}, audio → {}",
+            child.id(), audio_path.display());
+
+        Ok(Self {
+            session_dir,
+            active_session_file,
+            ffmpeg: child,
+            video_stdin: Some(video_stdin),
+            audio_file,
+            scene_tx: Some(scene_tx),
+            fps,
+        })
+    }
+
+    pub fn write_video(&mut self, data: &[u8], keyframe: bool) -> Result<()> {
+        if let Some(stdin) = &mut self.video_stdin {
+            stdin.write_all(data).context("write H.264 to ffmpeg")?;
+        }
+        // Best-effort relay to scene detector — drop if channel full.
+        if let Some(tx) = &self.scene_tx {
+            let _ = tx.try_send(ScenePacket { data: data.to_vec(), keyframe });
+        }
+        Ok(())
+    }
+
+    pub fn write_audio(&mut self, data: &[u8]) -> Result<()> {
+        if let Some(f) = &mut self.audio_file {
+            // Wrap raw AAC frame with ADTS header so the file is playable/parseable.
+            // Assumes AAC-LC, 48kHz, stereo (matches client's encoder config).
+            write_adts_frame(f, data)?;
+        }
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    pub fn session_dir(&self) -> &Path {
+        &self.session_dir
+    }
+
+    pub fn close(mut self) {
+        // Drop stdin → ffmpeg gets EOF → flushes and exits cleanly.
+        drop(self.video_stdin.take());
+        drop(self.audio_file.take());
+        // Drop scene_tx → relay task sees channel closed → exits.
+        drop(self.scene_tx.take());
+        match self.ffmpeg.wait() {
+            Ok(s) => info!("ffmpeg recorder exited: {s}"),
+            Err(e) => warn!("ffmpeg recorder wait error: {e}"),
+        }
+        // Clear the active session marker.
+        let _ = fs::remove_file(&self.active_session_file);
+    }
+}
+
+impl Drop for Session {
+    fn drop(&mut self) {
+        if self.video_stdin.is_some() {
+            drop(self.video_stdin.take());
+            drop(self.audio_file.take());
+            drop(self.scene_tx.take());
+            let _ = self.ffmpeg.kill();
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Scene relay: serves raw H.264 over a Unix domain socket
+// ---------------------------------------------------------------------------
+
+async fn scene_relay_task(
+    socket_path: PathBuf,
+    mut rx: tokio::sync::mpsc::Receiver<ScenePacket>,
+) {
+    // Remove stale socket from a previous session.
+    let _ = fs::remove_file(&socket_path);
+
+    let listener = match tokio::net::UnixListener::bind(&socket_path) {
+        Ok(l) => l,
+        Err(e) => {
+            warn!("Scene relay: bind failed on {}: {e}", socket_path.display());
+            return;
+        }
+    };
+    info!("Scene relay: listening on {}", socket_path.display());
+
+    let mut client: Option<tokio::net::UnixStream> = None;
+    // Buffer the latest keyframe so new clients start with a valid decoder state.
+    let mut last_keyframe: Option<Vec<u8>> = None;
+
+    loop {
+        if client.is_some() {
+            // We have a connected reader — forward data.
+            match rx.recv().await {
+                Some(pkt) => {
+                    if pkt.keyframe {
+                        last_keyframe = Some(pkt.data.clone());
+                    }
+                    let stream = client.as_mut().unwrap();
+                    if stream.write_all(&pkt.data).await.is_err() {
+                        info!("Scene relay: client disconnected");
+                        client = None;
+                    }
+                }
+                None => break, // Channel closed, session ending.
+            }
+        } else {
+            // No reader — accept connections while draining the channel.
+            tokio::select! {
+                biased;
+                result = listener.accept() => {
+                    match result {
+                        Ok((mut stream, _)) => {
+                            info!("Scene relay: client connected");
+                            // Send the last keyframe so the decoder can initialize.
+                            if let Some(ref kf) = last_keyframe {
+                                if stream.write_all(kf).await.is_err() {
+                                    warn!("Scene relay: failed to send keyframe");
+                                    continue;
+                                }
+                                info!("Scene relay: sent keyframe ({} bytes)", kf.len());
+                            }
+                            client = Some(stream);
+                        }
+                        Err(e) => warn!("Scene relay: accept error: {e}"),
+                    }
+                }
+                pkt = rx.recv() => {
+                    match pkt {
+                        Some(pkt) => {
+                            if pkt.keyframe {
+                                last_keyframe = Some(pkt.data);
+                            }
+                            // Discard — no reader connected.
+                        }
+                        None => break, // Channel closed.
+                    }
+                }
+            }
+        }
+    }
+
+    drop(client);
+    let _ = fs::remove_file(&socket_path);
+    info!("Scene relay: stopped");
+}
+
+// ---------------------------------------------------------------------------
+// ADTS header for raw AAC framing
+// ---------------------------------------------------------------------------
+
+/// Write a raw AAC frame wrapped in a 7-byte ADTS header.
+///
+/// Fixed params: AAC-LC profile, 48 kHz sample rate, 2 channels (stereo).
+/// These match the client's `-c:a aac -b:a 128k` default config.
+fn write_adts_frame(w: &mut impl Write, aac_data: &[u8]) -> Result<()> {
+    // ADTS fixed header fields:
+    //   profile: AAC-LC (audio object type 2) → stored as object_type - 1 = 1
+    //   sample_rate: 48000 → index 3
+    //   channels: 2 → channel_configuration 2
+    const PROFILE_MINUS1: u8 = 1; // AAC-LC
+    const SR_IDX: u8 = 3; // 48 kHz
+    const CH_CFG: u8 = 2; // stereo
+
+    let frame_len = (aac_data.len() + 7) as u16; // total ADTS frame = header + payload
+
+    let header: [u8; 7] = [
+        // byte 0-1: syncword(12) | ID(1)=0(MPEG4) | layer(2)=0 | protection(1)=1(no CRC)
+        0xFF,
+        0xF1,
+        // byte 2: profile(2) | sr_idx(4) | private(1)=0 | ch_cfg[2](1)
+        (PROFILE_MINUS1 << 6) | (SR_IDX << 2) | ((CH_CFG >> 2) & 1),
+        // byte 3: ch_cfg[1:0](2) | orig(1)=0 | home(1)=0 | copyright_id(1)=0 | copyright_start(1)=0 | frame_len[12:11](2)
+        ((CH_CFG & 3) << 6) | ((frame_len >> 11) as u8 & 0x03),
+        // byte 4: frame_len[10:3](8)
+        ((frame_len >> 3) & 0xFF) as u8,
+        // byte 5: frame_len[2:0](3) | buffer_fullness[10:6](5)
+        ((frame_len & 0x07) << 5) as u8 | 0x1F,
+        // byte 6: buffer_fullness[5:0](6) | num_aac_frames_minus1(2)=0
+        0xFC,
+    ];
+
+    w.write_all(&header).context("ADTS header")?;
+    w.write_all(aac_data).context("AAC frame")?;
+    Ok(())
+}
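A quick way to sanity-check the ADTS framing is a unit test in the same module. The sketch below is illustrative only and not part of the diff: it assumes the fixed AAC-LC / 48 kHz / stereo parameters above, the test module and function names are made up, and the expected bytes are worked out by hand from the bit layout documented in write_adts_frame.

#[cfg(test)]
mod adts_framing_tests {
    use super::*;

    #[test]
    fn header_bytes_for_a_100_byte_frame() {
        // 100-byte AAC payload → 107-byte ADTS frame (7-byte header + payload).
        let payload = vec![0u8; 100];
        let mut out: Vec<u8> = Vec::new();
        write_adts_frame(&mut out, &payload).unwrap();

        assert_eq!(out.len(), 107);
        // syncword 0xFFF, MPEG-4 ID, layer 0, no CRC
        assert_eq!(out[0], 0xFF);
        assert_eq!(out[1], 0xF1);
        // profile=1 (AAC-LC), sr_idx=3 (48 kHz), private=0, ch_cfg MSB=0 → 0x4C
        assert_eq!(out[2], 0x4C);
        // ch_cfg low bits=10, flag bits=0, frame_len bits 12..11 of 107 = 00 → 0x80
        assert_eq!(out[3], 0x80);
        // frame_len = 107: bits 10..3 → 0x0D, bits 2..0 = 011
        assert_eq!(out[4], 0x0D);
        assert_eq!(out[5], 0x7F); // 0b011_11111: length bits + buffer-fullness high bits
        assert_eq!(out[6], 0xFC); // buffer-fullness low bits + one raw data block
        // payload is appended untouched after the header
        assert!(out[7..].iter().all(|&b| b == 0));
    }
}

If dropped into media/server/src/session.rs, this would run with a plain cargo test in the server crate; it only exercises the private write_adts_frame helper, so no ffmpeg or socket setup is involved.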