good checkpoint
This commit is contained in:
@@ -167,6 +167,64 @@ class SessionProcessor:
|
|||||||
|
|
||||||
self.on_raw_frame(jpeg_bytes, local_ts + offset)
|
self.on_raw_frame(jpeg_bytes, local_ts + offset)
|
||||||
|
|
||||||
|
def _extract_scene_frame(self, rec_ts, global_ts):
|
||||||
|
"""Extract a frame from the recording at a specific timestamp.
|
||||||
|
|
||||||
|
Called from the scene detector when showinfo fires. The timestamp
|
||||||
|
has already been corrected for the offset between the detector's
|
||||||
|
PTS and the recording's timeline.
|
||||||
|
|
||||||
|
The fMP4 file lags ~2s behind real-time due to fragment boundaries.
|
||||||
|
If the target timestamp isn't available yet, retry briefly.
|
||||||
|
"""
|
||||||
|
seg = self._get_recording_path() if self._get_recording_path else None
|
||||||
|
if not seg or not seg.exists():
|
||||||
|
return
|
||||||
|
|
||||||
|
import tempfile, os as _os
|
||||||
|
|
||||||
|
for attempt in range(4): # up to ~3s of waiting
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
|
||||||
|
tmp_path = Path(tmp.name)
|
||||||
|
try:
|
||||||
|
ff.extract_frame_at(seg, tmp_path, rec_ts)
|
||||||
|
if tmp_path.exists() and tmp_path.stat().st_size > 0:
|
||||||
|
jpeg_bytes = tmp_path.read_bytes()
|
||||||
|
log.info("Scene frame: rec_ts=%.3f global_ts=%.3f (attempt %d)",
|
||||||
|
rec_ts, global_ts, attempt)
|
||||||
|
self.on_raw_frame(jpeg_bytes, global_ts)
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
_os.unlink(tmp_path)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Recording file not ready yet — wait for fragments to flush.
|
||||||
|
if attempt < 3:
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
|
log.warning("Scene extract gave up at rec_ts=%.3f after retries", rec_ts)
|
||||||
|
|
||||||
|
def _wall_clock_offset(self):
|
||||||
|
"""Seconds elapsed since session start, using wall clock.
|
||||||
|
|
||||||
|
The session dir name is the start time in YYYYmmdd_HHMMSS format.
|
||||||
|
This avoids fMP4 probe lag which underestimates by ~2s.
|
||||||
|
"""
|
||||||
|
from datetime import datetime
|
||||||
|
try:
|
||||||
|
session_name = self.session_dir.name # e.g. "20260410_020644"
|
||||||
|
start_time = datetime.strptime(session_name, "%Y%m%d_%H%M%S")
|
||||||
|
elapsed = (datetime.now() - start_time).total_seconds()
|
||||||
|
return max(0.0, elapsed)
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("Could not compute wall-clock offset: %s", e)
|
||||||
|
# Fall back to fMP4 probe.
|
||||||
|
seg = self._get_recording_path() if self._get_recording_path else None
|
||||||
|
return self._probe_safe_duration(seg) if seg and seg.exists() else 0.0
|
||||||
|
|
||||||
def restart_scene_detector(self, threshold):
|
def restart_scene_detector(self, threshold):
|
||||||
"""Restart scene detector with a new threshold.
|
"""Restart scene detector with a new threshold.
|
||||||
|
|
||||||
@@ -295,9 +353,22 @@ class SessionProcessor:
|
|||||||
stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin")
|
stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin")
|
||||||
stdin_t.start()
|
stdin_t.start()
|
||||||
|
|
||||||
# Thread: ffmpeg stderr → parse showinfo timestamps → queue
|
# Compute time offset: detector PTS starts from 0 when it connects,
|
||||||
ts_queue = Queue()
|
# but the recording has been running since session start.
|
||||||
offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
|
# recording_ts = detector_pts + pts_offset
|
||||||
|
#
|
||||||
|
# Use wall-clock time for accurate offset. The fMP4 file lags behind
|
||||||
|
# by ~2s due to fragment boundaries, so we can't extract at rec_ts
|
||||||
|
# immediately — _extract_scene_frame handles this by retrying.
|
||||||
|
pts_offset = self._wall_clock_offset()
|
||||||
|
global_offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
|
||||||
|
log.info("Scene detector: pts_offset=%.1f (wall-clock seconds since session start)",
|
||||||
|
pts_offset)
|
||||||
|
|
||||||
|
# Stderr thread: parse showinfo timestamps, apply flush dedup,
|
||||||
|
# extract frame from recording at corrected timestamp.
|
||||||
|
flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0
|
||||||
|
last_pts = [0.0] # mutable for thread
|
||||||
|
|
||||||
def _read_stderr():
|
def _read_stderr():
|
||||||
for raw in proc.stderr:
|
for raw in proc.stderr:
|
||||||
@@ -307,7 +378,14 @@ class SessionProcessor:
|
|||||||
if "showinfo" in line:
|
if "showinfo" in line:
|
||||||
pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
|
pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
|
||||||
if pts_match:
|
if pts_match:
|
||||||
ts_queue.put(float(pts_match.group(1)))
|
pts_time = float(pts_match.group(1))
|
||||||
|
if pts_time - last_pts[0] < flush_window:
|
||||||
|
log.debug("Skipping flush frame at pts=%.3f", pts_time)
|
||||||
|
continue
|
||||||
|
last_pts[0] = pts_time
|
||||||
|
# Extract frame from recording at corrected timestamp.
|
||||||
|
rec_ts = pts_time + pts_offset
|
||||||
|
self._extract_scene_frame(rec_ts, rec_ts + global_offset)
|
||||||
elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower():
|
elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower():
|
||||||
log.debug("[scene] %s", line)
|
log.debug("[scene] %s", line)
|
||||||
log.debug("[scene] stderr closed")
|
log.debug("[scene] stderr closed")
|
||||||
@@ -315,46 +393,14 @@ class SessionProcessor:
|
|||||||
stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr")
|
stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr")
|
||||||
stderr_t.start()
|
stderr_t.start()
|
||||||
|
|
||||||
# Main: read JPEG frames from stdout, pair with stderr timestamps,
|
# Main: drain stdout to prevent ffmpeg from stalling.
|
||||||
# skip flush frames. Same proven pattern as StreamRecorder._read_stdout.
|
# We don't use the JPEG data — frames come from the recording.
|
||||||
flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0
|
|
||||||
last_pts = 0.0
|
|
||||||
buf = b""
|
|
||||||
raw_fd = proc.stdout.fileno()
|
raw_fd = proc.stdout.fileno()
|
||||||
while True:
|
while os.read(raw_fd, 65536):
|
||||||
chunk = os.read(raw_fd, 65536)
|
pass
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
buf += chunk
|
|
||||||
while True:
|
|
||||||
soi = buf.find(b"\xff\xd8")
|
|
||||||
if soi < 0:
|
|
||||||
buf = b""
|
|
||||||
break
|
|
||||||
eoi = buf.find(b"\xff\xd9", soi + 2)
|
|
||||||
if eoi < 0:
|
|
||||||
buf = buf[soi:]
|
|
||||||
break
|
|
||||||
jpeg_data = buf[soi:eoi + 2]
|
|
||||||
buf = buf[eoi + 2:]
|
|
||||||
|
|
||||||
try:
|
|
||||||
pts_time = ts_queue.get(timeout=2.0)
|
|
||||||
except Empty:
|
|
||||||
log.warning("No timestamp for scene frame, using 0")
|
|
||||||
pts_time = 0.0
|
|
||||||
|
|
||||||
if pts_time - last_pts < flush_window:
|
|
||||||
log.debug("Skipping flush frame at pts=%.3f", pts_time)
|
|
||||||
continue
|
|
||||||
last_pts = pts_time
|
|
||||||
|
|
||||||
global_ts = pts_time + offset
|
|
||||||
log.debug("Scene frame at pts=%.3f (global=%.3f)", pts_time, global_ts)
|
|
||||||
self.on_raw_frame(jpeg_data, global_ts)
|
|
||||||
|
|
||||||
ff.stop_proc(proc, timeout=3)
|
ff.stop_proc(proc, timeout=3)
|
||||||
log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts)
|
log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts[0])
|
||||||
|
|
||||||
def start_audio_extractor(self, on_new_audio=None):
|
def start_audio_extractor(self, on_new_audio=None):
|
||||||
"""Periodically extract audio from the growing fMP4 as WAV chunks."""
|
"""Periodically extract audio from the growing fMP4 as WAV chunks."""
|
||||||
|
|||||||
@@ -66,6 +66,25 @@ pub fn run(
|
|||||||
// Keep stdout alive for the duration of demuxing.
|
// Keep stdout alive for the duration of demuxing.
|
||||||
let _stdout_guard = stdout;
|
let _stdout_guard = stdout;
|
||||||
|
|
||||||
|
// Watch for stop flag on a separate thread and kill ffmpeg to unblock
|
||||||
|
// the packet iterator (which is a blocking read on the pipe fd).
|
||||||
|
let stop_watcher = stop.clone();
|
||||||
|
let child_pid = child.id();
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("ffmpeg-stop-watcher".into())
|
||||||
|
.spawn(move || {
|
||||||
|
while !stop_watcher.load(Ordering::Relaxed) {
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
// Send SIGINT to ffmpeg so it flushes and closes stdout,
|
||||||
|
// which unblocks the packet iterator in demux_and_send.
|
||||||
|
use nix::sys::signal::{kill, Signal};
|
||||||
|
use nix::unistd::Pid;
|
||||||
|
let _ = kill(Pid::from_raw(child_pid as i32), Signal::SIGINT);
|
||||||
|
info!("Stop watcher: sent SIGINT to ffmpeg pid={child_pid}");
|
||||||
|
})
|
||||||
|
.expect("spawn stop watcher");
|
||||||
|
|
||||||
let result = demux_and_send(fd, packet_tx, stop, &mut child);
|
let result = demux_and_send(fd, packet_tx, stop, &mut child);
|
||||||
|
|
||||||
// Clean up subprocess regardless of result.
|
// Clean up subprocess regardless of result.
|
||||||
|
|||||||
Reference in New Issue
Block a user