diff --git a/cht/config.py b/cht/config.py index 52681be..53f9b57 100644 --- a/cht/config.py +++ b/cht/config.py @@ -13,7 +13,6 @@ SESSIONS_DIR = Path(os.environ.get("CHT_SESSIONS_DIR", DATA_DIR / "sessions")) STREAM_HOST = "0.0.0.0" STREAM_PORT = 4444 RELAY_PORT = 4445 # UDP loopback relay for live display -SCENE_RELAY_PORT = 4446 # UDP loopback relay for scene detector # Frame extraction — scene-only, no interval fallback SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index 7cee3f7..eed00da 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -39,7 +39,7 @@ def receive_and_record(stream_url, output_path): ) -def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url=None): +def receive_record_and_relay(stream_url, output_path, relay_url): """Receive TCP stream, write to fragmented MP4, and relay to UDP loopback. Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption: @@ -47,7 +47,6 @@ def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url always valid up to the last complete fragment (~1 keyframe interval ≈ 2s). Uses ffmpeg tee via merge_outputs: one process, identical timestamps. - Optionally sends a second relay for the scene detector. """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") file_out = ffmpeg.output( @@ -61,50 +60,49 @@ def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url stream, relay_url, c="copy", f="mpegts", ) - outputs = [file_out, relay_out] - if scene_relay_url: - scene_out = ffmpeg.output( - stream, scene_relay_url, - c="copy", f="mpegts", - ) - outputs.append(scene_out) - return ffmpeg.merge_outputs(*outputs).global_args(*QUIET_ARGS) + return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS) -def start_live_scene_detector(stream_url, output_dir, scene_threshold=0.10, - start_number=1): - """Start a persistent ffmpeg process that detects scenes from a live stream. +def receive_record_relay_and_detect(stream_url, output_path, relay_url, + frames_dir, scene_threshold=0.10, + start_number=1): + """Single process: receive TCP → record fMP4 + relay UDP + scene detect. - Reads from the UDP relay in real-time — no file seeking, no restart overhead. - Writes frame JPEGs and emits showinfo on stderr as scenes are detected. - Returns the async process (stderr must be read continuously). + One ffmpeg process, three output branches from the same TCP input: + 1. File output — c=copy to fMP4 + 2. UDP relay — c=copy to mpegts for live display + 3. Scene frames — CUDA decode → select(scene) → showinfo → JPEG files + + The scene filter runs on decoded frames in-process, so detection latency + is near-zero (no polling, no file re-reading, no separate process). + Stderr must be read continuously to parse showinfo lines. """ + stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay", + hwaccel="cuda") + + # Copy outputs (raw packet remux, no decode) + file_out = ffmpeg.output( + stream, str(output_path), + c="copy", f="mp4", + movflags="frag_keyframe+empty_moov+default_base_moof", + flush_packets=1, + **{"bsf:a": "aac_adtstoasc"}, + ) + relay_out = ffmpeg.output( + stream, relay_url, + c="copy", f="mpegts", + ) + + # Scene detection output (decode + filter → JPEG) select_expr = f"gt(scene,{scene_threshold})" - - stream = ffmpeg.input( - stream_url, - fflags="nobuffer+flush_packets", - flags="low_delay", - probesize="32000", - analyzeduration="0", - hwaccel="cuda", - ) - stream = stream.filter("select", select_expr).filter("showinfo") - - output = ( - ffmpeg.output( - stream, - str(output_dir / "F%04d.jpg"), - vsync="vfr", - flush_packets=1, - **{"q:v": "2"}, - start_number=start_number, - ) - .global_args(*GLOBAL_ARGS) + scene_stream = stream.filter("select", select_expr).filter("showinfo") + scene_out = ffmpeg.output( + scene_stream, str(frames_dir / "F%04d.jpg"), + vsync="vfr", flush_packets=1, **{"q:v": "2"}, + start_number=start_number, ) - log.info("start_live_scene_detector: %s", " ".join(output.compile())) - return run_async(output, pipe_stderr=True) + return ffmpeg.merge_outputs(file_out, relay_out, scene_out).global_args(*GLOBAL_ARGS) def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, diff --git a/cht/stream/manager.py b/cht/stream/manager.py index b46353a..8d3472c 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -2,23 +2,22 @@ StreamManager: orchestrates ffmpeg for recording and scene detection. Architecture: - sender → TCP:4444 → ffmpeg (writes recording.ts) - recording.ts → mpv (plays via Timeline) - recording.ts → ffmpeg scene detection (periodic, incremental) + sender → TCP:4444 → single ffmpeg process: + 1. writes fMP4 to disk (c=copy) + 2. relays UDP for live display (c=copy) + 3. CUDA decode → scene filter → JPEG frames (real-time) """ import json import logging import re import time -from pathlib import Path from threading import Thread from cht.config import ( STREAM_HOST, STREAM_PORT, RELAY_PORT, - SCENE_RELAY_PORT, SCENE_THRESHOLD, SESSIONS_DIR, AUDIO_EXTRACT_INTERVAL, @@ -156,10 +155,6 @@ class StreamManager: def relay_url(self): return f"udp://127.0.0.1:{RELAY_PORT}" - @property - def scene_relay_url(self): - return f"udp://127.0.0.1:{SCENE_RELAY_PORT}" - @property def recording_path(self): """Current recording segment path.""" @@ -198,108 +193,33 @@ class StreamManager: return proc is not None and proc.poll() is None def _launch_recorder(self): - node = ff.receive_record_and_relay( + start_number = self._next_frame_number() + node = ff.receive_record_relay_and_detect( self.stream_url, self.recording_path, self.relay_url, + self.frames_dir, scene_threshold=self.scene_threshold, + start_number=start_number, ) proc = ff.run_async(node, pipe_stderr=True) self._procs["recorder"] = proc - log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path) - self._start_stderr_reader("recorder", proc) + log.info("Recorder+scene: pid=%s → %s (threshold=%.2f, start_number=%d)", + proc.pid, self.recording_path, self.scene_threshold, start_number) + self._start_scene_stderr_reader(proc, start_number) # -- Scene Detection -- def start_scene_detector(self, on_new_frames=None): - """Periodically run scene detection on new portions of the recording. + """Register callback for new scene frames. - Args: - on_new_frames: callback(list of {id, timestamp, path}) for new frames + Scene detection runs inside the recorder process (single ffmpeg). + The stderr reader thread parses showinfo lines and fires this callback. """ self._on_new_frames = on_new_frames - def _detect(): - processed_time = 0.0 - current_segment = None - last_threshold = self.scene_threshold - - while "stop" not in self._stop_flags: - time.sleep(1.0) - - # Threshold changed — reset to re-process recent content - if self.scene_threshold != last_threshold: - log.info("Threshold changed %.2f → %.2f, resetting", - last_threshold, self.scene_threshold) - last_threshold = self.scene_threshold - # Back up a bit to re-scan with new sensitivity - processed_time = max(0.0, processed_time - 10) - - seg = self.recording_path - if not seg.exists(): - continue - - if seg != current_segment: - current_segment = seg - processed_time = 0.0 - log.info("Scene detector: switched to %s", seg.name) - - size = seg.stat().st_size - if size < 100_000: - continue - - safe_duration = self._estimate_safe_duration() - if safe_duration is None or safe_duration <= 0: - continue - - process_to = safe_duration - 1 - if process_to <= processed_time + 0.5: - continue - - log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to) - new_frames = self._detect_scenes( - start_time=processed_time, - end_time=process_to, - ) - - if new_frames: - log.info("Found %d new scene frames (total: %d)", - len(new_frames), self._next_frame_number() - 1) - if self._on_new_frames: - self._on_new_frames(new_frames) - - processed_time = process_to - - log.info("Scene detector stopped") - - t = Thread(target=_detect, daemon=True, name="scene_detector") - t.start() - self._threads["scene_detector"] = t - - def _estimate_safe_duration(self): - """Estimate recording duration. Uses ffprobe, falls back to file size. - - For fragmented MP4 (empty_moov), format-level duration is 0 so we - check stream duration from the last video stream instead. - """ - try: - import ffmpeg as ffmpeg_lib - info = ffmpeg_lib.probe(str(self.recording_path)) - # Format duration works for non-fragmented; 0 for empty_moov fMP4 - dur = float(info.get("format", {}).get("duration", 0)) - if dur > 0: - return dur - # Fragmented MP4: check video stream duration - for stream in info.get("streams", []): - sdur = float(stream.get("duration", 0)) - if sdur > 0: - return sdur - except Exception: - pass - - # Fallback: rough estimate from file size (~500kbit/s typical for this stream) - try: - size = self.recording_path.stat().st_size - return size / 65_000 # ~500kbps → 62.5 KB/s - except Exception: - return None + def update_scene_threshold(self, new_threshold): + """Update scene threshold. Restarts the recorder to apply new filter.""" + self.scene_threshold = new_threshold + log.info("Threshold changed → %.2f, restarting recorder", new_threshold) + self.restart_recorder() def _next_frame_number(self): """Determine next frame number from the index (source of truth).""" @@ -309,69 +229,82 @@ class StreamManager: return len(index) + 1 return 1 - def _detect_scenes(self, start_time, end_time): - """Run ffmpeg scene detection on a time range. Returns list of new frame entries.""" - import time as _time - t0 = _time.monotonic() - duration = end_time - start_time - start_number = self._next_frame_number() - - try: - _stdout, stderr = ff.extract_scene_frames( - self.recording_path, - self.frames_dir, - scene_threshold=self.scene_threshold, - start_number=start_number, - start_time=start_time, - duration=duration, - ) - except Exception as e: - log.error("Scene detection failed: %s", e) - return [] - - # Parse new frames from showinfo output — match each showinfo line - # to the corresponding file ffmpeg wrote (sequential from start_number) - new_frames = [] + def _append_frame_index(self, entry): + """Append a frame entry to index.json.""" index_path = self.frames_dir / "index.json" index = json.loads(index_path.read_text()) if index_path.exists() else [] + index.append(entry) + index_path.write_text(json.dumps(index, indent=2)) - offset = self.current_global_offset - frame_num = start_number - for line in stderr.splitlines(): - if "showinfo" not in line: - continue - pts_match = re.search(r"pts_time:\s*([\d.]+)", line) - if pts_match: + def _start_scene_stderr_reader(self, proc, start_number): + """Read stderr continuously, parsing showinfo lines for scene frames. + + Each showinfo line corresponds to a JPEG that ffmpeg writes. We wait + briefly for the file to appear on disk (showinfo fires before the + muxer flushes), then update the index and fire the callback. + """ + def _read(): + frame_num = start_number + offset = self.current_global_offset + for raw in proc.stderr: + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + if "showinfo" not in line: + log.debug("[recorder] %s", line) + continue + pts_match = re.search(r"pts_time:\s*([\d.]+)", line) + if not pts_match: + continue pts_time = float(pts_match.group(1)) frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" - if frame_path.exists(): - entry = { - "id": frame_id, - "timestamp": pts_time + offset, - "path": str(frame_path), - "sent_to_agent": False, - } - index.append(entry) - new_frames.append(entry) + + # Wait for ffmpeg to flush the JPEG (showinfo fires before mux) + for _ in range(20): # up to ~200ms + if frame_path.exists() and frame_path.stat().st_size > 0: + break + time.sleep(0.01) + + if not frame_path.exists(): + log.warning("Scene frame %s not found on disk", frame_id) + continue + + entry = { + "id": frame_id, + "timestamp": pts_time + offset, + "path": str(frame_path), + "sent_to_agent": False, + } + self._append_frame_index(entry) + log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)", + frame_id, entry["timestamp"], pts_time, offset) + if self._on_new_frames: + self._on_new_frames([entry]) frame_num += 1 - index_path.write_text(json.dumps(index, indent=2)) + log.info("[recorder] stderr closed, exit=%s", proc.poll()) - elapsed_ms = (_time.monotonic() - t0) * 1000 - tel = getattr(self, "telemetry", None) - if tel: - tel.metric("scene_detection", { - "start": start_time, "end": end_time, - "duration": duration, - "frames_found": len(new_frames), - "total_frames": len(index), - "threshold": self.scene_threshold, - "elapsed_ms": round(elapsed_ms), - "file_duration": self._estimate_safe_duration() or 0, - }) + Thread(target=_read, daemon=True, name="recorder_stderr").start() - return new_frames + def _probe_safe_duration(self): + """Probe current recording duration via ffprobe. Returns seconds or None.""" + try: + import ffmpeg as ffmpeg_lib + info = ffmpeg_lib.probe(str(self.recording_path)) + dur = float(info.get("format", {}).get("duration", 0)) + if dur > 0: + return dur + for stream in info.get("streams", []): + sdur = float(stream.get("duration", 0)) + if sdur > 0: + return sdur + except Exception: + pass + try: + return self.recording_path.stat().st_size / 65_000 + except Exception: + return None def capture_now(self, on_new_frames=None): """Capture a single frame from the current recording position. @@ -380,16 +313,14 @@ class StreamManager: to the index. Runs in a thread to avoid blocking the UI. """ def _capture(): - safe_duration = self._estimate_safe_duration() + safe_duration = self._probe_safe_duration() if not safe_duration or safe_duration < 1: log.warning("capture_now: recording too short") return local_timestamp = safe_duration - 1 timestamp = local_timestamp + self.current_global_offset - index_path = self.frames_dir / "index.json" - index = json.loads(index_path.read_text()) if index_path.exists() else [] - frame_num = len(index) + 1 + frame_num = self._next_frame_number() frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" @@ -409,8 +340,7 @@ class StreamManager: "path": str(frame_path), "sent_to_agent": False, } - index.append(entry) - index_path.write_text(json.dumps(index, indent=2)) + self._append_frame_index(entry) log.info("Manual capture: %s at %.1fs", frame_id, timestamp) if on_new_frames: @@ -453,7 +383,7 @@ class StreamManager: if seg.stat().st_size < 100_000: continue - safe_duration = self._estimate_safe_duration() + safe_duration = self._probe_safe_duration() if safe_duration is None or safe_duration <= 0: continue @@ -503,12 +433,3 @@ class StreamManager: ff.stop_proc(proc) self._procs.clear() - def _start_stderr_reader(self, name, proc): - def _read(): - for line in proc.stderr: - text = line.decode("utf-8", errors="replace").rstrip() - if text: - log.debug("[%s] %s", name, text) - log.info("[%s] exited: %s", name, proc.poll()) - - Thread(target=_read, daemon=True, name=f"{name}_stderr").start() diff --git a/cht/window.py b/cht/window.py index 98f2806..d3fb005 100644 --- a/cht/window.py +++ b/cht/window.py @@ -159,9 +159,9 @@ class ChtWindow(Adw.ApplicationWindow): ) def _on_scene_threshold(self, val): - if self._lifecycle.stream_mgr: + if self._lifecycle.stream_mgr and not self._lifecycle.stream_mgr.readonly: old = self._lifecycle.stream_mgr.scene_threshold - self._lifecycle.stream_mgr.scene_threshold = val + self._lifecycle.stream_mgr.update_scene_threshold(val) if self._telemetry: self._telemetry.event("scene_threshold_changed", {"from": old, "to": val}) diff --git a/tests/test_manager.py b/tests/test_manager.py index a08719b..881d26e 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -22,7 +22,7 @@ class TestInit: assert manager.session_id == "test_session" def test_recording_path(self, manager): - assert manager.recording_path.name == "recording.mkv" + assert manager.recording_path.name == "recording_000.mp4" def test_dirs_not_created_on_init(self, manager): assert not manager.stream_dir.exists() @@ -37,17 +37,6 @@ class TestSetupDirs: assert manager.agent_dir.is_dir() -class TestStartRecorder: - @patch("cht.stream.manager.ff.run_async") - @patch("cht.stream.manager.ff.receive_and_record") - def test_starts_ffmpeg(self, mock_record, mock_async, manager): - manager.setup_dirs() - mock_record.return_value = MagicMock() - manager.start_recorder() - mock_record.assert_called_once_with(manager.stream_url, manager.recording_path) - assert "recorder" in manager._procs - - class TestStopAll: @patch("cht.stream.manager.ff.stop_proc") def test_stops_all_procs(self, mock_stop, manager): @@ -62,41 +51,44 @@ class TestStopAll: assert "stop" in manager._stop_flags -class TestDetectScenes: - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_returns_new_frames(self, mock_extract, manager): +class TestFrameIndex: + def test_next_frame_number_empty(self, manager): manager.setup_dirs() - rec = manager.recording_path - rec.touch() + assert manager._next_frame_number() == 1 - def create_frame(*args, **kwargs): - (manager.frames_dir / "F0001.jpg").touch() - return ("", "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:10.5 stuff\n") - - mock_extract.side_effect = create_frame - - frames = manager._detect_scenes(start_time=0, end_time=15, start_number=1) - assert len(frames) == 1 - assert frames[0]["id"] == "F0001" - assert frames[0]["timestamp"] == 10.5 - - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_passes_duration(self, mock_extract, manager): + def test_next_frame_number_with_existing(self, manager): manager.setup_dirs() - manager.recording_path.touch() - mock_extract.return_value = ("", "") + index = [{"id": "F0001"}, {"id": "F0002"}] + (manager.frames_dir / "index.json").write_text(json.dumps(index)) + assert manager._next_frame_number() == 3 - manager._detect_scenes(start_time=10, end_time=25, start_number=1) - - call_kwargs = mock_extract.call_args - assert call_kwargs.kwargs["start_time"] == 10 - assert call_kwargs.kwargs["duration"] == 15 - - @patch("cht.stream.manager.ff.extract_scene_frames") - def test_handles_failure(self, mock_extract, manager): + def test_append_frame_index(self, manager): manager.setup_dirs() - manager.recording_path.touch() - mock_extract.side_effect = RuntimeError("boom") + entry = {"id": "F0001", "timestamp": 5.0, "path": "/tmp/F0001.jpg", "sent_to_agent": False} + manager._append_frame_index(entry) + index = json.loads((manager.frames_dir / "index.json").read_text()) + assert len(index) == 1 + assert index[0]["id"] == "F0001" - frames = manager._detect_scenes(start_time=0, end_time=10, start_number=1) - assert frames == [] + def test_append_frame_index_accumulates(self, manager): + manager.setup_dirs() + for i in range(3): + entry = {"id": f"F{i+1:04d}", "timestamp": float(i), "path": f"/tmp/F{i+1:04d}.jpg", "sent_to_agent": False} + manager._append_frame_index(entry) + index = json.loads((manager.frames_dir / "index.json").read_text()) + assert len(index) == 3 + + +class TestSceneDetector: + def test_start_scene_detector_stores_callback(self, manager): + cb = MagicMock() + manager.start_scene_detector(on_new_frames=cb) + assert manager._on_new_frames is cb + + def test_update_scene_threshold(self, manager): + manager.setup_dirs() + # Mock restart_recorder to avoid launching ffmpeg + manager.restart_recorder = MagicMock() + manager.update_scene_threshold(0.25) + assert manager.scene_threshold == 0.25 + manager.restart_recorder.assert_called_once()