"""StreamRecorder: ffmpeg-based network receiver and session recorder. Responsible for transport + real-time scene detection: - TCP listen (receives mpegts from sender) - Writing fragmented MP4 to disk - UDP relay for live display - Scene frame detection via ffmpeg stdout pipe (low-latency, ~same-second) - Segment rotation Scene detection lives here — not in SessionProcessor — because it reads from the recorder's ffmpeg stdout pipe directly. Moving it to poll fMP4 would add 3-5s latency (disk IPC vs kernel pipe). When Rust replaces this class, scene detection moves in-process (zero IPC, even faster). SessionProcessor reads from the fMP4 files this class produces for non-latency-sensitive work (audio extraction). """ import logging import re from pathlib import Path from threading import Thread from cht.config import ( STREAM_HOST, STREAM_PORT, RELAY_PORT, SCENE_THRESHOLD, SCENE_FLUSH_FRAMES, ) from cht.stream import ffmpeg as ff log = logging.getLogger(__name__) class StreamRecorder: """Owns the ffmpeg recording process, relay, and real-time scene detection.""" def __init__(self, session_dir: Path, scene_threshold: float = SCENE_THRESHOLD): self.session_dir = session_dir self.stream_dir = session_dir / "stream" self.scene_threshold = scene_threshold self._procs: dict = {} self._segment = 0 self._segment_offsets: dict[int, float] = {0: 0.0} self._on_raw_frame = None # cb(jpeg_bytes: bytes, pts_time: float) self._on_segment_complete = None def set_on_raw_frame(self, cb): """Called with (jpeg_bytes, pts_time) for each scene-change frame.""" self._on_raw_frame = cb def set_on_segment_complete(self, cb): self._on_segment_complete = cb # -- Lifecycle -- def start(self): existing = self.recording_segments self._segment = len(existing) self._rebuild_offsets() self._launch() def stop(self): for proc in self._procs.values(): ff.stop_proc(proc) self._procs.clear() def restart(self): """Rotate to a new segment and relaunch.""" old = self._procs.pop("recorder", None) if old: ff.stop_proc(old) completed_path = self.recording_path self._advance_segment_offset(completed_path) self._segment += 1 log.info("Restarting recorder → segment %d (offset %.1fs)", self._segment, self.current_global_offset) if self._on_segment_complete: self._on_segment_complete(completed_path) self._launch() def alive(self) -> bool: proc = self._procs.get("recorder") return proc is not None and proc.poll() is None def update_scene_threshold(self, new_threshold: float): """Update threshold and restart recorder (restarts ffmpeg filter).""" self.scene_threshold = new_threshold log.info("Scene threshold → %.2f, restarting recorder", new_threshold) self.restart() # -- Properties -- @property def stream_url(self) -> str: return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" @property def relay_url(self) -> str: return f"udp://127.0.0.1:{RELAY_PORT}" @property def recording_path(self) -> Path: return self.stream_dir / f"recording_{self._segment:03d}.mp4" @property def recording_segments(self) -> list[Path]: return sorted(self.stream_dir.glob("recording_*.mp4")) @property def current_global_offset(self) -> float: return self._segment_offsets.get(self._segment, 0.0) # -- Internal -- def _launch(self): node = ff.receive_record_relay_and_detect( self.stream_url, self.recording_path, self.relay_url, scene_threshold=self.scene_threshold, flush_frames=SCENE_FLUSH_FRAMES, ) proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True) self._procs["recorder"] = proc log.info("Recorder+scene: pid=%s → %s (threshold=%.2f)", proc.pid, self.recording_path, self.scene_threshold) self._start_scene_readers(proc) def _rebuild_offsets(self): from cht.session import probe_duration offset = 0.0 self._segment_offsets = {} for i, seg in enumerate(self.recording_segments): self._segment_offsets[i] = offset offset += probe_duration(seg) def _advance_segment_offset(self, completed_path: Path): from cht.session import probe_duration dur = probe_duration(completed_path) prev = self._segment_offsets.get(self._segment, 0.0) self._segment_offsets[self._segment + 1] = prev + dur log.info("Segment %d completed (%.1fs), next offset: %.1fs", self._segment, dur, prev + dur) def _probe_safe_duration(self): try: import ffmpeg as ffmpeg_lib info = ffmpeg_lib.probe(str(self.recording_path)) dur = float(info.get("format", {}).get("duration", 0)) if dur > 0: return dur for stream in info.get("streams", []): sdur = float(stream.get("duration", 0)) if sdur > 0: return sdur except Exception: pass try: return self.recording_path.stat().st_size / 65_000 except Exception: return None def capture_now(self, on_raw_frame=None): """Extract a single frame from the current recording position. Calls on_raw_frame(jpeg_bytes, pts_time) — SessionProcessor handles file writing and index updates. """ def _capture(): safe_duration = self._probe_safe_duration() if not safe_duration or safe_duration < 1: log.warning("capture_now: recording too short") return local_timestamp = safe_duration - 1 import tempfile, os with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp_path = Path(tmp.name) try: ff.extract_frame_at(self.recording_path, tmp_path, local_timestamp) if not tmp_path.exists(): log.warning("capture_now: frame not written") return jpeg_bytes = tmp_path.read_bytes() except Exception as e: log.error("capture_now failed: %s", e) return finally: try: os.unlink(tmp_path) except Exception: pass if on_raw_frame: on_raw_frame(jpeg_bytes, local_timestamp + self.current_global_offset) Thread(target=_capture, daemon=True, name="capture_now").start() def _start_scene_readers(self, proc): from queue import Queue, Empty import os ts_queue = Queue() def _read_stderr(): for raw in proc.stderr: line = raw.decode("utf-8", errors="replace").rstrip() if not line: continue if "showinfo" not in line: log.debug("[recorder] %s", line) continue m = re.search(r"pts_time:\s*([\d.]+)", line) if m: ts_queue.put(float(m.group(1))) log.info("[recorder] stderr closed, exit=%s", proc.poll()) def _read_stdout(): offset = self.current_global_offset last_pts = -1.0 buf = b"" raw_fd = proc.stdout.fileno() while True: chunk = os.read(raw_fd, 65536) if not chunk: break buf += chunk while True: soi = buf.find(b"\xff\xd8") if soi < 0: buf = b"" break eoi = buf.find(b"\xff\xd9", soi + 2) if eoi < 0: buf = buf[soi:] break jpeg_data = buf[soi:eoi + 2] buf = buf[eoi + 2:] try: pts_time = ts_queue.get(timeout=2.0) except Empty: log.warning("No timestamp for scene frame, using 0") pts_time = 0.0 if pts_time - last_pts < 0.1: log.debug("Skipping flush frame at pts=%.3f", pts_time) continue last_pts = pts_time global_ts = pts_time + offset log.debug("Raw scene frame at pts=%.3f (global=%.3f)", pts_time, global_ts) if self._on_raw_frame: self._on_raw_frame(jpeg_data, global_ts) log.info("[recorder] stdout closed") Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start() Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start()