"""
StreamManager: orchestrates ffmpeg for recording and scene detection.

Architecture:
    sender → TCP:STREAM_PORT → ffmpeg recorder
        ├─ writes <session>/stream/recording_NNN.mp4 (fragmented MP4, one per segment)
        └─ relays the stream to udp://127.0.0.1:RELAY_PORT
    recording_NNN.mp4 → mpv (plays via Timeline; outside this module)
    recording_NNN.mp4 → ffmpeg scene detection (periodic, incremental)
        → <session>/frames/F####.jpg + <session>/frames/index.json
"""

import json
import logging
import re
import time
from pathlib import Path
from threading import Lock, Thread

from cht.config import (
    STREAM_HOST,
    STREAM_PORT,
    RELAY_PORT,
    SCENE_THRESHOLD,
    SESSIONS_DIR,
)
from cht.stream import ffmpeg as ff

log = logging.getLogger(__name__)

# Rough recording bitrate (~500 kbit/s ≈ 62.5 KB/s) used to estimate duration
# from file size when ffprobe cannot report one.
_BYTES_PER_SECOND_ESTIMATE = 65_000

# Matches the pts_time field of an ffmpeg showinfo line. A sign and an
# exponent are accepted so that every showinfo frame line is recognised;
# skipping one would misalign timestamps with ffmpeg's sequential file names.
_PTS_TIME_RE = re.compile(r"pts_time:\s*(-?[\d.]+(?:[eE][-+]?\d+)?)")


def list_sessions():
    """Return a list of (session_id, session_dir) tuples, newest first.

    Only directories under SESSIONS_DIR that contain a ``frames``
    subdirectory count as sessions. Ordering is reverse-lexicographic by
    directory name, which is newest-first for the default
    ``%Y%m%d_%H%M%S`` session ids.
    """
    if not SESSIONS_DIR.exists():
        return []
    return [
        (d.name, d)
        for d in sorted(SESSIONS_DIR.iterdir(), reverse=True)
        if d.is_dir() and (d / "frames").exists()
    ]


class StreamManager:
    """Owns one session directory and the ffmpeg processes that feed it.

    Layout under ``SESSIONS_DIR/<session_id>``: ``stream/`` (recording
    segments), ``frames/`` (scene frames + ``index.json``), ``transcript/``
    and ``agent/``.
    """

    def __init__(self, session_id=None):
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.agent_dir = self.session_dir / "agent"

        self._procs = {}
        self._threads = {}
        self._stop_flags = set()
        self._segment = 0
        self._on_new_frames = None
        # Serializes the read-modify-write of frames/index.json, including the
        # choice of the next F#### file name, between the scene-detector thread
        # and capture_now threads. Without it both can pick the same number.
        self._index_lock = Lock()

        self.scene_threshold = SCENE_THRESHOLD
        self.readonly = False  # True when loaded from existing session
        log.info("Session: %s", session_id)

    @classmethod
    def from_existing(cls, session_id):
        """Load an existing session without starting any ffmpeg processes.

        Raises:
            FileNotFoundError: if the session directory does not exist.
        """
        mgr = cls(session_id=session_id)
        if not mgr.session_dir.exists():
            raise FileNotFoundError(f"Session not found: {session_id}")
        mgr.readonly = True
        # Point _segment to last recording segment
        segments = mgr.recording_segments
        if segments:
            mgr._segment = len(segments) - 1
        log.info("Loaded existing session: %s (%d segments, %d frames)",
                 session_id, len(segments), mgr.frame_count)
        return mgr

    # -- Frame index helpers --

    @property
    def _index_path(self):
        """Path of the frame index (a JSON list of frame entries)."""
        return self.frames_dir / "index.json"

    def _read_index(self):
        """Load the frame index; ``[]`` if it does not exist yet.

        Raises:
            OSError / ValueError: if the file is unreadable or not valid JSON.
        """
        path = self._index_path
        if not path.exists():
            return []
        return json.loads(path.read_text())

    def _write_index(self, index):
        """Write the frame index atomically (temp file + rename).

        Concurrent readers such as ``frame_count`` therefore never observe a
        partially written file.
        """
        path = self._index_path
        tmp = path.with_name(path.name + ".tmp")
        tmp.write_text(json.dumps(index, indent=2))
        tmp.replace(path)

    @property
    def frame_count(self):
        """Number of entries in frames/index.json (0 if missing or unreadable)."""
        try:
            return len(self._read_index())
        except (OSError, ValueError, TypeError):
            return 0

    # -- Durations --

    @staticmethod
    def _probe_duration(path):
        """Return the duration of ``path`` in seconds, best effort.

        Tries ffprobe's format duration first, then the first positive stream
        duration. Fragmented MP4 with empty_moov reports a format duration of
        0, hence the stream fallback. If neither is available, it estimates
        from file size / _BYTES_PER_SECOND_ESTIMATE.

        Returns:
            float seconds, or None if the file cannot be stat'ed.
        """
        try:
            import ffmpeg as ffmpeg_lib

            info = ffmpeg_lib.probe(str(path))
            dur = float(info.get("format", {}).get("duration", 0))
            if dur > 0:
                return dur
            for stream in info.get("streams", []):
                sdur = float(stream.get("duration", 0))
                if sdur > 0:
                    return sdur
        except Exception as e:  # missing module, ffprobe error, odd values...
            log.debug("ffprobe failed for %s: %s", path, e)
        try:
            return path.stat().st_size / _BYTES_PER_SECOND_ESTIMATE
        except OSError:
            return None

    def total_duration(self):
        """Sum of estimated durations of all recording segments, in seconds.

        Intended for completed sessions. Segments that can no longer be
        stat'ed contribute 0.
        """
        return sum(
            (self._probe_duration(seg) or 0.0 for seg in self.recording_segments),
            0.0,
        )

    def setup_dirs(self):
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)

    @property
    def stream_url(self):
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    @property
    def relay_url(self):
        return f"udp://127.0.0.1:{RELAY_PORT}"

    @property
    def recording_path(self):
        """Current recording segment path."""
        return self.stream_dir / f"recording_{self._segment:03d}.mp4"

    @property
    def recording_segments(self):
        """All recording segments in order."""
        return sorted(self.stream_dir.glob("recording_*.mp4"))

    # -- Recording --

    def start_recorder(self):
        """Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP."""
        self._segment = 0
        self._launch_recorder()

    def restart_recorder(self):
        """Restart recorder into a new segment. Session stays alive."""
        old = self._procs.pop("recorder", None)
        if old:
            ff.stop_proc(old)
        self._segment += 1
        log.info("Restarting recorder → segment %d", self._segment)
        self._launch_recorder()

    def recorder_alive(self):
        """Check if the recorder process is still running."""
        proc = self._procs.get("recorder")
        return proc is not None and proc.poll() is None

    def _launch_recorder(self):
        node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url)
        proc = ff.run_async(node, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path)
        self._start_stderr_reader("recorder", proc)

    # -- Scene Detection --

    def start_scene_detector(self, on_new_frames=None):
        """Periodically run scene detection on new portions of the recording.

        Runs in a daemon thread until stop_all() is called. Each cycle takes
        the current segment and scans from the last processed time up to 2 s
        before its estimated end, because the tail of a live fMP4 may be
        incomplete. Progress resets when the current segment changes.

        Args:
            on_new_frames: callback(list of {id, timestamp, path, sent_to_agent})
                invoked from the detector thread with newly indexed frames.
        """
        self._on_new_frames = on_new_frames

        def _detect():
            processed_time = 0.0
            idle_cycles = 0
            current_segment = None
            while "stop" not in self._stop_flags:
                # Adaptive sleep: faster at lower thresholds (more sensitive).
                # threshold 0.01→1s base, 0.10→1s, 0.50→2s; the base is
                # doubled after a cycle that found no new frames.
                base = max(1.0, min(2.0, self.scene_threshold * 10))
                time.sleep(base * 2 if idle_cycles else base)

                # Snapshot the segment once per cycle so probing and detection
                # use the same file even if restart_recorder() runs meanwhile.
                seg = self.recording_path
                if not seg.exists():
                    continue

                # New segment started — reset per-segment progress
                if seg != current_segment:
                    current_segment = seg
                    processed_time = 0.0
                    idle_cycles = 0
                    log.info("Scene detector: switched to %s", seg.name)

                if seg.stat().st_size < 100_000:
                    continue

                # Probe this segment's duration (not the total across segments)
                safe_duration = self._estimate_safe_duration(seg)
                if safe_duration is None or safe_duration <= 0:
                    continue

                # 2s safety margin for incomplete tail fragments
                process_to = safe_duration - 2
                if process_to <= processed_time + 0.5:
                    continue

                log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to)
                new_frames = self._detect_scenes(
                    start_time=processed_time,
                    end_time=process_to,
                    path=seg,
                )
                if new_frames:
                    idle_cycles = 0
                    log.info("Found %d new scene frames (total: %d)",
                             len(new_frames), self._next_frame_number() - 1)
                    if self._on_new_frames:
                        try:
                            self._on_new_frames(new_frames)
                        except Exception:
                            # Keep detecting even if the consumer misbehaves.
                            log.exception("on_new_frames callback failed")
                else:
                    idle_cycles += 1
                processed_time = process_to
            log.info("Scene detector stopped")

        t = Thread(target=_detect, daemon=True, name="scene_detector")
        t.start()
        self._threads["scene_detector"] = t

    def _estimate_safe_duration(self, path=None):
        """Estimate a recording segment's duration in seconds.

        Uses ffprobe (format duration, then stream duration for fragmented
        MP4) and falls back to a file-size estimate.

        Args:
            path: segment to measure; defaults to the current recording_path.

        Returns:
            float seconds, or None if the file cannot be stat'ed.
        """
        return self._probe_duration(self.recording_path if path is None else path)

    def _next_frame_number(self):
        """Determine next frame number from the index (source of truth)."""
        return len(self._read_index()) + 1

    def _detect_scenes(self, start_time, end_time, path=None):
        """Run ffmpeg scene detection on ``[start_time, end_time]`` of a segment.

        ffmpeg writes frames sequentially as F####.jpg, starting at the next
        free index number. Each showinfo line in its stderr is matched, in
        order, to one of those files. Only frames whose file exists are
        indexed.

        Args:
            start_time: range start, in seconds.
            end_time: range end, in seconds.
            path: segment to scan; defaults to the current recording_path.

        Returns:
            List of new frame entries ([] on ffmpeg failure or no scenes).
        """
        if path is None:
            path = self.recording_path
        duration = end_time - start_time

        # Hold the lock from number selection through the index write so a
        # concurrent capture_now cannot claim the same F#### numbers.
        with self._index_lock:
            index = self._read_index()
            start_number = len(index) + 1
            try:
                _stdout, stderr = ff.extract_scene_frames(
                    path,
                    self.frames_dir,
                    scene_threshold=self.scene_threshold,
                    start_number=start_number,
                    start_time=start_time,
                    duration=duration,
                )
            except Exception as e:
                log.error("Scene detection failed: %s", e)
                return []

            new_frames = []
            frame_num = start_number
            for line in stderr.splitlines():
                if "showinfo" not in line:
                    continue
                pts_match = _PTS_TIME_RE.search(line)
                if not pts_match:
                    continue
                frame_id = f"F{frame_num:04d}"
                frame_path = self.frames_dir / f"{frame_id}.jpg"
                # Advance even when the file is missing so later showinfo
                # lines stay aligned with ffmpeg's sequential numbering.
                frame_num += 1
                if frame_path.exists():
                    entry = {
                        "id": frame_id,
                        "timestamp": float(pts_match.group(1)),
                        "path": str(frame_path),
                        "sent_to_agent": False,
                    }
                    index.append(entry)
                    new_frames.append(entry)

            self._write_index(index)
        return new_frames

    def capture_now(self, on_new_frames=None):
        """Capture a single frame from the current recording position.

        Grabs the latest available frame (safe_duration - 1s) and adds it to
        the index. Runs in a daemon thread to avoid blocking the UI;
        ``on_new_frames`` is called from that thread with ``[entry]``.
        """
        def _capture():
            seg = self.recording_path
            safe_duration = self._estimate_safe_duration(seg)
            if not safe_duration or safe_duration < 1:
                log.warning("capture_now: recording too short")
                return
            timestamp = safe_duration - 1

            # Same lock as _detect_scenes: picks the frame number and writes
            # the index without racing the scene detector.
            with self._index_lock:
                index = self._read_index()
                frame_id = f"F{len(index) + 1:04d}"
                frame_path = self.frames_dir / f"{frame_id}.jpg"
                try:
                    ff.extract_frame_at(seg, frame_path, timestamp)
                except Exception as e:
                    log.error("capture_now failed: %s", e)
                    return
                if not frame_path.exists():
                    log.warning("capture_now: frame not written")
                    return
                entry = {
                    "id": frame_id,
                    "timestamp": timestamp,
                    "path": str(frame_path),
                    "sent_to_agent": False,
                }
                index.append(entry)
                self._write_index(index)

            log.info("Manual capture: %s at %.1fs", frame_id, timestamp)
            if on_new_frames:
                on_new_frames([entry])

        Thread(target=_capture, daemon=True, name="capture_now").start()

    # -- Lifecycle --

    def stop_all(self):
        """Signal background threads to stop and terminate all ffmpeg processes."""
        log.info("Stopping all...")
        self._stop_flags.add("stop")
        for name, proc in self._procs.items():
            log.info("Stopping %s", name)
            ff.stop_proc(proc)
        self._procs.clear()

    def _start_stderr_reader(self, name, proc):
        """Forward a process's stderr to the debug log from a daemon thread."""
        def _read():
            for line in proc.stderr:
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    log.debug("[%s] %s", name, text)
            log.info("[%s] exited: %s", name, proc.poll())

        Thread(target=_read, daemon=True, name=f"{name}_stderr").start()