"""
StreamManager: orchestrates ffmpeg for recording and scene detection.

Architecture:
    sender → TCP:4444 → single ffmpeg process:
        1. writes fMP4 to disk (c=copy)
        2. relays UDP for live display (c=copy)
        3. CUDA decode → scene filter → JPEG frames (real-time)
"""

import json
import logging
import re
import shutil
import time
from queue import Queue, Empty
from threading import RLock, Thread

from cht.config import (
    STREAM_HOST,
    STREAM_PORT,
    RELAY_PORT,
    SCENE_THRESHOLD,
    SESSIONS_DIR,
    AUDIO_EXTRACT_INTERVAL,
    AUDIO_SAFETY_MARGIN,
)
from cht.stream import ffmpeg as ff

log = logging.getLogger(__name__)

# Matches the "pts_time:<seconds>" field of an ffmpeg showinfo log line.
_PTS_TIME_RE = re.compile(r"pts_time:\s*([\d.]+)")

# JPEG start-of-image / end-of-image markers used to split the MJPEG pipe.
_JPEG_SOI = b"\xff\xd8"
_JPEG_EOI = b"\xff\xd9"

# Divisor used to estimate a duration (seconds) from a file size (bytes)
# when probing the container fails.
_FALLBACK_BYTES_PER_SECOND = 65_000


def list_sessions():
    """Return a list of (session_id, session_dir) tuples.

    Only directories under SESSIONS_DIR that contain a "frames" entry are
    listed. Results are sorted by path in descending order, which is newest
    first for the default timestamp-style session ids.
    """
    if not SESSIONS_DIR.exists():
        return []
    return [
        (d.name, d)
        for d in sorted(SESSIONS_DIR.iterdir(), reverse=True)
        if d.is_dir() and (d / "frames").exists()
    ]


def delete_sessions(session_ids):
    """Delete session directories by ID. Unknown IDs are silently skipped."""
    for sid in session_ids:
        path = SESSIONS_DIR / sid
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            log.info("Deleted session: %s", sid)


class StreamManager:
    """Owns one recording session: directories, ffmpeg process, reader threads."""

    def __init__(self, session_id=None):
        """Create a manager for *session_id* (default: current local timestamp).

        Only computes paths and initialises state; no directories are created
        and no process is started here.
        """
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.audio_dir = self.session_dir / "audio"
        self.agent_dir = self.session_dir / "agent"
        self._procs = {}
        self._threads = {}
        self._stop_flags = set()
        self._segment = 0
        self._segment_offsets = {0: 0.0}  # segment_index → global_offset
        self.scene_threshold = SCENE_THRESHOLD
        self.readonly = False  # True when loaded from existing session
        self.telemetry = None  # set by window after start
        # Callbacks default to None so the reader threads can run even when
        # start_scene_detector()/start_audio_extractor() was never called.
        self._on_new_frames = None
        self._on_new_audio = None
        # Serialises frame-number allocation and index.json updates between
        # the scene reader thread and capture_now threads. Re-entrant because
        # _append_frame_index() is also called while the lock is held.
        self._index_lock = RLock()
        log.info("Session: %s", session_id)

    @classmethod
    def from_existing(cls, session_id):
        """Load an existing session without starting any ffmpeg processes.

        Raises:
            FileNotFoundError: if the session directory does not exist.
        """
        from cht.session import rebuild_manifest

        mgr = cls(session_id=session_id)
        if not mgr.session_dir.exists():
            raise FileNotFoundError(f"Session not found: {session_id}")
        mgr.readonly = True
        # Point _segment to last recording segment
        segments = mgr.recording_segments
        if segments:
            mgr._segment = len(segments) - 1
        mgr._rebuild_offsets()
        rebuild_manifest(mgr.session_dir)
        log.info("Loaded existing session: %s (%d segments, %d frames)",
                 session_id, len(segments), mgr.frame_count)
        return mgr

    @property
    def current_global_offset(self) -> float:
        """Global time offset for the current recording segment."""
        return self._segment_offsets.get(self._segment, 0.0)

    def _rebuild_offsets(self):
        """Compute global offsets from all segments on disk.

        Besides one entry per existing segment, an entry is stored for the
        index one past the last segment (the sum of all durations), so a
        recorder that resumes into a new segment continues the global
        timeline instead of restarting at 0.
        """
        from cht.session import probe_duration

        segments = self.recording_segments
        offsets = {}
        offset = 0.0
        for i, seg in enumerate(segments):
            offsets[i] = offset
            offset += probe_duration(seg)
        offsets[len(segments)] = offset
        self._segment_offsets = offsets

    def _advance_segment_offset(self, completed_segment_path):
        """Update offsets after a segment completes and a new one begins."""
        from cht.session import probe_duration

        dur = probe_duration(completed_segment_path)
        prev_offset = self._segment_offsets.get(self._segment, 0.0)
        next_offset = prev_offset + dur
        self._segment_offsets[self._segment + 1] = next_offset
        log.info("Segment %d completed (%.1fs), next offset: %.1fs",
                 self._segment, dur, next_offset)

    @property
    def frame_count(self):
        """Number of entries in frames/index.json, or 0 if missing/unreadable."""
        index_path = self.frames_dir / "index.json"
        if index_path.exists():
            try:
                return len(json.loads(index_path.read_text()))
            except Exception:
                # Best effort: an unreadable index counts as empty.
                pass
        return 0

    @staticmethod
    def _probe_file_duration(path):
        """Return the duration of *path* in seconds, or None if unavailable.

        Tries the container-level duration first, then the first stream with
        a positive duration. If probing fails or yields nothing positive, the
        duration is estimated from the file size. Returns None only when the
        file cannot be stat'ed either.
        """
        try:
            import ffmpeg as ffmpeg_lib

            info = ffmpeg_lib.probe(str(path))
            dur = float(info.get("format", {}).get("duration", 0))
            if dur > 0:
                return dur
            for stream in info.get("streams", []):
                sdur = float(stream.get("duration", 0))
                if sdur > 0:
                    return sdur
        except Exception as e:
            # Deliberately best-effort: fall through to the size estimate.
            log.debug("Duration probe failed for %s: %s", path, e)
        try:
            return path.stat().st_size / _FALLBACK_BYTES_PER_SECOND
        except OSError:
            return None

    def total_duration(self):
        """Probe total duration across all segments (for completed sessions).

        Segments whose duration cannot be determined at all contribute 0.
        """
        total = 0.0
        for seg in self.recording_segments:
            total += self._probe_file_duration(seg) or 0.0
        return total

    def setup_dirs(self):
        """Create all session sub-directories (idempotent)."""
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir,
                  self.audio_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)

    @property
    def stream_url(self):
        """TCP listen URL the recorder receives the stream on."""
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    @property
    def relay_url(self):
        """Local UDP URL the recorder relays the stream to."""
        return f"udp://127.0.0.1:{RELAY_PORT}"

    @property
    def recording_path(self):
        """Current recording segment path."""
        return self.stream_dir / f"recording_{self._segment:03d}.mp4"

    @property
    def recording_segments(self):
        """All recording segments in order."""
        return sorted(self.stream_dir.glob("recording_*.mp4"))

    # -- Recording --

    def start_recorder(self):
        """Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP."""
        # Start after existing segments (for resumed sessions)
        existing = self.recording_segments
        self._segment = len(existing)
        self._rebuild_offsets()
        self._launch_recorder()

    def restart_recorder(self):
        """Restart recorder into a new segment. Session stays alive."""
        old = self._procs.pop("recorder", None)
        if old:
            ff.stop_proc(old)
        completed_path = self.recording_path
        self._advance_segment_offset(completed_path)
        self._segment += 1
        log.info("Restarting recorder → segment %d (offset %.1fs)",
                 self._segment, self.current_global_offset)
        self._launch_recorder()

    def recorder_alive(self):
        """Check if the recorder process is still running."""
        proc = self._procs.get("recorder")
        return proc is not None and proc.poll() is None

    def _launch_recorder(self):
        """Spawn the recorder process for the current segment and attach readers."""
        start_number = self._next_frame_number()
        node = ff.receive_record_relay_and_detect(
            self.stream_url,
            self.recording_path,
            self.relay_url,
            scene_threshold=self.scene_threshold,
        )
        proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder+scene: pid=%s → %s (threshold=%.2f, start_number=%d)",
                 proc.pid, self.recording_path, self.scene_threshold, start_number)
        self._start_scene_readers(proc, start_number)

    # -- Scene Detection --

    def start_scene_detector(self, on_new_frames=None):
        """Register callback for new scene frames.

        Scene detection runs inside the recorder process (single ffmpeg).
        The stdout reader thread invokes the callback with a one-element list
        containing the new index entry.
        """
        self._on_new_frames = on_new_frames

    def update_scene_threshold(self, new_threshold):
        """Update scene threshold. Restarts the recorder to apply new filter."""
        self.scene_threshold = new_threshold
        log.info("Threshold changed → %.2f, restarting recorder", new_threshold)
        self.restart_recorder()

    def _next_frame_number(self):
        """Determine next frame number from the index (source of truth)."""
        index_path = self.frames_dir / "index.json"
        with self._index_lock:
            if index_path.exists():
                index = json.loads(index_path.read_text())
                return len(index) + 1
            return 1

    def _append_frame_index(self, entry):
        """Append a frame entry to index.json.

        The read-modify-write is done under the index lock, and the new
        content is written to a sibling temp file and renamed over the index
        so readers never see a partially written file.
        """
        index_path = self.frames_dir / "index.json"
        with self._index_lock:
            index = json.loads(index_path.read_text()) if index_path.exists() else []
            index.append(entry)
            tmp_path = index_path.with_name(index_path.name + ".tmp")
            tmp_path.write_text(json.dumps(index, indent=2))
            tmp_path.replace(index_path)

    @staticmethod
    def _split_jpeg_frames(buf):
        """Split complete JPEG images off the front of *buf*.

        Frames are delimited by SOI (0xFFD8) and EOI (0xFFD9) markers.

        Returns:
            (frames, remainder): the complete JPEG byte strings in order, and
            the bytes that must be kept for the next read (an incomplete
            frame starting at its SOI, or a lone trailing 0xFF that may be
            the first half of an SOI marker split across two reads).
        """
        frames = []
        while True:
            soi = buf.find(_JPEG_SOI)
            if soi < 0:
                # No frame start in the buffer; drop the garbage but keep a
                # trailing 0xFF so a marker split across reads is not lost.
                buf = buf[-1:] if buf.endswith(b"\xff") else b""
                break
            eoi = buf.find(_JPEG_EOI, soi + 2)
            if eoi < 0:
                buf = buf[soi:]  # keep from SOI, need more data
                break
            frames.append(buf[soi:eoi + 2])
            buf = buf[eoi + 2:]
        return frames, buf

    def _start_scene_readers(self, proc, start_number):
        """Read scene frames from stdout (MJPEG pipe) and timestamps from stderr.

        Two threads:
        - stderr: parses showinfo lines, queues pts_time values
        - stdout: reads JPEG frames from pipe, pairs with queued timestamps,
          writes files to disk, fires callbacks immediately

        showinfo fires before the JPEG encoder, so timestamps are always
        queued before the corresponding JPEG data arrives on stdout.

        Args:
            proc: recorder process exposing ``stdout``/``stderr`` pipes and ``poll()``.
            start_number: lowest frame number this reader may assign.
        """
        ts_queue = Queue()

        def _read_stderr():
            for raw in proc.stderr:
                line = raw.decode("utf-8", errors="replace").rstrip()
                if not line:
                    continue
                if "showinfo" not in line:
                    log.debug("[recorder] %s", line)
                    continue
                pts_match = _PTS_TIME_RE.search(line)
                if pts_match:
                    ts_queue.put(float(pts_match.group(1)))
            log.info("[recorder] stderr closed, exit=%s", proc.poll())

        def _read_stdout():
            frame_num = start_number
            offset = self.current_global_offset
            stdout = proc.stdout
            # On a buffered pipe read(n) blocks until n bytes have arrived,
            # which would hold back the tail of a frame until the next scene
            # change. read1() returns whatever is available; fall back to
            # read() for stream objects that do not provide it.
            read_available = getattr(stdout, "read1", stdout.read)
            buf = b""
            while True:
                chunk = read_available(4096)
                if not chunk:
                    break
                frames, buf = self._split_jpeg_frames(buf + chunk)
                for jpeg_data in frames:
                    # Get timestamp (showinfo fires before encode, so it's queued)
                    try:
                        pts_time = ts_queue.get(timeout=2.0)
                    except Empty:
                        log.warning("No timestamp for scene frame %d", frame_num)
                        pts_time = 0.0

                    with self._index_lock:
                        # Manual captures may have consumed numbers since the
                        # last scene frame; never reuse an id already indexed.
                        frame_num = max(frame_num, self._next_frame_number())
                        frame_id = f"F{frame_num:04d}"
                        frame_path = self.frames_dir / f"{frame_id}.jpg"
                        frame_path.write_bytes(jpeg_data)
                        entry = {
                            "id": frame_id,
                            "timestamp": pts_time + offset,
                            "path": str(frame_path),
                            "sent_to_agent": False,
                        }
                        self._append_frame_index(entry)

                    log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)",
                             frame_id, entry["timestamp"], pts_time, offset)
                    callback = self._on_new_frames
                    if callback:
                        callback([entry])
                    frame_num += 1
            log.info("[recorder] stdout closed")

        Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start()
        Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start()

    def _probe_safe_duration(self):
        """Probe current recording duration via ffprobe. Returns seconds or None."""
        return self._probe_file_duration(self.recording_path)

    def capture_now(self, on_new_frames=None):
        """Capture a single frame from the current recording position.

        Grabs the latest available frame (safe_duration - 1s) and adds it
        to the index. Runs in a thread to avoid blocking the UI.

        Args:
            on_new_frames: optional callback invoked with a one-element list
                containing the new index entry.
        """
        def _capture():
            safe_duration = self._probe_safe_duration()
            if not safe_duration or safe_duration < 1:
                log.warning("capture_now: recording too short")
                return

            local_timestamp = safe_duration - 1
            timestamp = local_timestamp + self.current_global_offset

            # Extract to a temporary name first: the final frame number is
            # only allocated once the image exists, under the index lock, so
            # it cannot collide with a scene frame written in the meantime
            # and the lock is not held while ffmpeg runs.
            tmp_path = self.frames_dir / f"capture_{int(time.time() * 1e6)}.tmp.jpg"
            try:
                ff.extract_frame_at(self.recording_path, tmp_path, local_timestamp)
            except Exception as e:
                log.error("capture_now failed: %s", e)
                try:
                    if tmp_path.exists():
                        tmp_path.unlink()
                except OSError:
                    pass
                return

            if not tmp_path.exists():
                log.warning("capture_now: frame not written")
                return

            with self._index_lock:
                frame_num = self._next_frame_number()
                frame_id = f"F{frame_num:04d}"
                frame_path = self.frames_dir / f"{frame_id}.jpg"
                tmp_path.replace(frame_path)
                entry = {
                    "id": frame_id,
                    "timestamp": timestamp,
                    "path": str(frame_path),
                    "sent_to_agent": False,
                }
                self._append_frame_index(entry)

            log.info("Manual capture: %s at %.1fs", frame_id, timestamp)
            if on_new_frames:
                on_new_frames([entry])

        Thread(target=_capture, daemon=True, name="capture_now").start()

    # -- Audio Extraction --

    def start_audio_extractor(self, on_new_audio=None):
        """Periodically extract audio from the growing recording as WAV chunks.

        Polls the current recording segment every AUDIO_EXTRACT_INTERVAL
        seconds, extracts the not-yet-processed time range (keeping
        AUDIO_SAFETY_MARGIN seconds back from the probed end), and reports
        each chunk through the callback.

        Args:
            on_new_audio: callback invoked as
                ``on_new_audio(wav_path, global_start, duration,
                segment_path=..., local_start=...)`` where ``global_start``
                includes the segment's global offset and ``local_start`` is
                relative to the segment file.
        """
        self._on_new_audio = on_new_audio
        self.audio_dir.mkdir(parents=True, exist_ok=True)

        def _extract():
            processed_time = 0.0
            chunk_num = 0
            current_segment = None

            while "stop" not in self._stop_flags:
                time.sleep(AUDIO_EXTRACT_INTERVAL)

                seg = self.recording_path
                try:
                    seg_size = seg.stat().st_size
                except OSError:
                    # Segment not created yet (or removed between polls).
                    continue

                if seg != current_segment:
                    current_segment = seg
                    processed_time = 0.0
                    # NOTE(review): chunk numbering restarts per segment, so
                    # chunk_0000.wav of an earlier segment is overwritten —
                    # confirm consumers do not rely on older chunk files.
                    chunk_num = 0
                    log.info("Audio extractor: switched to %s", seg.name)

                if seg_size < 100_000:
                    continue

                safe_duration = self._probe_safe_duration()
                if safe_duration is None or safe_duration <= 0:
                    continue

                process_to = safe_duration - AUDIO_SAFETY_MARGIN
                if process_to <= processed_time + 1.0:
                    continue

                chunk_duration = process_to - processed_time
                wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav"

                try:
                    ff.extract_audio_chunk(
                        seg, wav_path,
                        start_time=processed_time,
                        duration=chunk_duration,
                    )
                except Exception as e:
                    log.error("Audio extraction failed: %s", e)
                    continue

                # Only advance when a non-trivial file was produced; otherwise
                # the same range is retried on the next poll.
                if wav_path.exists() and wav_path.stat().st_size > 100:
                    global_start = processed_time + self.current_global_offset
                    log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)",
                             wav_path.name, processed_time, process_to, global_start)
                    callback = self._on_new_audio
                    if callback:
                        callback(
                            wav_path, global_start, chunk_duration,
                            segment_path=seg, local_start=processed_time,
                        )
                    chunk_num += 1
                    processed_time = process_to

            log.info("Audio extractor stopped")

        t = Thread(target=_extract, daemon=True, name="audio_extractor")
        t.start()
        self._threads["audio_extractor"] = t

    # -- Lifecycle --

    def stop_all(self):
        """Signal background loops to stop and stop every tracked process."""
        log.info("Stopping all...")
        self._stop_flags.add("stop")
        # Iterate over a snapshot so a concurrent restart cannot change the
        # dict while it is being walked.
        for name, proc in list(self._procs.items()):
            log.info("Stopping %s", name)
            ff.stop_proc(proc)
        self._procs.clear()