diff --git a/.gitignore b/.gitignore index 2ae2849..b5f011e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ def data/ +bin/ *.egg-info/ __pycache__/ .venv/ diff --git a/cht/config.py b/cht/config.py index 53f9b57..52681be 100644 --- a/cht/config.py +++ b/cht/config.py @@ -13,6 +13,7 @@ SESSIONS_DIR = Path(os.environ.get("CHT_SESSIONS_DIR", DATA_DIR / "sessions")) STREAM_HOST = "0.0.0.0" STREAM_PORT = 4444 RELAY_PORT = 4445 # UDP loopback relay for live display +SCENE_RELAY_PORT = 4446 # UDP loopback relay for scene detector # Frame extraction — scene-only, no interval fallback SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes diff --git a/cht/scrub/proxy.py b/cht/scrub/proxy.py index 341b655..d5c3473 100644 --- a/cht/scrub/proxy.py +++ b/cht/scrub/proxy.py @@ -36,7 +36,7 @@ def generate_proxy(segment_path: Path, output_path: Path, """ output_path.parent.mkdir(parents=True, exist_ok=True) - stream = ffmpeg_lib.input(str(segment_path)) + stream = ffmpeg_lib.input(str(segment_path), hwaccel="cuda") output = ( ffmpeg_lib.output( stream, str(output_path), diff --git a/cht/session.py b/cht/session.py index 6e086c8..43dfd45 100644 --- a/cht/session.py +++ b/cht/session.py @@ -108,7 +108,8 @@ def load_frame_index(frames_dir: Path) -> list[dict]: return [] try: index = json.loads(index_path.read_text()) - except (json.JSONDecodeError, IOError): + except (json.JSONDecodeError, IOError) as e: + log.debug("Failed to read frame index %s: %s", index_path, e) return [] result = [] for entry in index: diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index c9e129d..7cee3f7 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -39,15 +39,15 @@ def receive_and_record(stream_url, output_path): ) -def receive_record_and_relay(stream_url, output_path, relay_url): +def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url=None): """Receive TCP stream, write to fragmented MP4, and relay to UDP loopback. Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption: each keyframe boundary closes a self-contained fragment, so the file is always valid up to the last complete fragment (~1 keyframe interval ≈ 2s). - This allows the scene detector to use a 2s safety margin instead of 6s. Uses ffmpeg tee via merge_outputs: one process, identical timestamps. + Optionally sends a second relay for the scene detector. """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") file_out = ffmpeg.output( @@ -61,7 +61,50 @@ def receive_record_and_relay(stream_url, output_path, relay_url): stream, relay_url, c="copy", f="mpegts", ) - return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS) + outputs = [file_out, relay_out] + if scene_relay_url: + scene_out = ffmpeg.output( + stream, scene_relay_url, + c="copy", f="mpegts", + ) + outputs.append(scene_out) + return ffmpeg.merge_outputs(*outputs).global_args(*QUIET_ARGS) + + +def start_live_scene_detector(stream_url, output_dir, scene_threshold=0.10, + start_number=1): + """Start a persistent ffmpeg process that detects scenes from a live stream. + + Reads from the UDP relay in real-time — no file seeking, no restart overhead. + Writes frame JPEGs and emits showinfo on stderr as scenes are detected. + Returns the async process (stderr must be read continuously). + """ + select_expr = f"gt(scene,{scene_threshold})" + + stream = ffmpeg.input( + stream_url, + fflags="nobuffer+flush_packets", + flags="low_delay", + probesize="32000", + analyzeduration="0", + hwaccel="cuda", + ) + stream = stream.filter("select", select_expr).filter("showinfo") + + output = ( + ffmpeg.output( + stream, + str(output_dir / "F%04d.jpg"), + vsync="vfr", + flush_packets=1, + **{"q:v": "2"}, + start_number=start_number, + ) + .global_args(*GLOBAL_ARGS) + ) + + log.info("start_live_scene_detector: %s", " ".join(output.compile())) + return run_async(output, pipe_stderr=True) def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, @@ -72,23 +115,28 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, meaningfully vs the previous frame. No periodic fallback so static content produces no spurious frames. - Uses -ss input seeking for O(1) startup regardless of file size. - pts_time in showinfo output is relative to the seek point. + start_time/duration: applied via the select filter expression (NOT as -ss/-t + input options, which break scene detection on fragmented MP4). Returns (stdout, stderr) as decoded strings for timestamp parsing. """ scene_expr = f"gt(scene,{scene_threshold})" - # With -ss input seeking, t starts at 0 from the seek point. - # Only need end boundary (duration), start is handled by -ss. - if duration is not None: - scene_expr = f"({scene_expr})*lte(t,{duration})" - - input_kwargs = {} + time_conditions = [] if start_time > 0: - input_kwargs["ss"] = start_time + time_conditions.append(f"gte(t,{start_time})") + if duration is not None: + time_conditions.append(f"lte(t,{start_time + duration})") - stream = ffmpeg.input(str(input_path), **input_kwargs) - stream = stream.filter("select", scene_expr).filter("showinfo") + if time_conditions: + time_filter = "*".join(time_conditions) + select_expr = f"({scene_expr})*{time_filter}" + else: + select_expr = scene_expr + + # CUDA hardware decode — GPU does h264 parsing, frames auto-transfer + # to CPU for the scene filter. Falls back to software if unavailable. + stream = ffmpeg.input(str(input_path), hwaccel="cuda") + stream = stream.filter("select", select_expr).filter("showinfo") output = ( ffmpeg.output( @@ -150,7 +198,7 @@ def extract_audio_chunk(input_path, output_path, start_time=0.0, duration=None): def extract_frame_at(input_path, output_path, timestamp): """Extract a single frame at the given timestamp.""" output = ( - ffmpeg.input(str(input_path), ss=timestamp) + ffmpeg.input(str(input_path), ss=timestamp, hwaccel="cuda") .output(str(output_path), vframes=1, **{"q:v": "2"}) .overwrite_output() .global_args(*QUIET_ARGS) diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 5487cda..b46353a 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -18,6 +18,7 @@ from cht.config import ( STREAM_HOST, STREAM_PORT, RELAY_PORT, + SCENE_RELAY_PORT, SCENE_THRESHOLD, SESSIONS_DIR, AUDIO_EXTRACT_INTERVAL, @@ -155,6 +156,10 @@ class StreamManager: def relay_url(self): return f"udp://127.0.0.1:{RELAY_PORT}" + @property + def scene_relay_url(self): + return f"udp://127.0.0.1:{SCENE_RELAY_PORT}" + @property def recording_path(self): """Current recording segment path.""" @@ -193,7 +198,9 @@ class StreamManager: return proc is not None and proc.poll() is None def _launch_recorder(self): - node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url) + node = ff.receive_record_and_relay( + self.stream_url, self.recording_path, self.relay_url, + ) proc = ff.run_async(node, pipe_stderr=True) self._procs["recorder"] = proc log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path) @@ -211,38 +218,38 @@ class StreamManager: def _detect(): processed_time = 0.0 - idle_cycles = 0 current_segment = None + last_threshold = self.scene_threshold while "stop" not in self._stop_flags: - # Adaptive sleep: faster at lower thresholds (more sensitive) - # threshold 0.01→1s base, 0.10→1s, 0.50→2s - base = max(1.0, min(2.0, self.scene_threshold * 10)) - sleep_secs = base if idle_cycles == 0 else min(base * 2, base * (2 ** idle_cycles)) - time.sleep(sleep_secs) + time.sleep(1.0) + + # Threshold changed — reset to re-process recent content + if self.scene_threshold != last_threshold: + log.info("Threshold changed %.2f → %.2f, resetting", + last_threshold, self.scene_threshold) + last_threshold = self.scene_threshold + # Back up a bit to re-scan with new sensitivity + processed_time = max(0.0, processed_time - 10) seg = self.recording_path if not seg.exists(): continue - # New segment started — reset per-segment progress if seg != current_segment: current_segment = seg processed_time = 0.0 - idle_cycles = 0 log.info("Scene detector: switched to %s", seg.name) size = seg.stat().st_size if size < 100_000: continue - # Probe current segment duration directly (not total across segments) safe_duration = self._estimate_safe_duration() if safe_duration is None or safe_duration <= 0: continue - # 2s safety margin for incomplete tail fragments - process_to = safe_duration - 2 + process_to = safe_duration - 1 if process_to <= processed_time + 0.5: continue @@ -253,13 +260,10 @@ class StreamManager: ) if new_frames: - idle_cycles = 0 log.info("Found %d new scene frames (total: %d)", len(new_frames), self._next_frame_number() - 1) if self._on_new_frames: self._on_new_frames(new_frames) - else: - idle_cycles += 1 processed_time = process_to @@ -338,8 +342,7 @@ class StreamManager: continue pts_match = re.search(r"pts_time:\s*([\d.]+)", line) if pts_match: - # pts_time is relative to -ss seek point, add start_time for local offset - pts_time = float(pts_match.group(1)) + start_time + pts_time = float(pts_match.group(1)) frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" if frame_path.exists(): diff --git a/cht/window.py b/cht/window.py index 998464c..98f2806 100644 --- a/cht/window.py +++ b/cht/window.py @@ -682,7 +682,10 @@ class ChtWindow(Adw.ApplicationWindow): def _poll_frames(self): if not self._lifecycle.stream_mgr: return False - for entry in load_frame_index(self._lifecycle.stream_mgr.frames_dir): + entries = load_frame_index(self._lifecycle.stream_mgr.frames_dir) + if entries and not self._known_frames: + log.info("Poll: found %d frames, known %d", len(entries), len(self._known_frames)) + for entry in entries: fid = entry["id"] if fid in self._known_frames: continue