"""SessionProcessor: processes raw frame data and audio from recordings. Receives raw JPEG frames from StreamRecorder (via on_raw_frame callback) and handles all frame processing: file writing, frame index, GUI callbacks. Also extracts audio from fMP4 files by polling (latency-insensitive). The boundary with StreamRecorder: Recorder: reads pipe → fires on_raw_frame(jpeg_bytes, global_ts) Processor: writes JPEG to disk, updates index, fires on_new_frames to GUI When Rust owns transport, SessionProcessor connects to the server's Unix domain socket (scene.sock) for a live H.264 stream, pipes it to ffmpeg for GPU scene detection. Continuous stream — no polling, no restarts. """ import json import logging import os import re import socket import time from pathlib import Path from queue import Queue, Empty from threading import Thread, Event from cht.config import ( AUDIO_EXTRACT_INTERVAL, AUDIO_SAFETY_MARGIN, SCENE_THRESHOLD, SCENE_FLUSH_FRAMES, ) from cht.stream import ffmpeg as ff log = logging.getLogger(__name__) class SessionProcessor: """Writes scene frames to disk and extracts audio from fMP4.""" def __init__(self, session_dir: Path): self.session_dir = session_dir self.frames_dir = session_dir / "frames" self.audio_dir = session_dir / "audio" self._stop_event = Event() self._threads: dict[str, Thread] = {} self._on_new_frames = None self._on_new_audio = None self._last_scene_capture = 0.0 self._get_recording_path = None self._get_current_global_offset = None self._telemetry = None def attach(self, get_recording_path, get_current_global_offset): """Wire up callbacks to query the recorder's current state.""" self._get_recording_path = get_recording_path self._get_current_global_offset = get_current_global_offset # -- Scene frame handling (called from recorder's pipe thread) -- def on_raw_frame(self, jpeg_bytes: bytes, global_ts: float): """Receive a raw JPEG frame from the recorder pipe. Write and index it.""" frame_num = self._next_frame_number() frame_id = f"F{frame_num:04d}" frame_path = self.frames_dir / f"{frame_id}.jpg" frame_path.write_bytes(jpeg_bytes) entry = { "id": frame_id, "timestamp": global_ts, "path": str(frame_path), "sent_to_agent": False, } self._append_frame_index(entry) log.info("Scene frame: %s at %.1fs", frame_id, global_ts) if self._telemetry: self._telemetry.metric("scene_frame", { "id": frame_id, "global_ts": global_ts, }) if self._on_new_frames: self._on_new_frames([entry]) def set_on_new_frames(self, cb): self._on_new_frames = cb # -- On-demand capture (recorder extracts bytes, processor indexes) -- def on_captured_frame(self, jpeg_bytes: bytes, global_ts: float): """Receive a manually captured frame. Write and index it.""" self.on_raw_frame(jpeg_bytes, global_ts) def capture_now_from_file(self): """Extract the current frame from the growing fMP4 (Rust transport mode).""" import tempfile, os as _os def _capture(): seg = self._get_recording_path() if self._get_recording_path else None if not seg or not seg.exists(): log.warning("capture_now: no recording file") return try: import subprocess result = subprocess.run( ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(seg)], capture_output=True, text=True, ) duration = float(result.stdout.strip()) except Exception as e: log.warning("capture_now: could not probe duration: %s", e) return if duration < 1: log.warning("capture_now: recording too short") return timestamp = max(0, duration - 0.5) with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp_path = Path(tmp.name) try: ff.extract_frame_at(seg, tmp_path, timestamp) if not tmp_path.exists(): log.warning("capture_now: frame not written") return jpeg_bytes = tmp_path.read_bytes() except Exception as e: log.error("capture_now failed: %s", e) return finally: try: _os.unlink(tmp_path) except Exception: pass offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 self.on_raw_frame(jpeg_bytes, timestamp + offset) Thread(target=_capture, daemon=True, name="capture_now").start() def _capture_current_frame(self): """Capture a fresh frame from the recording file's current tip. Called when scene detection triggers. The scene filter's own JPEG is stale (buffered in the encoder), so we extract directly from the fMP4 which is always near-current. """ seg = self._get_recording_path() if self._get_recording_path else None if not seg or not seg.exists(): return duration = self._probe_safe_duration(seg) if not duration or duration < 0.5: return local_ts = max(0, duration - 0.3) offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 import tempfile, os as _os with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp_path = Path(tmp.name) try: ff.extract_frame_at(seg, tmp_path, local_ts) if not tmp_path.exists() or tmp_path.stat().st_size == 0: return jpeg_bytes = tmp_path.read_bytes() except Exception as e: log.debug("Scene capture failed: %s", e) return finally: try: _os.unlink(tmp_path) except Exception: pass self.on_raw_frame(jpeg_bytes, local_ts + offset) def _extract_scene_frame(self, rec_ts, global_ts): """Extract a frame from the recording at a specific timestamp. Called from the scene detector when showinfo fires. The timestamp has already been corrected for the offset between the detector's PTS and the recording's timeline. The fMP4 file lags ~2s behind real-time due to fragment boundaries. If the target timestamp isn't available yet, retry briefly. """ seg = self._get_recording_path() if self._get_recording_path else None if not seg or not seg.exists(): return import tempfile, os as _os for attempt in range(4): # up to ~3s of waiting with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp_path = Path(tmp.name) try: ff.extract_frame_at(seg, tmp_path, rec_ts) if tmp_path.exists() and tmp_path.stat().st_size > 0: jpeg_bytes = tmp_path.read_bytes() log.info("Scene frame: rec_ts=%.3f global_ts=%.3f (attempt %d)", rec_ts, global_ts, attempt) self.on_raw_frame(jpeg_bytes, global_ts) return except Exception: pass finally: try: _os.unlink(tmp_path) except Exception: pass # Recording file not ready yet — wait for fragments to flush. if attempt < 3: time.sleep(1.0) log.warning("Scene extract gave up at rec_ts=%.3f after retries", rec_ts) def _wall_clock_offset(self): """Seconds elapsed since session start, using wall clock. The session dir name is the start time in YYYYmmdd_HHMMSS format. This avoids fMP4 probe lag which underestimates by ~2s. """ from datetime import datetime try: session_name = self.session_dir.name # e.g. "20260410_020644" start_time = datetime.strptime(session_name, "%Y%m%d_%H%M%S") elapsed = (datetime.now() - start_time).total_seconds() return max(0.0, elapsed) except Exception as e: log.warning("Could not compute wall-clock offset: %s", e) # Fall back to fMP4 probe. seg = self._get_recording_path() if self._get_recording_path else None return self._probe_safe_duration(seg) if seg and seg.exists() else 0.0 def restart_scene_detector(self, threshold): """Restart scene detector with a new threshold. Kills the running ffmpeg — the detector thread reconnects automatically and picks up the new threshold on the next call to start_scene_detector. """ if "scene_detector" in self._procs: ff.stop_proc(self._procs.pop("scene_detector"), timeout=2) # Spawn a fresh thread with the new threshold; old thread will exit # when its ffmpeg proc dies. self.start_scene_detector(threshold=threshold) # -- Frame index -- @property def frame_count(self) -> int: index_path = self.frames_dir / "index.json" if index_path.exists(): try: return len(json.loads(index_path.read_text())) except Exception: pass return 0 def _next_frame_number(self) -> int: index_path = self.frames_dir / "index.json" if index_path.exists(): try: return len(json.loads(index_path.read_text())) + 1 except Exception: pass return 1 def _append_frame_index(self, entry: dict): index_path = self.frames_dir / "index.json" index = json.loads(index_path.read_text()) if index_path.exists() else [] index.append(entry) index_path.write_text(json.dumps(index, indent=2)) # -- Scene detection via Unix socket (Rust transport mode) -- def start_scene_detector(self, threshold=None): """Connect to Rust server's scene socket and run GPU scene detection. The server provides a live H.264 stream via a Unix domain socket at stream/scene.sock. We pipe it to ffmpeg for CUDA scene detection — continuous stream, no polling, no restarts. """ threshold = threshold or SCENE_THRESHOLD t = Thread(target=self._scene_detect_loop, daemon=True, name="scene_detector", args=(threshold,)) t.start() self._threads["scene_detector"] = t def _scene_detect_loop(self, threshold): """Connect to scene socket, pipe H.264 to ffmpeg, read scene frames. Retries on failure (e.g. ffmpeg dies from bad initial frames). The server buffers the latest keyframe so reconnects start clean. """ from cht.config import DATA_DIR socket_path = DATA_DIR / "scene.sock" # Wait for the socket to appear (server creates it on session start). while not self._stop_event.is_set(): if socket_path.exists(): break time.sleep(0.5) if self._stop_event.is_set(): return while not self._stop_event.is_set(): try: self._run_scene_session(socket_path, threshold) except Exception: log.exception("Scene detector error") if self._stop_event.is_set(): break # If the socket is gone, the session ended — don't retry. if not socket_path.exists(): log.info("Scene detector: socket gone, session ended") break log.info("Scene detector: reconnecting in 2s...") self._stop_event.wait(timeout=2.0) log.info("Scene detector stopped") def _run_scene_session(self, socket_path, threshold): """Single scene detection session: connect, run ffmpeg, read frames.""" log.info("Scene detector: connecting to %s", socket_path) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: sock.connect(str(socket_path)) except OSError as e: log.debug("Scene detector: connect failed: %s", e) return log.info("Scene detector: connected, starting ffmpeg") node = ff.detect_scenes_from_pipe( scene_threshold=threshold, flush_frames=SCENE_FLUSH_FRAMES, ) proc = ff.run_async(node, pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) self._procs = getattr(self, "_procs", {}) self._procs["scene_detector"] = proc # Thread: socket → ffmpeg stdin def _feed_stdin(): try: while not self._stop_event.is_set(): data = sock.recv(65536) if not data: break try: proc.stdin.write(data) proc.stdin.flush() except (BrokenPipeError, OSError): break finally: try: proc.stdin.close() except OSError: pass sock.close() log.debug("Scene detector: stdin feeder stopped") stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin") stdin_t.start() # Compute time offset: detector PTS starts from 0 when it connects, # but the recording has been running since session start. # recording_ts = detector_pts + pts_offset # # Use wall-clock time for accurate offset. The fMP4 file lags behind # by ~2s due to fragment boundaries, so we can't extract at rec_ts # immediately — _extract_scene_frame handles this by retrying. pts_offset = self._wall_clock_offset() global_offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 log.info("Scene detector: pts_offset=%.1f (wall-clock seconds since session start)", pts_offset) # Stderr thread: parse showinfo timestamps, apply flush dedup, # extract frame from recording at corrected timestamp. flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0 last_pts = [0.0] # mutable for thread def _read_stderr(): for raw in proc.stderr: line = raw.decode("utf-8", errors="replace").rstrip() if not line: continue if "showinfo" in line: pts_match = re.search(r"pts_time:\s*([\d.]+)", line) if pts_match: pts_time = float(pts_match.group(1)) if pts_time - last_pts[0] < flush_window: log.debug("Skipping flush frame at pts=%.3f", pts_time) continue last_pts[0] = pts_time # Extract frame from recording at corrected timestamp. rec_ts = pts_time + pts_offset self._extract_scene_frame(rec_ts, rec_ts + global_offset) elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower(): log.debug("[scene] %s", line) log.debug("[scene] stderr closed") stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr") stderr_t.start() # Main: drain stdout to prevent ffmpeg from stalling. # We don't use the JPEG data — frames come from the recording. raw_fd = proc.stdout.fileno() while os.read(raw_fd, 65536): pass ff.stop_proc(proc, timeout=3) log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts[0]) def start_audio_extractor(self, on_new_audio=None): """Periodically extract audio from the growing fMP4 as WAV chunks.""" self._on_new_audio = on_new_audio self.audio_dir.mkdir(parents=True, exist_ok=True) t = Thread(target=self._audio_loop, daemon=True, name="audio_extractor") t.start() self._threads["audio_extractor"] = t def stop(self): self._stop_event.set() for name, proc in getattr(self, "_procs", {}).items(): ff.stop_proc(proc, timeout=3) self._procs = {} # Join all threads so caller knows they're done before starting a new session for name, t in list(self._threads.items()): t.join(timeout=5) if t.is_alive(): log.warning("Thread %s still alive after stop timeout", name) self._threads.clear() def _has_audio_stream(self, seg: Path) -> bool: try: import ffmpeg as ffmpeg_lib info = ffmpeg_lib.probe(str(seg)) return any(s.get("codec_type") == "audio" for s in info.get("streams", [])) except Exception: return False def _find_audio_source(self): """Find audio source: fMP4 with audio track, or standalone audio.aac from Rust server.""" seg = self._get_recording_path() if self._get_recording_path else None if seg and seg.exists() and self._has_audio_stream(seg): return seg # Rust server writes raw AAC alongside the fMP4 stream_dir = self.session_dir / "stream" aac_path = stream_dir / "audio.aac" if aac_path.exists() and aac_path.stat().st_size > 100: return aac_path return None def _audio_loop(self): processed_time = 0.0 chunk_num = 0 current_source = None while not self._stop_event.wait(timeout=AUDIO_EXTRACT_INTERVAL): source = self._find_audio_source() if not source: continue if source != current_source: current_source = source processed_time = 0.0 chunk_num = 0 log.info("Audio extractor: using %s", source.name) if source.stat().st_size < 100_000: continue safe_duration = self._probe_safe_duration(source) if safe_duration is None or safe_duration <= 0: continue # Raw AAC files (from Rust server) have no reliable duration metadata. # ffprobe wildly overestimates (e.g. 1569s for a 50s session). # Cap to wall-clock elapsed time as a sanity bound. wall_elapsed = self._wall_clock_offset() if wall_elapsed > 0 and safe_duration > wall_elapsed * 1.5: log.debug("Audio: capping probed duration %.1fs to wall-clock %.1fs", safe_duration, wall_elapsed) safe_duration = wall_elapsed # Fail-safe: processed_time can accumulate past the file if the # source was recreated (e.g. server restarted same session). if processed_time > safe_duration: log.warning( "Audio extractor: processed_time %.1fs > file duration %.1fs — resetting", processed_time, safe_duration, ) processed_time = 0.0 chunk_num = 0 process_to = safe_duration - AUDIO_SAFETY_MARGIN if process_to <= processed_time + 1.0: continue chunk_duration = process_to - processed_time wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav" try: ff.extract_audio_chunk(source, wav_path, start_time=processed_time, duration=chunk_duration) except Exception as e: log.error("Audio extraction failed: %s", e) continue if wav_path.exists() and wav_path.stat().st_size > 100: offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0 global_start = processed_time + offset log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)", wav_path.name, processed_time, process_to, global_start) if self._telemetry: self._telemetry.metric("audio_chunk", { "chunk": chunk_num, "start": processed_time, "end": process_to, "global_start": global_start, }) if self._on_new_audio: try: self._on_new_audio( wav_path, global_start, chunk_duration, segment_path=source, local_start=processed_time, ) except Exception as e: log.error("Audio callback failed: %s", e) chunk_num += 1 processed_time = process_to log.info("Audio extractor stopped") def _probe_safe_duration(self, seg: Path): try: import ffmpeg as ffmpeg_lib info = ffmpeg_lib.probe(str(seg)) dur = float(info.get("format", {}).get("duration", 0)) if dur > 0: return dur for stream in info.get("streams", []): sdur = float(stream.get("duration", 0)) if sdur > 0: return sdur except Exception: pass try: return seg.stat().st_size / 65_000 except Exception: return None