From 6b6bc64ab8f65944e850159bfcaba76f911b53e7 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Fri, 10 Apr 2026 01:27:09 -0300 Subject: [PATCH] saving status before scene frame fix after rust change --- cht/stream/ffmpeg.py | 5 +- cht/stream/lifecycle.py | 22 +++++++ cht/stream/manager.py | 6 ++ cht/stream/processor.py | 114 +++++++++++++++++++++++++++++++++--- cht/window.py | 51 +++++++++++----- media/server/src/session.rs | 15 +++-- 6 files changed, 186 insertions(+), 27 deletions(-) diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index 641e39b..d39f9b9 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -187,7 +187,10 @@ def detect_scenes_from_pipe(scene_threshold=0.10, flush_frames=2, fps=30): - stdout: MJPEG pipe (JPEG frames on scene change) - stderr: showinfo lines with pts_time timestamps """ - stream = ffmpeg.input("pipe:0", f="h264", framerate=fps, hwaccel="cuda") + stream = ffmpeg.input( + "pipe:0", f="h264", framerate=fps, hwaccel="cuda", + fflags="nobuffer", probesize=32, analyzeduration=0, + ) scene_expr = f"gt(scene,{scene_threshold})" if flush_frames > 0: mod_val = 1 + flush_frames diff --git a/cht/stream/lifecycle.py b/cht/stream/lifecycle.py index 6e6c13c..4571533 100644 --- a/cht/stream/lifecycle.py +++ b/cht/stream/lifecycle.py @@ -102,6 +102,28 @@ class StreamLifecycle: from pathlib import Path from cht.config import DATA_DIR marker = DATA_DIR / "active-session" + + # If marker exists, check liveness via data/scene.sock (fixed path). + if marker.exists(): + try: + session_dir = Path(marker.read_text().strip()) + scene_sock = DATA_DIR / "scene.sock" + if session_dir.exists() and scene_sock.exists(): + import socket + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + s.connect(str(scene_sock)) + s.close() + log.info("Rust session dir (already active): %s", session_dir) + return session_dir + except OSError: + log.info("Stale scene.sock, cleaning up") + scene_sock.unlink(missing_ok=True) + marker.unlink(missing_ok=True) + log.info("Cleared stale active-session marker") + except Exception: + marker.unlink(missing_ok=True) + elapsed = 0.0 while elapsed < timeout: if marker.exists(): diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 724a2ff..652162d 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -178,10 +178,16 @@ class StreamManager: self.processor.set_on_new_frames(on_new_frames) if self.recorder: self.recorder.capture_now(on_raw_frame=self.processor.on_captured_frame) + else: + # Rust mode: extract current frame directly from the growing fMP4. + self.processor.capture_now_from_file() def update_scene_threshold(self, new_threshold: float): if self.recorder: self.recorder.update_scene_threshold(new_threshold) + else: + # Rust mode: restart scene detector with new threshold. + self.processor.restart_scene_detector(threshold=new_threshold) # -- Processor delegation -- diff --git a/cht/stream/processor.py b/cht/stream/processor.py index 46d5b45..c50a0ce 100644 --- a/cht/stream/processor.py +++ b/cht/stream/processor.py @@ -47,6 +47,7 @@ class SessionProcessor: self._threads: dict[str, Thread] = {} self._on_new_frames = None self._on_new_audio = None + self._last_scene_capture = 0.0 self._get_recording_path = None self._get_current_global_offset = None @@ -86,6 +87,98 @@ class SessionProcessor: """Receive a manually captured frame. Write and index it.""" self.on_raw_frame(jpeg_bytes, global_ts) + def capture_now_from_file(self): + """Extract the current frame from the growing fMP4 (Rust transport mode).""" + import tempfile, os as _os + + def _capture(): + seg = self._get_recording_path() if self._get_recording_path else None + if not seg or not seg.exists(): + log.warning("capture_now: no recording file") + return + try: + import subprocess + result = subprocess.run( + ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", str(seg)], + capture_output=True, text=True, + ) + duration = float(result.stdout.strip()) + except Exception as e: + log.warning("capture_now: could not probe duration: %s", e) + return + if duration < 1: + log.warning("capture_now: recording too short") + return + timestamp = max(0, duration - 0.5) + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: + tmp_path = Path(tmp.name) + try: + ff.extract_frame_at(seg, tmp_path, timestamp) + if not tmp_path.exists(): + log.warning("capture_now: frame not written") + return + jpeg_bytes = tmp_path.read_bytes() + except Exception as e: + log.error("capture_now failed: %s", e) + return + finally: + try: + _os.unlink(tmp_path) + except Exception: + pass + offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + self.on_raw_frame(jpeg_bytes, timestamp + offset) + + Thread(target=_capture, daemon=True, name="capture_now").start() + + def _capture_current_frame(self): + """Capture a fresh frame from the recording file's current tip. + + Called when scene detection triggers. The scene filter's own JPEG + is stale (buffered in the encoder), so we extract directly from + the fMP4 which is always near-current. + """ + seg = self._get_recording_path() if self._get_recording_path else None + if not seg or not seg.exists(): + return + duration = self._probe_safe_duration(seg) + if not duration or duration < 0.5: + return + local_ts = max(0, duration - 0.3) + offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + + import tempfile, os as _os + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: + tmp_path = Path(tmp.name) + try: + ff.extract_frame_at(seg, tmp_path, local_ts) + if not tmp_path.exists() or tmp_path.stat().st_size == 0: + return + jpeg_bytes = tmp_path.read_bytes() + except Exception as e: + log.debug("Scene capture failed: %s", e) + return + finally: + try: + _os.unlink(tmp_path) + except Exception: + pass + + self.on_raw_frame(jpeg_bytes, local_ts + offset) + + def restart_scene_detector(self, threshold): + """Restart scene detector with a new threshold. + + Kills the running ffmpeg — the detector thread reconnects automatically + and picks up the new threshold on the next call to start_scene_detector. + """ + if "scene_detector" in self._procs: + ff.stop_proc(self._procs.pop("scene_detector"), timeout=2) + # Spawn a fresh thread with the new threshold; old thread will exit + # when its ffmpeg proc dies. + self.start_scene_detector(threshold=threshold) + # -- Frame index -- @property @@ -134,7 +227,8 @@ class SessionProcessor: Retries on failure (e.g. ffmpeg dies from bad initial frames). The server buffers the latest keyframe so reconnects start clean. """ - socket_path = self.session_dir / "stream" / "scene.sock" + from cht.config import DATA_DIR + socket_path = DATA_DIR / "scene.sock" # Wait for the socket to appear (server creates it on session start). while "stop" not in self._stop_flags: @@ -151,6 +245,10 @@ class SessionProcessor: log.exception("Scene detector error") if "stop" in self._stop_flags: break + # If the socket is gone, the session ended — don't retry. + if not socket_path.exists(): + log.info("Scene detector: socket gone, session ended") + break log.info("Scene detector: reconnecting in 2s...") time.sleep(2.0) @@ -163,7 +261,7 @@ class SessionProcessor: try: sock.connect(str(socket_path)) except OSError as e: - log.error("Scene detector: connect failed: %s", e) + log.debug("Scene detector: connect failed: %s", e) return log.info("Scene detector: connected, starting ffmpeg") @@ -197,7 +295,7 @@ class SessionProcessor: stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin") stdin_t.start() - # Thread: ffmpeg stderr → parse showinfo timestamps + # Thread: ffmpeg stderr → parse showinfo timestamps → queue ts_queue = Queue() offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 @@ -217,7 +315,9 @@ class SessionProcessor: stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr") stderr_t.start() - # Main: ffmpeg stdout → extract JPEG frames + # Main: read JPEG frames from stdout, pair with stderr timestamps, + # skip flush frames. Same proven pattern as StreamRecorder._read_stdout. + flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0 last_pts = 0.0 buf = b"" raw_fd = proc.stdout.fileno() @@ -241,16 +341,16 @@ class SessionProcessor: try: pts_time = ts_queue.get(timeout=2.0) except Empty: - log.warning("No timestamp for scene frame") + log.warning("No timestamp for scene frame, using 0") pts_time = 0.0 - # Skip flush frames (within 100ms of previous = duplicate) - if pts_time - last_pts < 0.1: + if pts_time - last_pts < flush_window: log.debug("Skipping flush frame at pts=%.3f", pts_time) continue last_pts = pts_time global_ts = pts_time + offset + log.debug("Scene frame at pts=%.3f (global=%.3f)", pts_time, global_ts) self.on_raw_frame(jpeg_data, global_ts) ff.stop_proc(proc, timeout=3) diff --git a/cht/window.py b/cht/window.py index 0282f02..041d653 100644 --- a/cht/window.py +++ b/cht/window.py @@ -46,6 +46,7 @@ class ChtWindow(Adw.ApplicationWindow): self._pending_scrub_global = 0.0 self._scrub_pending = False # throttle flag for scrub updates self._telemetry = None + self._threshold_timeout_id = None # Core components self._timeline = Timeline() @@ -161,11 +162,23 @@ class ChtWindow(Adw.ApplicationWindow): ) def _on_scene_threshold(self, val): - if self._lifecycle.stream_mgr and not self._lifecycle.stream_mgr.readonly: - old = self._lifecycle.stream_mgr.scene_threshold - self._lifecycle.stream_mgr.update_scene_threshold(val) - if self._telemetry: - self._telemetry.event("scene_threshold_changed", {"from": old, "to": val}) + if not (self._lifecycle.stream_mgr and not self._lifecycle.stream_mgr.readonly): + return + if self._telemetry: + self._telemetry.event("scene_threshold_changed", + {"from": self._lifecycle.stream_mgr.scene_threshold, "to": val}) + # Debounce: wait 500ms after user stops dragging, then restart in background. + if self._threshold_timeout_id: + GLib.source_remove(self._threshold_timeout_id) + self._threshold_timeout_id = GLib.timeout_add(500, self._apply_threshold, val) + + def _apply_threshold(self, val): + self._threshold_timeout_id = None + mgr = self._lifecycle.stream_mgr + if mgr and not mgr.readonly: + Thread(target=mgr.update_scene_threshold, args=(val,), + daemon=True, name="threshold_update").start() + return False # don't repeat def _on_min_chunk_changed(self, panel, val): import cht.config @@ -247,6 +260,9 @@ class ChtWindow(Adw.ApplicationWindow): self._update_scrub_bar_manifest() self._populate_model_dropdown() + # Show "Continue" since there's an active session to resume + self._connect_btn.set_label("Continue") + # Load persisted agent conversation self._agent.load_from_session(mgr.session_dir) if self._agent.thread.messages: @@ -263,8 +279,11 @@ class ChtWindow(Adw.ApplicationWindow): audio_dir = mgr.audio_dir audio_dir.mkdir(parents=True, exist_ok=True) full_wav = audio_dir / "full.wav" + # Rust transport writes audio to a separate file (fMP4 has no audio track). + aac_path = mgr.stream_dir / "audio.aac" + source = aac_path if aac_path.exists() else segments[0] try: - ff.extract_audio_chunk(segments[0], full_wav) + ff.extract_audio_chunk(source, full_wav) self._waveform_engine.compute_full(full_wav) peaks = self._waveform_engine.peaks bucket_dur = self._waveform_engine.bucket_duration @@ -483,8 +502,18 @@ class ChtWindow(Adw.ApplicationWindow): if self._proxy_mgr: self._proxy_mgr.cancel() self._proxy_mgr = None - self._manifest = [] + self._connect_btn.set_label("Connect") + self._connect_btn.remove_css_class("destructive-action") + self._connect_btn.add_css_class("suggested-action") + + if reload_session and last_session_id: + # Transition to review mode — _load_session handles UI setup + self._load_session(last_session_id) + return + + # Full reset — only when not reloading + self._manifest = [] self._timeline.reset() self._timeline_controls.scrub_bar.set_manifest([]) self._monitor.reset() @@ -493,18 +522,10 @@ class ChtWindow(Adw.ApplicationWindow): self._transcriber.reset() self._agent.clear_history() self._known_frames = set() - self._frames_panel.clear() self._transcript_panel.clear() - - self._connect_btn.set_label("Connect") - self._connect_btn.remove_css_class("destructive-action") - self._connect_btn.add_css_class("suggested-action") self.set_title(APP_NAME) - if reload_session and last_session_id: - GLib.idle_add(self._load_session, last_session_id) - def _on_close(self, *args): self.teardown() diff --git a/media/server/src/session.rs b/media/server/src/session.rs index 69d9a03..6e4d522 100644 --- a/media/server/src/session.rs +++ b/media/server/src/session.rs @@ -109,8 +109,10 @@ impl Session { None }); - // Scene relay: Unix socket for Python scene detection. - let socket_path = stream_dir.join(SCENE_SOCKET_NAME); + // Scene relay: Unix socket at data/scene.sock (fixed path). + // Python always connects here — no need to discover per-session paths. + let data_dir = sessions_dir.parent().unwrap_or(sessions_dir); + let socket_path = data_dir.join(SCENE_SOCKET_NAME); let (scene_tx, scene_rx) = tokio::sync::mpsc::channel(32); info!("Scene relay: spawning for {}", socket_path.display()); tokio::spawn(scene_relay_task(socket_path, scene_rx)); @@ -169,8 +171,13 @@ impl Session { Ok(s) => info!("ffmpeg recorder exited: {s}"), Err(e) => warn!("ffmpeg recorder wait error: {e}"), } - // Clear the active session marker. - let _ = fs::remove_file(&self.active_session_file); + // Clear the active session marker only if it still points to our session. + // Another session may have overwritten it if the server restarted. + if let Ok(content) = fs::read_to_string(&self.active_session_file) { + if content.trim() == self.session_dir.to_str().unwrap_or("") { + let _ = fs::remove_file(&self.active_session_file); + } + } } }