first try new approach

This commit is contained in:
2026-04-03 10:47:27 -03:00
parent fbf9984a5d
commit 2a049d8c2b
5 changed files with 164 additions and 254 deletions

View File

@@ -13,7 +13,6 @@ SESSIONS_DIR = Path(os.environ.get("CHT_SESSIONS_DIR", DATA_DIR / "sessions"))
STREAM_HOST = "0.0.0.0" STREAM_HOST = "0.0.0.0"
STREAM_PORT = 4444 STREAM_PORT = 4444
RELAY_PORT = 4445 # UDP loopback relay for live display RELAY_PORT = 4445 # UDP loopback relay for live display
SCENE_RELAY_PORT = 4446 # UDP loopback relay for scene detector
# Frame extraction — scene-only, no interval fallback # Frame extraction — scene-only, no interval fallback
SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes

View File

@@ -39,7 +39,7 @@ def receive_and_record(stream_url, output_path):
) )
def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url=None): def receive_record_and_relay(stream_url, output_path, relay_url):
"""Receive TCP stream, write to fragmented MP4, and relay to UDP loopback. """Receive TCP stream, write to fragmented MP4, and relay to UDP loopback.
Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption: Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption:
@@ -47,7 +47,6 @@ def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url
always valid up to the last complete fragment (~1 keyframe interval ≈ 2s). always valid up to the last complete fragment (~1 keyframe interval ≈ 2s).
Uses ffmpeg tee via merge_outputs: one process, identical timestamps. Uses ffmpeg tee via merge_outputs: one process, identical timestamps.
Optionally sends a second relay for the scene detector.
""" """
stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")
file_out = ffmpeg.output( file_out = ffmpeg.output(
@@ -61,50 +60,49 @@ def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url
stream, relay_url, stream, relay_url,
c="copy", f="mpegts", c="copy", f="mpegts",
) )
outputs = [file_out, relay_out] return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS)
if scene_relay_url:
scene_out = ffmpeg.output(
stream, scene_relay_url,
c="copy", f="mpegts",
)
outputs.append(scene_out)
return ffmpeg.merge_outputs(*outputs).global_args(*QUIET_ARGS)
def start_live_scene_detector(stream_url, output_dir, scene_threshold=0.10, def receive_record_relay_and_detect(stream_url, output_path, relay_url,
start_number=1): frames_dir, scene_threshold=0.10,
"""Start a persistent ffmpeg process that detects scenes from a live stream. start_number=1):
"""Single process: receive TCP → record fMP4 + relay UDP + scene detect.
Reads from the UDP relay in real-time — no file seeking, no restart overhead. One ffmpeg process, three output branches from the same TCP input:
Writes frame JPEGs and emits showinfo on stderr as scenes are detected. 1. File output — c=copy to fMP4
Returns the async process (stderr must be read continuously). 2. UDP relay — c=copy to mpegts for live display
3. Scene frames — CUDA decode → select(scene) → showinfo → JPEG files
The scene filter runs on decoded frames in-process, so detection latency
is near-zero (no polling, no file re-reading, no separate process).
Stderr must be read continuously to parse showinfo lines.
""" """
stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay",
hwaccel="cuda")
# Copy outputs (raw packet remux, no decode)
file_out = ffmpeg.output(
stream, str(output_path),
c="copy", f="mp4",
movflags="frag_keyframe+empty_moov+default_base_moof",
flush_packets=1,
**{"bsf:a": "aac_adtstoasc"},
)
relay_out = ffmpeg.output(
stream, relay_url,
c="copy", f="mpegts",
)
# Scene detection output (decode + filter → JPEG)
select_expr = f"gt(scene,{scene_threshold})" select_expr = f"gt(scene,{scene_threshold})"
scene_stream = stream.filter("select", select_expr).filter("showinfo")
stream = ffmpeg.input( scene_out = ffmpeg.output(
stream_url, scene_stream, str(frames_dir / "F%04d.jpg"),
fflags="nobuffer+flush_packets", vsync="vfr", flush_packets=1, **{"q:v": "2"},
flags="low_delay", start_number=start_number,
probesize="32000",
analyzeduration="0",
hwaccel="cuda",
)
stream = stream.filter("select", select_expr).filter("showinfo")
output = (
ffmpeg.output(
stream,
str(output_dir / "F%04d.jpg"),
vsync="vfr",
flush_packets=1,
**{"q:v": "2"},
start_number=start_number,
)
.global_args(*GLOBAL_ARGS)
) )
log.info("start_live_scene_detector: %s", " ".join(output.compile())) return ffmpeg.merge_outputs(file_out, relay_out, scene_out).global_args(*GLOBAL_ARGS)
return run_async(output, pipe_stderr=True)
def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, def extract_scene_frames(input_path, output_dir, scene_threshold=0.10,

View File

@@ -2,23 +2,22 @@
StreamManager: orchestrates ffmpeg for recording and scene detection. StreamManager: orchestrates ffmpeg for recording and scene detection.
Architecture: Architecture:
sender → TCP:4444 → ffmpeg (writes recording.ts) sender → TCP:4444 → single ffmpeg process:
recording.ts → mpv (plays via Timeline) 1. writes fMP4 to disk (c=copy)
recording.ts → ffmpeg scene detection (periodic, incremental) 2. relays UDP for live display (c=copy)
3. CUDA decode → scene filter → JPEG frames (real-time)
""" """
import json import json
import logging import logging
import re import re
import time import time
from pathlib import Path
from threading import Thread from threading import Thread
from cht.config import ( from cht.config import (
STREAM_HOST, STREAM_HOST,
STREAM_PORT, STREAM_PORT,
RELAY_PORT, RELAY_PORT,
SCENE_RELAY_PORT,
SCENE_THRESHOLD, SCENE_THRESHOLD,
SESSIONS_DIR, SESSIONS_DIR,
AUDIO_EXTRACT_INTERVAL, AUDIO_EXTRACT_INTERVAL,
@@ -156,10 +155,6 @@ class StreamManager:
def relay_url(self): def relay_url(self):
return f"udp://127.0.0.1:{RELAY_PORT}" return f"udp://127.0.0.1:{RELAY_PORT}"
@property
def scene_relay_url(self):
return f"udp://127.0.0.1:{SCENE_RELAY_PORT}"
@property @property
def recording_path(self): def recording_path(self):
"""Current recording segment path.""" """Current recording segment path."""
@@ -198,108 +193,33 @@ class StreamManager:
return proc is not None and proc.poll() is None return proc is not None and proc.poll() is None
def _launch_recorder(self): def _launch_recorder(self):
node = ff.receive_record_and_relay( start_number = self._next_frame_number()
node = ff.receive_record_relay_and_detect(
self.stream_url, self.recording_path, self.relay_url, self.stream_url, self.recording_path, self.relay_url,
self.frames_dir, scene_threshold=self.scene_threshold,
start_number=start_number,
) )
proc = ff.run_async(node, pipe_stderr=True) proc = ff.run_async(node, pipe_stderr=True)
self._procs["recorder"] = proc self._procs["recorder"] = proc
log.info("Recorder: pid=%s%s", proc.pid, self.recording_path) log.info("Recorder+scene: pid=%s%s (threshold=%.2f, start_number=%d)",
self._start_stderr_reader("recorder", proc) proc.pid, self.recording_path, self.scene_threshold, start_number)
self._start_scene_stderr_reader(proc, start_number)
# -- Scene Detection -- # -- Scene Detection --
def start_scene_detector(self, on_new_frames=None): def start_scene_detector(self, on_new_frames=None):
"""Periodically run scene detection on new portions of the recording. """Register callback for new scene frames.
Args: Scene detection runs inside the recorder process (single ffmpeg).
on_new_frames: callback(list of {id, timestamp, path}) for new frames The stderr reader thread parses showinfo lines and fires this callback.
""" """
self._on_new_frames = on_new_frames self._on_new_frames = on_new_frames
def _detect(): def update_scene_threshold(self, new_threshold):
processed_time = 0.0 """Update scene threshold. Restarts the recorder to apply new filter."""
current_segment = None self.scene_threshold = new_threshold
last_threshold = self.scene_threshold log.info("Threshold changed → %.2f, restarting recorder", new_threshold)
self.restart_recorder()
while "stop" not in self._stop_flags:
time.sleep(1.0)
# Threshold changed — reset to re-process recent content
if self.scene_threshold != last_threshold:
log.info("Threshold changed %.2f%.2f, resetting",
last_threshold, self.scene_threshold)
last_threshold = self.scene_threshold
# Back up a bit to re-scan with new sensitivity
processed_time = max(0.0, processed_time - 10)
seg = self.recording_path
if not seg.exists():
continue
if seg != current_segment:
current_segment = seg
processed_time = 0.0
log.info("Scene detector: switched to %s", seg.name)
size = seg.stat().st_size
if size < 100_000:
continue
safe_duration = self._estimate_safe_duration()
if safe_duration is None or safe_duration <= 0:
continue
process_to = safe_duration - 1
if process_to <= processed_time + 0.5:
continue
log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to)
new_frames = self._detect_scenes(
start_time=processed_time,
end_time=process_to,
)
if new_frames:
log.info("Found %d new scene frames (total: %d)",
len(new_frames), self._next_frame_number() - 1)
if self._on_new_frames:
self._on_new_frames(new_frames)
processed_time = process_to
log.info("Scene detector stopped")
t = Thread(target=_detect, daemon=True, name="scene_detector")
t.start()
self._threads["scene_detector"] = t
def _estimate_safe_duration(self):
"""Estimate recording duration. Uses ffprobe, falls back to file size.
For fragmented MP4 (empty_moov), format-level duration is 0 so we
check stream duration from the last video stream instead.
"""
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(self.recording_path))
# Format duration works for non-fragmented; 0 for empty_moov fMP4
dur = float(info.get("format", {}).get("duration", 0))
if dur > 0:
return dur
# Fragmented MP4: check video stream duration
for stream in info.get("streams", []):
sdur = float(stream.get("duration", 0))
if sdur > 0:
return sdur
except Exception:
pass
# Fallback: rough estimate from file size (~500kbit/s typical for this stream)
try:
size = self.recording_path.stat().st_size
return size / 65_000 # ~500kbps → 62.5 KB/s
except Exception:
return None
def _next_frame_number(self): def _next_frame_number(self):
"""Determine next frame number from the index (source of truth).""" """Determine next frame number from the index (source of truth)."""
@@ -309,69 +229,82 @@ class StreamManager:
return len(index) + 1 return len(index) + 1
return 1 return 1
def _detect_scenes(self, start_time, end_time): def _append_frame_index(self, entry):
"""Run ffmpeg scene detection on a time range. Returns list of new frame entries.""" """Append a frame entry to index.json."""
import time as _time
t0 = _time.monotonic()
duration = end_time - start_time
start_number = self._next_frame_number()
try:
_stdout, stderr = ff.extract_scene_frames(
self.recording_path,
self.frames_dir,
scene_threshold=self.scene_threshold,
start_number=start_number,
start_time=start_time,
duration=duration,
)
except Exception as e:
log.error("Scene detection failed: %s", e)
return []
# Parse new frames from showinfo output — match each showinfo line
# to the corresponding file ffmpeg wrote (sequential from start_number)
new_frames = []
index_path = self.frames_dir / "index.json" index_path = self.frames_dir / "index.json"
index = json.loads(index_path.read_text()) if index_path.exists() else [] index = json.loads(index_path.read_text()) if index_path.exists() else []
index.append(entry)
index_path.write_text(json.dumps(index, indent=2))
offset = self.current_global_offset def _start_scene_stderr_reader(self, proc, start_number):
frame_num = start_number """Read stderr continuously, parsing showinfo lines for scene frames.
for line in stderr.splitlines():
if "showinfo" not in line: Each showinfo line corresponds to a JPEG that ffmpeg writes. We wait
continue briefly for the file to appear on disk (showinfo fires before the
pts_match = re.search(r"pts_time:\s*([\d.]+)", line) muxer flushes), then update the index and fire the callback.
if pts_match: """
def _read():
frame_num = start_number
offset = self.current_global_offset
for raw in proc.stderr:
line = raw.decode("utf-8", errors="replace").rstrip()
if not line:
continue
if "showinfo" not in line:
log.debug("[recorder] %s", line)
continue
pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
if not pts_match:
continue
pts_time = float(pts_match.group(1)) pts_time = float(pts_match.group(1))
frame_id = f"F{frame_num:04d}" frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg" frame_path = self.frames_dir / f"{frame_id}.jpg"
if frame_path.exists():
entry = { # Wait for ffmpeg to flush the JPEG (showinfo fires before mux)
"id": frame_id, for _ in range(20): # up to ~200ms
"timestamp": pts_time + offset, if frame_path.exists() and frame_path.stat().st_size > 0:
"path": str(frame_path), break
"sent_to_agent": False, time.sleep(0.01)
}
index.append(entry) if not frame_path.exists():
new_frames.append(entry) log.warning("Scene frame %s not found on disk", frame_id)
continue
entry = {
"id": frame_id,
"timestamp": pts_time + offset,
"path": str(frame_path),
"sent_to_agent": False,
}
self._append_frame_index(entry)
log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)",
frame_id, entry["timestamp"], pts_time, offset)
if self._on_new_frames:
self._on_new_frames([entry])
frame_num += 1 frame_num += 1
index_path.write_text(json.dumps(index, indent=2)) log.info("[recorder] stderr closed, exit=%s", proc.poll())
elapsed_ms = (_time.monotonic() - t0) * 1000 Thread(target=_read, daemon=True, name="recorder_stderr").start()
tel = getattr(self, "telemetry", None)
if tel:
tel.metric("scene_detection", {
"start": start_time, "end": end_time,
"duration": duration,
"frames_found": len(new_frames),
"total_frames": len(index),
"threshold": self.scene_threshold,
"elapsed_ms": round(elapsed_ms),
"file_duration": self._estimate_safe_duration() or 0,
})
return new_frames def _probe_safe_duration(self):
"""Probe current recording duration via ffprobe. Returns seconds or None."""
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(self.recording_path))
dur = float(info.get("format", {}).get("duration", 0))
if dur > 0:
return dur
for stream in info.get("streams", []):
sdur = float(stream.get("duration", 0))
if sdur > 0:
return sdur
except Exception:
pass
try:
return self.recording_path.stat().st_size / 65_000
except Exception:
return None
def capture_now(self, on_new_frames=None): def capture_now(self, on_new_frames=None):
"""Capture a single frame from the current recording position. """Capture a single frame from the current recording position.
@@ -380,16 +313,14 @@ class StreamManager:
to the index. Runs in a thread to avoid blocking the UI. to the index. Runs in a thread to avoid blocking the UI.
""" """
def _capture(): def _capture():
safe_duration = self._estimate_safe_duration() safe_duration = self._probe_safe_duration()
if not safe_duration or safe_duration < 1: if not safe_duration or safe_duration < 1:
log.warning("capture_now: recording too short") log.warning("capture_now: recording too short")
return return
local_timestamp = safe_duration - 1 local_timestamp = safe_duration - 1
timestamp = local_timestamp + self.current_global_offset timestamp = local_timestamp + self.current_global_offset
index_path = self.frames_dir / "index.json" frame_num = self._next_frame_number()
index = json.loads(index_path.read_text()) if index_path.exists() else []
frame_num = len(index) + 1
frame_id = f"F{frame_num:04d}" frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg" frame_path = self.frames_dir / f"{frame_id}.jpg"
@@ -409,8 +340,7 @@ class StreamManager:
"path": str(frame_path), "path": str(frame_path),
"sent_to_agent": False, "sent_to_agent": False,
} }
index.append(entry) self._append_frame_index(entry)
index_path.write_text(json.dumps(index, indent=2))
log.info("Manual capture: %s at %.1fs", frame_id, timestamp) log.info("Manual capture: %s at %.1fs", frame_id, timestamp)
if on_new_frames: if on_new_frames:
@@ -453,7 +383,7 @@ class StreamManager:
if seg.stat().st_size < 100_000: if seg.stat().st_size < 100_000:
continue continue
safe_duration = self._estimate_safe_duration() safe_duration = self._probe_safe_duration()
if safe_duration is None or safe_duration <= 0: if safe_duration is None or safe_duration <= 0:
continue continue
@@ -503,12 +433,3 @@ class StreamManager:
ff.stop_proc(proc) ff.stop_proc(proc)
self._procs.clear() self._procs.clear()
def _start_stderr_reader(self, name, proc):
def _read():
for line in proc.stderr:
text = line.decode("utf-8", errors="replace").rstrip()
if text:
log.debug("[%s] %s", name, text)
log.info("[%s] exited: %s", name, proc.poll())
Thread(target=_read, daemon=True, name=f"{name}_stderr").start()

View File

@@ -159,9 +159,9 @@ class ChtWindow(Adw.ApplicationWindow):
) )
def _on_scene_threshold(self, val): def _on_scene_threshold(self, val):
if self._lifecycle.stream_mgr: if self._lifecycle.stream_mgr and not self._lifecycle.stream_mgr.readonly:
old = self._lifecycle.stream_mgr.scene_threshold old = self._lifecycle.stream_mgr.scene_threshold
self._lifecycle.stream_mgr.scene_threshold = val self._lifecycle.stream_mgr.update_scene_threshold(val)
if self._telemetry: if self._telemetry:
self._telemetry.event("scene_threshold_changed", {"from": old, "to": val}) self._telemetry.event("scene_threshold_changed", {"from": old, "to": val})

View File

@@ -22,7 +22,7 @@ class TestInit:
assert manager.session_id == "test_session" assert manager.session_id == "test_session"
def test_recording_path(self, manager): def test_recording_path(self, manager):
assert manager.recording_path.name == "recording.mkv" assert manager.recording_path.name == "recording_000.mp4"
def test_dirs_not_created_on_init(self, manager): def test_dirs_not_created_on_init(self, manager):
assert not manager.stream_dir.exists() assert not manager.stream_dir.exists()
@@ -37,17 +37,6 @@ class TestSetupDirs:
assert manager.agent_dir.is_dir() assert manager.agent_dir.is_dir()
class TestStartRecorder:
@patch("cht.stream.manager.ff.run_async")
@patch("cht.stream.manager.ff.receive_and_record")
def test_starts_ffmpeg(self, mock_record, mock_async, manager):
manager.setup_dirs()
mock_record.return_value = MagicMock()
manager.start_recorder()
mock_record.assert_called_once_with(manager.stream_url, manager.recording_path)
assert "recorder" in manager._procs
class TestStopAll: class TestStopAll:
@patch("cht.stream.manager.ff.stop_proc") @patch("cht.stream.manager.ff.stop_proc")
def test_stops_all_procs(self, mock_stop, manager): def test_stops_all_procs(self, mock_stop, manager):
@@ -62,41 +51,44 @@ class TestStopAll:
assert "stop" in manager._stop_flags assert "stop" in manager._stop_flags
class TestDetectScenes: class TestFrameIndex:
@patch("cht.stream.manager.ff.extract_scene_frames") def test_next_frame_number_empty(self, manager):
def test_returns_new_frames(self, mock_extract, manager):
manager.setup_dirs() manager.setup_dirs()
rec = manager.recording_path assert manager._next_frame_number() == 1
rec.touch()
def create_frame(*args, **kwargs): def test_next_frame_number_with_existing(self, manager):
(manager.frames_dir / "F0001.jpg").touch()
return ("", "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:10.5 stuff\n")
mock_extract.side_effect = create_frame
frames = manager._detect_scenes(start_time=0, end_time=15, start_number=1)
assert len(frames) == 1
assert frames[0]["id"] == "F0001"
assert frames[0]["timestamp"] == 10.5
@patch("cht.stream.manager.ff.extract_scene_frames")
def test_passes_duration(self, mock_extract, manager):
manager.setup_dirs() manager.setup_dirs()
manager.recording_path.touch() index = [{"id": "F0001"}, {"id": "F0002"}]
mock_extract.return_value = ("", "") (manager.frames_dir / "index.json").write_text(json.dumps(index))
assert manager._next_frame_number() == 3
manager._detect_scenes(start_time=10, end_time=25, start_number=1) def test_append_frame_index(self, manager):
call_kwargs = mock_extract.call_args
assert call_kwargs.kwargs["start_time"] == 10
assert call_kwargs.kwargs["duration"] == 15
@patch("cht.stream.manager.ff.extract_scene_frames")
def test_handles_failure(self, mock_extract, manager):
manager.setup_dirs() manager.setup_dirs()
manager.recording_path.touch() entry = {"id": "F0001", "timestamp": 5.0, "path": "/tmp/F0001.jpg", "sent_to_agent": False}
mock_extract.side_effect = RuntimeError("boom") manager._append_frame_index(entry)
index = json.loads((manager.frames_dir / "index.json").read_text())
assert len(index) == 1
assert index[0]["id"] == "F0001"
frames = manager._detect_scenes(start_time=0, end_time=10, start_number=1) def test_append_frame_index_accumulates(self, manager):
assert frames == [] manager.setup_dirs()
for i in range(3):
entry = {"id": f"F{i+1:04d}", "timestamp": float(i), "path": f"/tmp/F{i+1:04d}.jpg", "sent_to_agent": False}
manager._append_frame_index(entry)
index = json.loads((manager.frames_dir / "index.json").read_text())
assert len(index) == 3
class TestSceneDetector:
def test_start_scene_detector_stores_callback(self, manager):
cb = MagicMock()
manager.start_scene_detector(on_new_frames=cb)
assert manager._on_new_frames is cb
def test_update_scene_threshold(self, manager):
manager.setup_dirs()
# Mock restart_recorder to avoid launching ffmpeg
manager.restart_recorder = MagicMock()
manager.update_scene_threshold(0.25)
assert manager.scene_threshold == 0.25
manager.restart_recorder.assert_called_once()