diff --git a/cht/stream/processor.py b/cht/stream/processor.py index c50a0ce..5c717e2 100644 --- a/cht/stream/processor.py +++ b/cht/stream/processor.py @@ -167,6 +167,64 @@ class SessionProcessor: self.on_raw_frame(jpeg_bytes, local_ts + offset) + def _extract_scene_frame(self, rec_ts, global_ts): + """Extract a frame from the recording at a specific timestamp. + + Called from the scene detector when showinfo fires. The timestamp + has already been corrected for the offset between the detector's + PTS and the recording's timeline. + + The fMP4 file lags ~2s behind real-time due to fragment boundaries. + If the target timestamp isn't available yet, retry briefly. + """ + seg = self._get_recording_path() if self._get_recording_path else None + if not seg or not seg.exists(): + return + + import tempfile, os as _os + + for attempt in range(4): # up to ~3s of waiting + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: + tmp_path = Path(tmp.name) + try: + ff.extract_frame_at(seg, tmp_path, rec_ts) + if tmp_path.exists() and tmp_path.stat().st_size > 0: + jpeg_bytes = tmp_path.read_bytes() + log.info("Scene frame: rec_ts=%.3f global_ts=%.3f (attempt %d)", + rec_ts, global_ts, attempt) + self.on_raw_frame(jpeg_bytes, global_ts) + return + except Exception: + pass + finally: + try: + _os.unlink(tmp_path) + except Exception: + pass + # Recording file not ready yet — wait for fragments to flush. + if attempt < 3: + time.sleep(1.0) + + log.warning("Scene extract gave up at rec_ts=%.3f after retries", rec_ts) + + def _wall_clock_offset(self): + """Seconds elapsed since session start, using wall clock. + + The session dir name is the start time in YYYYmmdd_HHMMSS format. + This avoids fMP4 probe lag which underestimates by ~2s. + """ + from datetime import datetime + try: + session_name = self.session_dir.name # e.g. "20260410_020644" + start_time = datetime.strptime(session_name, "%Y%m%d_%H%M%S") + elapsed = (datetime.now() - start_time).total_seconds() + return max(0.0, elapsed) + except Exception as e: + log.warning("Could not compute wall-clock offset: %s", e) + # Fall back to fMP4 probe. + seg = self._get_recording_path() if self._get_recording_path else None + return self._probe_safe_duration(seg) if seg and seg.exists() else 0.0 + def restart_scene_detector(self, threshold): """Restart scene detector with a new threshold. @@ -295,9 +353,22 @@ class SessionProcessor: stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin") stdin_t.start() - # Thread: ffmpeg stderr → parse showinfo timestamps → queue - ts_queue = Queue() - offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + # Compute time offset: detector PTS starts from 0 when it connects, + # but the recording has been running since session start. + # recording_ts = detector_pts + pts_offset + # + # Use wall-clock time for accurate offset. The fMP4 file lags behind + # by ~2s due to fragment boundaries, so we can't extract at rec_ts + # immediately — _extract_scene_frame handles this by retrying. + pts_offset = self._wall_clock_offset() + global_offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 + log.info("Scene detector: pts_offset=%.1f (wall-clock seconds since session start)", + pts_offset) + + # Stderr thread: parse showinfo timestamps, apply flush dedup, + # extract frame from recording at corrected timestamp. + flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0 + last_pts = [0.0] # mutable for thread def _read_stderr(): for raw in proc.stderr: @@ -307,7 +378,14 @@ class SessionProcessor: if "showinfo" in line: pts_match = re.search(r"pts_time:\s*([\d.]+)", line) if pts_match: - ts_queue.put(float(pts_match.group(1))) + pts_time = float(pts_match.group(1)) + if pts_time - last_pts[0] < flush_window: + log.debug("Skipping flush frame at pts=%.3f", pts_time) + continue + last_pts[0] = pts_time + # Extract frame from recording at corrected timestamp. + rec_ts = pts_time + pts_offset + self._extract_scene_frame(rec_ts, rec_ts + global_offset) elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower(): log.debug("[scene] %s", line) log.debug("[scene] stderr closed") @@ -315,46 +393,14 @@ class SessionProcessor: stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr") stderr_t.start() - # Main: read JPEG frames from stdout, pair with stderr timestamps, - # skip flush frames. Same proven pattern as StreamRecorder._read_stdout. - flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0 - last_pts = 0.0 - buf = b"" + # Main: drain stdout to prevent ffmpeg from stalling. + # We don't use the JPEG data — frames come from the recording. raw_fd = proc.stdout.fileno() - while True: - chunk = os.read(raw_fd, 65536) - if not chunk: - break - buf += chunk - while True: - soi = buf.find(b"\xff\xd8") - if soi < 0: - buf = b"" - break - eoi = buf.find(b"\xff\xd9", soi + 2) - if eoi < 0: - buf = buf[soi:] - break - jpeg_data = buf[soi:eoi + 2] - buf = buf[eoi + 2:] - - try: - pts_time = ts_queue.get(timeout=2.0) - except Empty: - log.warning("No timestamp for scene frame, using 0") - pts_time = 0.0 - - if pts_time - last_pts < flush_window: - log.debug("Skipping flush frame at pts=%.3f", pts_time) - continue - last_pts = pts_time - - global_ts = pts_time + offset - log.debug("Scene frame at pts=%.3f (global=%.3f)", pts_time, global_ts) - self.on_raw_frame(jpeg_data, global_ts) + while os.read(raw_fd, 65536): + pass ff.stop_proc(proc, timeout=3) - log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts) + log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts[0]) def start_audio_extractor(self, on_new_audio=None): """Periodically extract audio from the growing fMP4 as WAV chunks.""" diff --git a/media/client/src/backends/subprocess.rs b/media/client/src/backends/subprocess.rs index ff839f4..8b5450d 100644 --- a/media/client/src/backends/subprocess.rs +++ b/media/client/src/backends/subprocess.rs @@ -66,6 +66,25 @@ pub fn run( // Keep stdout alive for the duration of demuxing. let _stdout_guard = stdout; + // Watch for stop flag on a separate thread and kill ffmpeg to unblock + // the packet iterator (which is a blocking read on the pipe fd). + let stop_watcher = stop.clone(); + let child_pid = child.id(); + std::thread::Builder::new() + .name("ffmpeg-stop-watcher".into()) + .spawn(move || { + while !stop_watcher.load(Ordering::Relaxed) { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + // Send SIGINT to ffmpeg so it flushes and closes stdout, + // which unblocks the packet iterator in demux_and_send. + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + let _ = kill(Pid::from_raw(child_pid as i32), Signal::SIGINT); + info!("Stop watcher: sent SIGINT to ffmpeg pid={child_pid}"); + }) + .expect("spawn stop watcher"); + let result = demux_and_send(fd, packet_tx, stop, &mut child); // Clean up subprocess regardless of result.