"""
StreamManager: orchestrates ffmpeg pipelines for receiving, recording, and
frame extraction from a muxed mpegts/TCP stream.

Architecture::

    sender → TCP (STREAM_HOST:STREAM_PORT) → ffmpeg (writes growing recording.ts)
                                               ├→ mpv plays recording.ts (DVR: live edge + scrub)
                                               └→ ffmpeg scene detection (periodic on recording)

Only the recorder and the scene detector are managed here; mpv playback is
not started by this module.
"""

import json
import logging
import re
import time
from pathlib import Path
from threading import Event, Thread

from cht.config import (
    STREAM_HOST,
    STREAM_PORT,
    SCENE_THRESHOLD,
    MAX_FRAME_INTERVAL,
    SESSIONS_DIR,
)
from cht.stream import ffmpeg as ff

log = logging.getLogger(__name__)

# Seconds the scene detector waits between passes over the recording.
_SCAN_INTERVAL_S = 10.0
# The recording must be at least this many bytes before it is scanned.
_MIN_SCAN_BYTES = 100_000
# Presentation timestamp printed by ffmpeg's showinfo filter, e.g.
# "pts_time:12.345". The optional sign matters: a negative timestamp that
# failed to match would skip a frame_num increment and misalign every later
# frame id with the files ffmpeg actually wrote.
_PTS_TIME_RE = re.compile(r"pts_time:\s*(-?\d+(?:\.\d+)?)")


class StreamManager:
    """Own the ffmpeg subprocesses and worker threads for one capture session.

    Everything lives under ``SESSIONS_DIR / session_id``::

        stream/recording.ts    recording written by the recorder process
        frames/F0001.jpg ...   scene-change frames
        frames/index.json      metadata for every indexed frame
        transcript/, agent/    created by setup_dirs(); not written by this class
    """

    def __init__(self, session_id=None):
        """Prepare paths for *session_id* (default: current local timestamp).

        No directories are created and nothing is started here; call
        ``setup_dirs()`` and the ``start_*`` methods for that.
        """
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.agent_dir = self.session_dir / "agent"
        self._procs = {}  # name -> process handle returned by ff.run_async
        self._threads = {}  # name -> background Thread
        # Legacy stop marker; still honoured in case other code adds "stop".
        self._stop_flags = set()
        # Wakes sleeping workers immediately when stop_all() is called.
        self._stop_event = Event()
        log.info("StreamManager created: session=%s dir=%s", session_id, self.session_dir)

    def setup_dirs(self):
        """Create the session's stream/frames/transcript/agent directories."""
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)
        log.info("Session directories created")

    @property
    def stream_url(self):
        """TCP URL ffmpeg listens on for the incoming stream."""
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    @property
    def recording_path(self):
        """Path of the growing mpegts recording."""
        return self.stream_dir / "recording.ts"

    # -- Recording --

    def start_recorder(self):
        """Start ffmpeg to receive the TCP stream and write it to recording.ts."""
        node = ff.receive_and_record(self.stream_url, self.recording_path)
        proc = ff.run_async(node, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder started: pid=%s url=%s → %s", proc.pid, self.stream_url, self.recording_path)
        self._start_stderr_reader("recorder", proc)

    # -- Scene detection --

    def _wait_for_stop(self, timeout):
        """Sleep up to *timeout* seconds; return True if a stop was requested."""
        return self._stop_event.wait(timeout) or "stop" in self._stop_flags

    def start_scene_detector(self, poll_interval=_SCAN_INTERVAL_S, min_size=_MIN_SCAN_BYTES):
        """Periodically run ffmpeg scene detection on the growing recording.

        A daemon thread wakes every *poll_interval* seconds. It rescans only
        when the recording has grown and is at least *min_size* bytes. Each
        pass resumes from the largest timestamp seen so far, so the recording
        is not re-scanned from the start.

        Args:
            poll_interval: Seconds between passes (default 10).
            min_size: Minimum recording size in bytes before scanning.
        """
        log.info("Starting scene detector (threshold=%.2f, interval=%ds)", SCENE_THRESHOLD, MAX_FRAME_INTERVAL)

        def _detect():
            last_processed_size = 0
            processed_duration = 0.0  # seconds of recording already scanned
            frame_count = 0  # frames indexed so far; next start_number is frame_count + 1
            # _wait_for_stop returns as soon as stop_all() fires, so no new
            # ffmpeg pass is launched after shutdown has been requested.
            while not self._wait_for_stop(poll_interval):
                try:
                    size = self.recording_path.stat().st_size
                except FileNotFoundError:
                    continue  # recorder has not created the file yet
                if size <= last_processed_size or size < min_size:
                    continue
                log.info("Recording grew: %d → %d bytes, scanning from %.1fs",
                         last_processed_size, size, processed_duration)
                last_processed_size = size
                try:
                    new_count, new_duration = self._extract_new_frames(
                        self.recording_path,
                        start_time=processed_duration,
                        start_number=frame_count + 1,
                    )
                    if new_count > 0:
                        frame_count += new_count
                        log.info("Found %d new frames (total: %d)", new_count, frame_count)
                    if new_duration > processed_duration:
                        processed_duration = new_duration
                except Exception as e:
                    # Keep the detector alive; the next growth triggers a retry.
                    log.exception("Scene detection failed: %s", e)
            log.info("Scene detector stopped")

        t = Thread(target=_detect, daemon=True, name="scene_detector")
        t.start()
        self._threads["scene_detector"] = t

    @staticmethod
    def _load_index(index_path):
        """Return the JSON list stored at *index_path*, or [] if it is absent."""
        try:
            with open(index_path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    @staticmethod
    def _write_index(index_path, index):
        """Write *index* as JSON to *index_path* atomically.

        The data goes to a sibling temp file, which then replaces the target.
        Concurrent readers therefore never see a half-written index.json.
        """
        tmp_path = index_path.with_name(index_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(index, f, indent=2)
        tmp_path.replace(index_path)

    def _extract_new_frames(self, path, start_time=0.0, start_number=1):
        """Extract scene-change frames from *path* starting at *start_time*.

        Runs ``ff.extract_scene_frames``, then walks ffmpeg's showinfo lines on
        stderr. Each line with a pts_time is paired, in order, with the frame
        file ``F{n:04d}.jpg`` (n counting up from *start_number*). Files that
        did not exist before this call are appended to ``frames/index.json``.

        Args:
            path: Recording to scan.
            start_time: Timestamp (seconds) to start scanning from.
            start_number: Number of the first output frame file.

        Returns:
            ``(new_frame_count, max_timestamp_seen)``. If ffmpeg fails, returns
            ``(0, start_time)`` so the caller's scan position does not advance.
        """
        existing_before = {p.name for p in self.frames_dir.glob("F*.jpg")}
        try:
            _stdout, stderr = ff.extract_scene_frames(
                path,
                self.frames_dir,
                scene_threshold=SCENE_THRESHOLD,
                max_interval=MAX_FRAME_INTERVAL,
                start_number=start_number,
                start_time=start_time,
            )
        except Exception as e:
            log.error("ffmpeg scene extraction error: %s", e)
            return 0, start_time

        # NOTE(review): presumably ff returns decoded text; tolerate raw bytes
        # or None so the parsing below cannot fail on the type alone.
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        lines = (stderr or "").splitlines()
        for line in lines[:5]:
            log.debug("[scene_detect:stderr] %s", line)

        # NOTE(review): if another component rewrites index.json (e.g. to flip
        # sent_to_agent), that write can race with this read-modify-write.
        index_path = self.frames_dir / "index.json"
        index = self._load_index(index_path)

        # NOTE(review): max_ts becomes the next start_time, which assumes
        # showinfo's pts_time is absolute within the recording. If
        # ff.extract_scene_frames input-seeks with -ss (no -copyts), the
        # timestamps restart near 0. Confirm against ff.extract_scene_frames.
        max_ts = start_time
        new_count = 0
        frame_num = start_number
        for line in lines:
            if "showinfo" not in line:
                continue
            match = _PTS_TIME_RE.search(line)
            if match is None:
                continue  # showinfo side-data lines carry no pts_time
            pts_time = float(match.group(1))
            frame_id = f"F{frame_num:04d}"
            frame_num += 1
            # Every parsed frame counts as "seen", even when its file
            # pre-existed; otherwise the scan position could never advance.
            max_ts = max(max_ts, pts_time)
            frame_path = self.frames_dir / f"{frame_id}.jpg"
            if frame_path.name in existing_before or not frame_path.exists():
                continue
            index.append({
                "id": frame_id,
                "timestamp": pts_time,
                "path": str(frame_path),
                "sent_to_agent": False,
            })
            log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time)
            new_count += 1

        self._write_index(index_path, index)
        return new_count, max_ts

    # -- Lifecycle --

    def stop_all(self):
        """Signal worker threads to stop and stop every managed process.

        A failure to stop one process is logged, and the remaining processes
        are still stopped. Threads are not joined.
        """
        log.info("Stopping all processes...")
        self._stop_flags.add("stop")
        self._stop_event.set()
        for name, proc in self._procs.items():
            log.info("Stopping %s (pid=%s)", name, proc.pid if proc else "?")
            try:
                ff.stop_proc(proc)
            except Exception:
                log.exception("Failed to stop %s", name)
        self._procs.clear()
        log.info("All processes stopped")

    def _start_stderr_reader(self, name, proc):
        """Log each stderr line of *proc* from a daemon thread until EOF.

        The process exit code is logged when stderr closes.
        """
        def _read():
            try:
                for line in proc.stderr:
                    text = line.decode("utf-8", errors="replace").rstrip()
                    if text:
                        log.info("[%s:stderr] %s", name, text)
            except Exception as e:
                log.warning("[%s:stderr] read error: %s", name, e)
            retcode = proc.poll()
            log.info("[%s] process exited: code=%s", name, retcode)

        t = Thread(target=_read, daemon=True, name=f"{name}_stderr")
        t.start()
        self._threads[f"{name}_stderr"] = t