"""
StreamManager: orchestrates ffmpeg for recording and scene detection.

Architecture:
    sender → TCP:4444 → single ffmpeg process:
        1. writes fMP4 to disk (c=copy)
        2. relays UDP for live display (c=copy)
        3. CUDA decode → scene filter → JPEG frames (real-time)
"""

import json
import logging
import os
import re
import shutil
import time
import uuid
from queue import Empty, Queue
from threading import Lock, Thread

from cht.config import (
    STREAM_HOST,
    STREAM_PORT,
    RELAY_PORT,
    SCENE_THRESHOLD,
    SCENE_FLUSH_FRAMES,
    SESSIONS_DIR,
    AUDIO_EXTRACT_INTERVAL,
    AUDIO_SAFETY_MARGIN,
)
from cht.stream import ffmpeg as ff

log = logging.getLogger(__name__)

# Rough bytes-per-second figure used to estimate a recording's duration from
# its file size when ffprobe cannot report one.
_FALLBACK_BYTES_PER_SECOND = 65_000

# Scene frames whose pts lies within this many seconds of the previously
# accepted frame are treated as duplicates (flush frames) and dropped.
_FLUSH_DUP_WINDOW = 0.1

_PTS_TIME_RE = re.compile(r"pts_time:\s*([\d.]+)")
_CHUNK_NAME_RE = re.compile(r"chunk_(\d+)\.wav")

_JPEG_SOI = b"\xff\xd8"  # JPEG start-of-image marker
_JPEG_EOI = b"\xff\xd9"  # JPEG end-of-image marker


def _probe_file_duration(path):
    """Return the duration of a media file in seconds, or None if unknown.

    Prefers the container duration reported by ffprobe, then the first
    positive per-stream duration, and finally falls back to a size-based
    estimate. Returns None only when the file cannot even be stat'ed.
    """
    try:
        import ffmpeg as ffmpeg_lib

        info = ffmpeg_lib.probe(str(path))
        dur = float(info.get("format", {}).get("duration", 0))
        if dur > 0:
            return dur
        for stream in info.get("streams", []):
            sdur = float(stream.get("duration", 0))
            if sdur > 0:
                return sdur
    except Exception:
        # Deliberate best-effort: ffprobe failing or reporting an unparsable
        # duration falls through to the size-based estimate below.
        log.debug("ffprobe failed for %s", path, exc_info=True)
    try:
        return path.stat().st_size / _FALLBACK_BYTES_PER_SECOND
    except OSError:
        return None


def list_sessions():
    """Return a list of (session_id, session_dir) tuples, newest first.

    Only directories that contain a ``frames`` subdirectory count as
    sessions. Ordering is by directory name, descending.
    """
    if not SESSIONS_DIR.exists():
        return []
    return [
        (d.name, d)
        for d in sorted(SESSIONS_DIR.iterdir(), reverse=True)
        if d.is_dir() and (d / "frames").exists()
    ]


def delete_sessions(session_ids):
    """Delete session directories by ID.

    An ID that does not name a direct child of SESSIONS_DIR (empty, absolute,
    or containing ``..``/separators) is refused, so ``rmtree`` can never
    reach outside a single session directory.
    """
    root = SESSIONS_DIR.resolve()
    for sid in session_ids:
        path = SESSIONS_DIR / sid
        if path.resolve().parent != root:
            log.warning("Refusing to delete session outside %s: %r", SESSIONS_DIR, sid)
            continue
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            log.info("Deleted session: %s", sid)


class StreamManager:
    """Owns one session directory plus its recorder process and reader threads."""

    def __init__(self, session_id=None):
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.audio_dir = self.session_dir / "audio"
        self.agent_dir = self.session_dir / "agent"

        self._procs = {}
        self._threads = {}
        self._stop_flags = set()
        self._segment = 0
        self._segment_offsets = {0: 0.0}  # segment_index → global_offset
        # Serializes frame-number allocation and the index.json
        # read-modify-write between the scene reader and capture_now threads.
        self._frames_lock = Lock()
        # Callbacks default to None so reader threads never hit AttributeError
        # when start_scene_detector / start_audio_extractor were not called.
        self._on_new_frames = None
        self._on_new_audio = None

        self.scene_threshold = SCENE_THRESHOLD
        self.readonly = False  # True when loaded from existing session
        self.telemetry = None  # set by window after start
        log.info("Session: %s", session_id)

    @classmethod
    def from_existing(cls, session_id):
        """Load an existing session without starting any ffmpeg processes.

        Raises:
            FileNotFoundError: if the session directory does not exist.
        """
        from cht.session import rebuild_manifest

        mgr = cls(session_id=session_id)
        if not mgr.session_dir.exists():
            raise FileNotFoundError(f"Session not found: {session_id}")
        mgr.readonly = True
        # Point _segment to last recording segment
        segments = mgr.recording_segments
        if segments:
            mgr._segment = len(segments) - 1
        mgr._rebuild_offsets()
        rebuild_manifest(mgr.session_dir)
        log.info("Loaded existing session: %s (%d segments, %d frames)",
                 session_id, len(segments), mgr.frame_count)
        return mgr

    @property
    def current_global_offset(self) -> float:
        """Global time offset for the current recording segment."""
        return self._segment_offsets.get(self._segment, 0.0)

    def _rebuild_offsets(self):
        """Compute global offsets from all segments on disk."""
        from cht.session import probe_duration

        offset = 0.0
        self._segment_offsets = {}
        for i, seg in enumerate(self.recording_segments):
            self._segment_offsets[i] = offset
            offset += probe_duration(seg)

    def _advance_segment_offset(self, completed_segment_path):
        """Update offsets after a segment completes and a new one begins."""
        from cht.session import probe_duration

        dur = probe_duration(completed_segment_path)
        prev_offset = self._segment_offsets.get(self._segment, 0.0)
        self._segment_offsets[self._segment + 1] = prev_offset + dur
        log.info("Segment %d completed (%.1fs), next offset: %.1fs",
                 self._segment, dur, prev_offset + dur)

    # -- Frame index --

    def _read_frame_index(self):
        """Return the parsed frames/index.json list ([] when it does not exist)."""
        index_path = self.frames_dir / "index.json"
        if not index_path.exists():
            return []
        return json.loads(index_path.read_text())

    @property
    def frame_count(self):
        """Number of entries in frames/index.json (0 if missing or unreadable)."""
        try:
            return len(self._read_frame_index())
        except (OSError, ValueError, TypeError):
            return 0

    def _next_frame_number(self):
        """Determine next frame number from the index (source of truth)."""
        return len(self._read_frame_index()) + 1

    def _append_frame_index(self, entry):
        """Append a frame entry to index.json.

        The file is rewritten via a temporary file and ``os.replace`` so a
        concurrent reader never sees a half-written JSON document. Callers that
        may race with other writers must hold ``_frames_lock``.
        """
        index = self._read_frame_index()
        index.append(entry)
        index_path = self.frames_dir / "index.json"
        tmp_path = index_path.with_name(index_path.name + ".tmp")
        tmp_path.write_text(json.dumps(index, indent=2))
        os.replace(tmp_path, index_path)

    def _add_frame(self, timestamp, write_image):
        """Allocate the next frame ID, store its image and record it in the index.

        ``write_image(frame_path)`` must put the JPEG at ``frame_path``. It runs
        while ``_frames_lock`` is held, so it should be cheap (a write or a
        rename). Because the frame number is derived from index.json inside the
        lock, the scene reader and manual captures never reuse an ID.

        Returns:
            The new index entry dict.
        """
        with self._frames_lock:
            frame_id = f"F{self._next_frame_number():04d}"
            frame_path = self.frames_dir / f"{frame_id}.jpg"
            write_image(frame_path)
            entry = {
                "id": frame_id,
                "timestamp": timestamp,
                "path": str(frame_path),
                "sent_to_agent": False,
            }
            self._append_frame_index(entry)
        return entry

    def total_duration(self):
        """Probe total duration across all segments (for completed sessions).

        A segment whose duration cannot be determined at all contributes 0.
        """
        return sum((_probe_file_duration(seg) or 0.0 for seg in self.recording_segments), 0.0)

    def setup_dirs(self):
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir,
                  self.audio_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)

    @property
    def stream_url(self):
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    @property
    def relay_url(self):
        return f"udp://127.0.0.1:{RELAY_PORT}"

    def _segment_path(self, index):
        """Path of recording segment ``index`` (recording_NNN.mp4)."""
        return self.stream_dir / f"recording_{index:03d}.mp4"

    @property
    def recording_path(self):
        """Current recording segment path."""
        return self._segment_path(self._segment)

    @property
    def recording_segments(self):
        """All recording segments in order."""
        return sorted(self.stream_dir.glob("recording_*.mp4"))

    # -- Recording --

    def start_recorder(self):
        """Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP."""
        # Start after existing segments (for resumed sessions)
        existing = self.recording_segments
        self._segment = len(existing)
        self._rebuild_offsets()
        self._launch_recorder()

    def restart_recorder(self):
        """Restart recorder into a new segment. Session stays alive."""
        old = self._procs.pop("recorder", None)
        if old:
            ff.stop_proc(old)
        completed_path = self.recording_path
        self._advance_segment_offset(completed_path)
        self._segment += 1
        log.info("Restarting recorder → segment %d (offset %.1fs)",
                 self._segment, self.current_global_offset)
        self._launch_recorder()

    def recorder_alive(self):
        """Check if the recorder process is still running."""
        proc = self._procs.get("recorder")
        return proc is not None and proc.poll() is None

    def _launch_recorder(self):
        """Spawn the combined record/relay/scene-detect ffmpeg and its readers."""
        # Informational only: actual frame numbers are allocated per frame.
        start_number = self.frame_count + 1
        node = ff.receive_record_relay_and_detect(
            self.stream_url, self.recording_path, self.relay_url,
            scene_threshold=self.scene_threshold,
            flush_frames=SCENE_FLUSH_FRAMES,
        )
        proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder+scene: pid=%s → %s (threshold=%.2f, start_number=%d)",
                 proc.pid, self.recording_path, self.scene_threshold, start_number)
        self._start_scene_readers(proc, start_number)

    # -- Scene Detection --

    def start_scene_detector(self, on_new_frames=None):
        """Register callback for new scene frames.

        Scene detection runs inside the recorder process (single ffmpeg).
        The stderr reader thread parses showinfo lines and fires this callback.
        """
        self._on_new_frames = on_new_frames

    def update_scene_threshold(self, new_threshold):
        """Update scene threshold. Restarts the recorder to apply new filter."""
        self.scene_threshold = new_threshold
        log.info("Threshold changed → %.2f, restarting recorder", new_threshold)
        self.restart_recorder()

    def _start_scene_readers(self, proc, start_number=None):
        """Read scene frames from stdout (MJPEG pipe) and timestamps from stderr.

        Two threads:
          - stderr: parses showinfo lines, queues pts_time values
          - stdout: reads JPEG frames from pipe, pairs with queued timestamps,
            writes files to disk, fires callbacks immediately

        Args:
            proc: the recorder process (stdout and stderr must be pipes).
            start_number: ignored; kept for backward compatibility. Frame
                numbers are allocated from index.json as frames arrive.
        """
        ts_queue = Queue()
        # Capture the offset now, before any later restart can change _segment.
        offset = self.current_global_offset

        def _read_stderr():
            for raw in proc.stderr:
                line = raw.decode("utf-8", errors="replace").rstrip()
                if not line:
                    continue
                if "showinfo" not in line:
                    log.debug("[recorder] %s", line)
                    continue
                pts_match = _PTS_TIME_RE.search(line)
                if pts_match:
                    ts_queue.put(float(pts_match.group(1)))
            log.info("[recorder] stderr closed, exit=%s", proc.poll())

        def _handle_jpeg(jpeg_data, last_pts):
            """Store one JPEG from the pipe; return the updated last accepted pts."""
            try:
                pts_time = ts_queue.get(timeout=2.0)
            except Empty:
                log.warning("No timestamp for scene frame %d", self.frame_count + 1)
                # NOTE: a 0.0 pts is dropped by the duplicate check below unless
                # it is the first frame of this recorder.
                pts_time = 0.0
            # Skip flush frames (within 100ms of previous = duplicate)
            if pts_time - last_pts < _FLUSH_DUP_WINDOW:
                log.debug("Skipping flush frame at pts=%.3f", pts_time)
                return last_pts
            entry = self._add_frame(pts_time + offset,
                                    lambda path: path.write_bytes(jpeg_data))
            log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)",
                     entry["id"], entry["timestamp"], pts_time, offset)
            if self._on_new_frames:
                self._on_new_frames([entry])
            return pts_time

        def _read_stdout():
            last_pts = -1.0
            buf = bytearray()
            raw_fd = proc.stdout.fileno()
            while True:
                chunk = os.read(raw_fd, 65536)
                if not chunk:
                    break
                buf += chunk
                while True:
                    soi = buf.find(_JPEG_SOI)
                    if soi < 0:
                        # Keep a trailing 0xFF: it may be the first byte of an
                        # SOI marker split across two reads.
                        keep = 1 if buf.endswith(b"\xff") else 0
                        del buf[:len(buf) - keep]
                        break
                    eoi = buf.find(_JPEG_EOI, soi + 2)
                    if eoi < 0:
                        del buf[:soi]
                        break
                    jpeg_data = bytes(buf[soi:eoi + 2])
                    del buf[:eoi + 2]
                    last_pts = _handle_jpeg(jpeg_data, last_pts)
            log.info("[recorder] stdout closed")

        Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start()
        Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start()

    def _probe_safe_duration(self, path=None):
        """Probe recording duration via ffprobe. Returns seconds or None.

        Args:
            path: recording to probe; defaults to the current segment.
        """
        return _probe_file_duration(self.recording_path if path is None else path)

    def capture_now(self, on_new_frames=None):
        """Capture a single frame from the current recording position.

        Grabs the latest available frame (safe_duration - 1s) and adds it
        to the index. Runs in a thread to avoid blocking the UI.
        """
        def _capture():
            # Snapshot segment path and offset together so they always agree.
            segment_path = self.recording_path
            offset = self.current_global_offset
            safe_duration = self._probe_safe_duration(segment_path)
            if not safe_duration or safe_duration < 1:
                log.warning("capture_now: recording too short")
                return
            local_timestamp = safe_duration - 1
            timestamp = local_timestamp + offset

            # Extract to a private temp file outside the frames lock: ffmpeg may
            # take a while, and holding the lock would stall the scene reader
            # (and, through its stdout pipe, the recorder).
            tmp_path = self.session_dir / f".capture_{uuid.uuid4().hex}.jpg"
            try:
                ff.extract_frame_at(segment_path, tmp_path, local_timestamp)
            except Exception as e:
                log.error("capture_now failed: %s", e)
                if tmp_path.exists():
                    tmp_path.unlink()
                return
            if not tmp_path.exists():
                log.warning("capture_now: frame not written")
                return
            try:
                entry = self._add_frame(timestamp,
                                        lambda path: os.replace(tmp_path, path))
            finally:
                if tmp_path.exists():
                    tmp_path.unlink()
            log.info("Manual capture: %s at %.1fs", entry["id"], timestamp)
            if on_new_frames:
                on_new_frames([entry])

        Thread(target=_capture, daemon=True, name="capture_now").start()

    # -- Audio Extraction --

    def start_audio_extractor(self, on_new_audio=None):
        """Periodically extract audio from the growing recording as WAV chunks.

        Same incremental pattern as scene detector: polls recording,
        extracts new time range, calls back with (wav_path, start_time, duration).
        Chunk numbers keep increasing across segments (and continue after any
        chunk_NNNN.wav already on disk), so no chunk file is ever overwritten.
        When the recorder moves to a new segment, the unprocessed tail of the
        previous segment is extracted before switching.

        Args:
            on_new_audio: callback(wav_path, start_time, duration,
                segment_path=..., local_start=...)
        """
        self._on_new_audio = on_new_audio
        self.audio_dir.mkdir(parents=True, exist_ok=True)

        def _first_free_chunk_number():
            """Return one past the highest chunk number already in audio_dir."""
            used = []
            for p in self.audio_dir.glob("chunk_*.wav"):
                m = _CHUNK_NAME_RE.fullmatch(p.name)
                if m:
                    used.append(int(m.group(1)))
            return max(used, default=-1) + 1

        def _extract_chunk(seg, chunk_num, start, end):
            """Extract [start, end) of seg to chunk_NNNN.wav.

            Returns the WAV path, or None if ffmpeg raised (error is logged).
            """
            wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav"
            try:
                ff.extract_audio_chunk(
                    seg, wav_path,
                    start_time=start,
                    duration=end - start,
                )
            except Exception as e:
                log.error("Audio extraction failed: %s", e)
                return None
            return wav_path

        def _report_chunk(wav_path, seg, seg_index, start, end):
            """Log and publish wav_path if it is non-trivial; return True if so."""
            if not (wav_path.exists() and wav_path.stat().st_size > 100):
                return False
            global_start = start + self._segment_offsets.get(seg_index, 0.0)
            log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)",
                     wav_path.name, start, end, global_start)
            if self._on_new_audio:
                self._on_new_audio(
                    wav_path, global_start, end - start,
                    segment_path=seg,
                    local_start=start,
                )
            return True

        def _extract():
            processed_time = 0.0
            chunk_num = _first_free_chunk_number()
            current_index = None
            while "stop" not in self._stop_flags:
                time.sleep(AUDIO_EXTRACT_INTERVAL)
                # Snapshot the segment index once so path and offset agree even
                # if restart_recorder runs concurrently.
                seg_index = self._segment
                seg = self._segment_path(seg_index)

                if seg_index != current_index:
                    if current_index is not None:
                        # restart_recorder calls ff.stop_proc on the old recorder
                        # before advancing _segment, so the previous segment is
                        # treated as complete: extract its remaining tail,
                        # including the safety margin skipped so far.
                        prev = self._segment_path(current_index)
                        end = _probe_file_duration(prev)
                        if end is not None and end > processed_time:
                            wav_path = _extract_chunk(prev, chunk_num, processed_time, end)
                            if wav_path is not None and _report_chunk(
                                    wav_path, prev, current_index, processed_time, end):
                                chunk_num += 1
                    current_index = seg_index
                    processed_time = 0.0
                    log.info("Audio extractor: switched to %s", seg.name)

                if not seg.exists():
                    continue
                if seg.stat().st_size < 100_000:
                    continue

                safe_duration = _probe_file_duration(seg)
                if safe_duration is None or safe_duration <= 0:
                    continue

                process_to = safe_duration - AUDIO_SAFETY_MARGIN
                if process_to <= processed_time + 1.0:
                    continue

                wav_path = _extract_chunk(seg, chunk_num, processed_time, process_to)
                if wav_path is None:
                    continue  # retry the same range on the next tick
                if _report_chunk(wav_path, seg, seg_index, processed_time, process_to):
                    chunk_num += 1
                # An empty/tiny WAV still counts as processed so the range is
                # not retried forever; its chunk number is simply reused.
                processed_time = process_to

            log.info("Audio extractor stopped")

        t = Thread(target=_extract, daemon=True, name="audio_extractor")
        t.start()
        self._threads["audio_extractor"] = t

    # -- Lifecycle --

    def stop_all(self):
        """Signal background threads to stop and stop every ffmpeg process."""
        log.info("Stopping all...")
        self._stop_flags.add("stop")
        for name, proc in self._procs.items():
            log.info("Stopping %s", name)
            ff.stop_proc(proc)
        self._procs.clear()