313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""
|
|
StreamManager: orchestrates ffmpeg for recording and scene detection.
|
|
|
|
Architecture:
|
|
sender → TCP:4444 → ffmpeg (writes recording.ts)
|
|
recording.ts → mpv (plays via Timeline)
|
|
recording.ts → ffmpeg scene detection (periodic, incremental)
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from threading import Thread
|
|
|
|
from cht.config import (
|
|
STREAM_HOST,
|
|
STREAM_PORT,
|
|
RELAY_PORT,
|
|
SCENE_THRESHOLD,
|
|
SESSIONS_DIR,
|
|
)
|
|
from cht.stream import ffmpeg as ff
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamManager:
    """Owns one capture session: its directories, ffmpeg processes, and worker threads."""

    def __init__(self, session_id=None):
        """Initialize session paths and process bookkeeping.

        Args:
            session_id: explicit session name; defaults to a local timestamp
                (YYYYMMDD_HHMMSS) when omitted.
        """
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id

        base = SESSIONS_DIR / session_id
        self.session_dir = base
        self.stream_dir = base / "stream"
        self.frames_dir = base / "frames"
        self.transcript_dir = base / "transcript"
        self.agent_dir = base / "agent"

        self._procs = {}          # name -> running subprocess handle
        self._threads = {}        # name -> worker Thread
        self._stop_flags = set()  # "stop" added here shuts down workers
        self._segment = 0         # current recording segment number
        self.scene_threshold = SCENE_THRESHOLD
        log.info("Session: %s", session_id)
|
|
def setup_dirs(self):
|
|
for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir):
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
|
|
@property
|
|
def stream_url(self):
|
|
return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"
|
|
|
|
@property
|
|
def relay_url(self):
|
|
return f"udp://127.0.0.1:{RELAY_PORT}"
|
|
|
|
@property
|
|
def recording_path(self):
|
|
"""Current recording segment path."""
|
|
return self.stream_dir / f"recording_{self._segment:03d}.mp4"
|
|
|
|
@property
|
|
def recording_segments(self):
|
|
"""All recording segments in order."""
|
|
return sorted(self.stream_dir.glob("recording_*.mp4"))
|
|
|
|
# -- Recording --
|
|
|
|
def start_recorder(self):
|
|
"""Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP."""
|
|
self._segment = 0
|
|
self._launch_recorder()
|
|
|
|
def restart_recorder(self):
|
|
"""Restart recorder into a new segment. Session stays alive."""
|
|
old = self._procs.pop("recorder", None)
|
|
if old:
|
|
ff.stop_proc(old)
|
|
self._segment += 1
|
|
log.info("Restarting recorder → segment %d", self._segment)
|
|
self._launch_recorder()
|
|
|
|
def recorder_alive(self):
|
|
"""Check if the recorder process is still running."""
|
|
proc = self._procs.get("recorder")
|
|
return proc is not None and proc.poll() is None
|
|
|
|
def _launch_recorder(self):
|
|
node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url)
|
|
proc = ff.run_async(node, pipe_stderr=True)
|
|
self._procs["recorder"] = proc
|
|
log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path)
|
|
self._start_stderr_reader("recorder", proc)
|
|
|
|
# -- Scene Detection --
|
|
|
|
    def start_scene_detector(self, on_new_frames=None):
        """Periodically run scene detection on new portions of the recording.

        Spawns a daemon thread that polls the current recording segment and
        incrementally scans only the not-yet-processed tail. The loop exits
        once "stop" appears in self._stop_flags (added by stop_all).

        Args:
            on_new_frames: callback(list of {id, timestamp, path}) for new frames
        """
        self._on_new_frames = on_new_frames

        def _detect():
            # Seconds of the current segment already scanned for scenes.
            processed_time = 0.0
            # Consecutive cycles that produced no frames — drives backoff sleep.
            idle_cycles = 0
            # Last segment path seen; a change means the recorder restarted.
            current_segment = None

            while "stop" not in self._stop_flags:
                # Adaptive sleep: faster at lower thresholds (more sensitive)
                # threshold 0.01→1s base, 0.10→1s, 0.50→2s
                base = max(1.0, min(2.0, self.scene_threshold * 10))
                # NOTE(review): min(base * 2, ...) caps the backoff at 2×base for
                # any idle_cycles >= 1, so the 2**idle_cycles term never takes
                # effect — confirm whether a larger exponential cap was intended.
                sleep_secs = base if idle_cycles == 0 else min(base * 2, base * (2 ** idle_cycles))
                time.sleep(sleep_secs)

                seg = self.recording_path
                if not seg.exists():
                    continue

                # New segment started — reset per-segment progress
                if seg != current_segment:
                    current_segment = seg
                    processed_time = 0.0
                    idle_cycles = 0
                    log.info("Scene detector: switched to %s", seg.name)

                # Too little data yet to be worth probing/scanning.
                size = seg.stat().st_size
                if size < 100_000:
                    continue

                # Probe current segment duration directly (not total across segments)
                safe_duration = self._estimate_safe_duration()
                if safe_duration is None or safe_duration <= 0:
                    continue

                # 2s safety margin for incomplete tail fragments
                process_to = safe_duration - 2
                # Require at least ~0.5s of new material before rescanning.
                if process_to <= processed_time + 0.5:
                    continue

                log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to)
                new_frames = self._detect_scenes(
                    start_time=processed_time,
                    end_time=process_to,
                )

                if new_frames:
                    idle_cycles = 0
                    log.info("Found %d new scene frames (total: %d)",
                             len(new_frames), self._next_frame_number() - 1)
                    if self._on_new_frames:
                        self._on_new_frames(new_frames)
                else:
                    idle_cycles += 1

                # Advance even when nothing was found — that span is done.
                processed_time = process_to

            log.info("Scene detector stopped")

        t = Thread(target=_detect, daemon=True, name="scene_detector")
        t.start()
        self._threads["scene_detector"] = t
|
def _estimate_safe_duration(self):
|
|
"""Estimate recording duration. Uses ffprobe, falls back to file size.
|
|
|
|
For fragmented MP4 (empty_moov), format-level duration is 0 so we
|
|
check stream duration from the last video stream instead.
|
|
"""
|
|
try:
|
|
import ffmpeg as ffmpeg_lib
|
|
info = ffmpeg_lib.probe(str(self.recording_path))
|
|
# Format duration works for non-fragmented; 0 for empty_moov fMP4
|
|
dur = float(info.get("format", {}).get("duration", 0))
|
|
if dur > 0:
|
|
return dur
|
|
# Fragmented MP4: check video stream duration
|
|
for stream in info.get("streams", []):
|
|
sdur = float(stream.get("duration", 0))
|
|
if sdur > 0:
|
|
return sdur
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback: rough estimate from file size (~500kbit/s typical for this stream)
|
|
try:
|
|
size = self.recording_path.stat().st_size
|
|
return size / 65_000 # ~500kbps → 62.5 KB/s
|
|
except Exception:
|
|
return None
|
|
|
|
def _next_frame_number(self):
|
|
"""Determine next frame number from the index (source of truth)."""
|
|
index_path = self.frames_dir / "index.json"
|
|
if index_path.exists():
|
|
index = json.loads(index_path.read_text())
|
|
return len(index) + 1
|
|
return 1
|
|
|
|
def _detect_scenes(self, start_time, end_time):
|
|
"""Run ffmpeg scene detection on a time range. Returns list of new frame entries."""
|
|
duration = end_time - start_time
|
|
start_number = self._next_frame_number()
|
|
|
|
try:
|
|
_stdout, stderr = ff.extract_scene_frames(
|
|
self.recording_path,
|
|
self.frames_dir,
|
|
scene_threshold=self.scene_threshold,
|
|
start_number=start_number,
|
|
start_time=start_time,
|
|
duration=duration,
|
|
)
|
|
except Exception as e:
|
|
log.error("Scene detection failed: %s", e)
|
|
return []
|
|
|
|
# Parse new frames from showinfo output — match each showinfo line
|
|
# to the corresponding file ffmpeg wrote (sequential from start_number)
|
|
new_frames = []
|
|
index_path = self.frames_dir / "index.json"
|
|
index = json.loads(index_path.read_text()) if index_path.exists() else []
|
|
|
|
frame_num = start_number
|
|
for line in stderr.splitlines():
|
|
if "showinfo" not in line:
|
|
continue
|
|
pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
|
|
if pts_match:
|
|
pts_time = float(pts_match.group(1))
|
|
frame_id = f"F{frame_num:04d}"
|
|
frame_path = self.frames_dir / f"{frame_id}.jpg"
|
|
if frame_path.exists():
|
|
entry = {
|
|
"id": frame_id,
|
|
"timestamp": pts_time,
|
|
"path": str(frame_path),
|
|
"sent_to_agent": False,
|
|
}
|
|
index.append(entry)
|
|
new_frames.append(entry)
|
|
frame_num += 1
|
|
|
|
index_path.write_text(json.dumps(index, indent=2))
|
|
return new_frames
|
|
|
|
def capture_now(self, on_new_frames=None):
|
|
"""Capture a single frame from the current recording position.
|
|
|
|
Grabs the latest available frame (safe_duration - 1s) and adds it
|
|
to the index. Runs in a thread to avoid blocking the UI.
|
|
"""
|
|
def _capture():
|
|
safe_duration = self._estimate_safe_duration()
|
|
if not safe_duration or safe_duration < 1:
|
|
log.warning("capture_now: recording too short")
|
|
return
|
|
|
|
timestamp = safe_duration - 1
|
|
index_path = self.frames_dir / "index.json"
|
|
index = json.loads(index_path.read_text()) if index_path.exists() else []
|
|
frame_num = len(index) + 1
|
|
frame_id = f"F{frame_num:04d}"
|
|
frame_path = self.frames_dir / f"{frame_id}.jpg"
|
|
|
|
try:
|
|
ff.extract_frame_at(self.recording_path, frame_path, timestamp)
|
|
except Exception as e:
|
|
log.error("capture_now failed: %s", e)
|
|
return
|
|
|
|
if not frame_path.exists():
|
|
log.warning("capture_now: frame not written")
|
|
return
|
|
|
|
entry = {
|
|
"id": frame_id,
|
|
"timestamp": timestamp,
|
|
"path": str(frame_path),
|
|
"sent_to_agent": False,
|
|
}
|
|
index.append(entry)
|
|
index_path.write_text(json.dumps(index, indent=2))
|
|
log.info("Manual capture: %s at %.1fs", frame_id, timestamp)
|
|
|
|
if on_new_frames:
|
|
on_new_frames([entry])
|
|
|
|
Thread(target=_capture, daemon=True, name="capture_now").start()
|
|
|
|
# -- Lifecycle --
|
|
|
|
def stop_all(self):
|
|
log.info("Stopping all...")
|
|
self._stop_flags.add("stop")
|
|
for name, proc in self._procs.items():
|
|
log.info("Stopping %s", name)
|
|
ff.stop_proc(proc)
|
|
self._procs.clear()
|
|
|
|
def _start_stderr_reader(self, name, proc):
|
|
def _read():
|
|
for line in proc.stderr:
|
|
text = line.decode("utf-8", errors="replace").rstrip()
|
|
if text:
|
|
log.debug("[%s] %s", name, text)
|
|
log.info("[%s] exited: %s", name, proc.poll())
|
|
|
|
Thread(target=_read, daemon=True, name=f"{name}_stderr").start()
|