Files
mitus/cht/stream/manager.py
2026-04-01 21:14:30 -03:00

225 lines
7.8 KiB
Python

"""
StreamManager: orchestrates ffmpeg for recording and scene detection.

Architecture:
    sender → TCP:4444 → ffmpeg (writes recording.mp4, relays to UDP)
    recording.mp4 → mpv (plays via Timeline)
    recording.mp4 → ffmpeg scene detection (periodic, incremental)
"""
import json
import logging
import re
import time
from pathlib import Path
from threading import Thread
from cht.config import (
STREAM_HOST,
STREAM_PORT,
RELAY_PORT,
SCENE_THRESHOLD,
SESSIONS_DIR,
)
from cht.stream import ffmpeg as ff
log = logging.getLogger(__name__)


class StreamManager:
    """Run the ffmpeg recorder and the incremental scene detector for one session.

    Each session owns a directory under ``SESSIONS_DIR`` with four
    sub-directories: ``stream/`` (the recording), ``frames/`` (scene frames and
    their ``index.json``), ``transcript/`` and ``agent/``.
    """

    # showinfo prints "pts_time:<seconds>" for every frame it lets through.
    _PTS_TIME_RE = re.compile(r"pts_time:\s*([\d.]+)")

    # Recordings smaller than this are not probed yet.
    _MIN_RECORDING_BYTES = 100_000
    # Seconds held back from the end of the recording; the tail of a file that
    # is still being written may not be decodable yet.
    _SAFETY_MARGIN_SECS = 2
    # Do not launch a detection pass for less than this much new material.
    _MIN_NEW_SECS = 0.5
    # Upper bound of the idle backoff between detection passes.
    _MAX_BACKOFF_SECS = 10
    # Size-based duration fallback. 500 kbit/s is 62.5 KB/s; dividing by a
    # slightly larger figure makes the estimate err on the short side.
    _FALLBACK_BYTES_PER_SEC = 65_000

    def __init__(self, session_id=None):
        """Set up the session paths; nothing is created on disk here.

        Args:
            session_id: name of the session directory. Defaults to the current
                local time formatted as ``YYYYMMDD_HHMMSS``.
        """
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.agent_dir = self.session_dir / "agent"
        self._procs = {}
        self._threads = {}
        self._stop_flags = set()
        # Defined up front so the attribute exists before the detector starts.
        self._on_new_frames = None
        log.info("Session: %s", session_id)

    def setup_dirs(self):
        """Create the session sub-directories (idempotent)."""
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)

    @property
    def stream_url(self):
        """TCP listen URL the recorder accepts the incoming stream on."""
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    @property
    def relay_url(self):
        """Loopback UDP URL the recorder relays the stream to."""
        return f"udp://127.0.0.1:{RELAY_PORT}"

    @property
    def recording_path(self):
        """Path of the recording file inside the session's stream directory."""
        return self.stream_dir / "recording.mp4"

    # -- Recording --

    def start_recorder(self):
        """Start ffmpeg to receive the TCP stream, record it and relay it to UDP.

        The process is registered under ``"recorder"`` so ``stop_all`` can
        terminate it, and its stderr is forwarded to the debug log.
        """
        node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url)
        proc = ff.run_async(node, pipe_stderr=True)
        self._procs["recorder"] = proc
        # The original format string had no separator between pid and path.
        log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path)
        self._start_stderr_reader("recorder", proc)

    # -- Scene Detection --

    def start_scene_detector(self, on_new_frames=None):
        """Periodically run scene detection on new portions of the recording.

        Args:
            on_new_frames: callback(list of {id, timestamp, path, sent_to_agent})
                invoked from the detector thread whenever new frames are found.
        """
        self._on_new_frames = on_new_frames
        t = Thread(target=self._scene_detect_loop, daemon=True, name="scene_detector")
        t.start()
        self._threads["scene_detector"] = t

    @classmethod
    def _backoff_delay(cls, idle_cycles):
        """Return the sleep before the next pass: 1s, then 2→4→8→10s when idle."""
        if idle_cycles <= 0:
            return 1
        # Clamp the exponent so a long idle period never builds a huge integer.
        return min(cls._MAX_BACKOFF_SECS, 2 ** min(idle_cycles, 4))

    def _scene_detect_loop(self):
        """Detector thread body: process the recording in growing time windows."""
        processed_time = 0.0
        frame_count = 0
        idle_cycles = 0  # consecutive passes that produced no new frames
        while "stop" not in self._stop_flags:
            time.sleep(self._backoff_delay(idle_cycles))
            if "stop" in self._stop_flags:
                # Stop was requested while sleeping; do not launch another ffmpeg.
                break

            try:
                size = self.recording_path.stat().st_size
            except OSError:
                continue  # recording not created yet (or not readable)
            if size < self._MIN_RECORDING_BYTES:
                continue

            safe_duration = self._estimate_safe_duration()
            if safe_duration is None:
                continue
            process_to = safe_duration - self._SAFETY_MARGIN_SECS
            if process_to <= processed_time + self._MIN_NEW_SECS:
                continue

            log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to)
            new_frames = self._detect_scenes(
                start_time=processed_time,
                end_time=process_to,
                start_number=frame_count + 1,
            )
            if new_frames:
                frame_count += len(new_frames)
                idle_cycles = 0  # reset — check again quickly
                log.info("Found %d new scene frames (total: %d)", len(new_frames), frame_count)
                if self._on_new_frames:
                    try:
                        self._on_new_frames(new_frames)
                    except Exception:
                        # A faulty consumer must not kill the detector thread.
                        log.exception("on_new_frames callback failed")
            else:
                idle_cycles += 1  # back off: 2s, 4s, 8s, 10s
            # The window is consumed even when the pass found nothing.
            processed_time = process_to
        log.info("Scene detector stopped")

    @staticmethod
    def _positive_duration(section):
        """Return ``section["duration"]`` as a positive float, else None."""
        try:
            value = float(section.get("duration", 0))
        except (AttributeError, TypeError, ValueError):
            return None  # missing, "N/A", or not a mapping at all
        return value if value > 0 else None

    def _estimate_safe_duration(self):
        """Estimate the recording duration in seconds.

        Tries ffprobe first: the format-level duration, then the first stream
        that reports a positive duration (for fragmented MP4 with empty_moov
        the format-level value is 0). If probing yields nothing, falls back to
        a rough estimate from the file size.

        Returns:
            Duration in seconds, or None when the file cannot even be stat'ed.
        """
        try:
            import ffmpeg as ffmpeg_lib

            info = ffmpeg_lib.probe(str(self.recording_path))
            sections = [info.get("format", {}), *info.get("streams", [])]
        except Exception as exc:
            # Deliberately broad: the library may be missing, ffprobe may be
            # absent, or the file may be mid-write. All mean "use the fallback".
            log.debug("ffprobe unavailable for %s: %s", self.recording_path, exc)
            sections = []

        for section in sections:
            duration = self._positive_duration(section)
            if duration is not None:
                return duration

        # Fallback: rough estimate from file size (~500kbit/s typical for this stream)
        try:
            return self.recording_path.stat().st_size / self._FALLBACK_BYTES_PER_SEC
        except OSError:
            return None

    @classmethod
    def _parse_pts_times(cls, stderr):
        """Extract the ``pts_time`` of every showinfo line in ffmpeg's stderr."""
        if not stderr:
            return []
        if isinstance(stderr, bytes):
            # NOTE(review): the ff wrapper presumably returns text; bytes are
            # accepted too so a raw subprocess result does not break parsing.
            stderr = stderr.decode("utf-8", errors="replace")
        times = []
        for line in stderr.splitlines():
            if "showinfo" not in line:
                continue
            match = cls._PTS_TIME_RE.search(line)
            if match:
                times.append(float(match.group(1)))
        return times

    @staticmethod
    def _load_index(index_path):
        """Load the frame index list; a missing or unusable file yields ``[]``."""
        try:
            data = json.loads(index_path.read_text())
        except FileNotFoundError:
            return []
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            log.warning("Ignoring unreadable frame index %s: %s", index_path, exc)
            return []
        if not isinstance(data, list):
            log.warning("Ignoring frame index %s: expected a list", index_path)
            return []
        return data

    def _detect_scenes(self, start_time, end_time, start_number):
        """Run ffmpeg scene detection on a time range.

        Args:
            start_time: start of the window, in seconds.
            end_time: end of the window, in seconds.
            start_number: number given to the first frame file of this pass.

        Returns:
            The index entries added by this pass; empty when ffmpeg failed or
            no new frame file appeared.
        """
        existing_before = {f.name for f in self.frames_dir.glob("F*.jpg")}
        try:
            _stdout, stderr = ff.extract_scene_frames(
                self.recording_path,
                self.frames_dir,
                scene_threshold=SCENE_THRESHOLD,
                start_number=start_number,
                start_time=start_time,
                duration=end_time - start_time,
            )
        except Exception as e:
            log.error("Scene detection failed: %s", e)
            return []

        index_path = self.frames_dir / "index.json"
        index = self._load_index(index_path)
        new_frames = []
        # Frame files are numbered in the order showinfo reported them.
        for frame_num, pts_time in enumerate(self._parse_pts_times(stderr), start=start_number):
            frame_id = f"F{frame_num:04d}"
            frame_path = self.frames_dir / f"{frame_id}.jpg"
            if frame_path.name in existing_before or not frame_path.exists():
                continue
            entry = {
                "id": frame_id,
                "timestamp": pts_time,
                "path": str(frame_path),
                "sent_to_agent": False,
            }
            index.append(entry)
            new_frames.append(entry)

        # Only touch the file when something changed or it does not exist yet.
        if new_frames or not index_path.exists():
            index_path.write_text(json.dumps(index, indent=2))
        return new_frames

    # -- Lifecycle --

    def stop_all(self):
        """Signal the detector loop to stop and terminate every tracked process."""
        log.info("Stopping all...")
        self._stop_flags.add("stop")
        for name, proc in list(self._procs.items()):
            log.info("Stopping %s", name)
            try:
                ff.stop_proc(proc)
            except Exception:
                # Keep going so one stubborn process does not leave others running.
                log.exception("Failed to stop %s", name)
        self._procs.clear()

    def _start_stderr_reader(self, name, proc):
        """Forward ``proc``'s stderr to the debug log from a daemon thread."""
        if proc.stderr is None:
            log.debug("[%s] stderr is not piped; nothing to read", name)
            return

        def _read():
            for line in proc.stderr:
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    log.debug("[%s] %s", name, text)
            log.info("[%s] exited: %s", name, proc.poll())

        t = Thread(target=_read, daemon=True, name=f"{name}_stderr")
        t.start()
        self._threads[f"{name}_stderr"] = t