Files
mitus/cht/stream/manager.py
2026-04-03 11:05:42 -03:00

465 lines
17 KiB
Python

"""
StreamManager: orchestrates ffmpeg for recording and scene detection.
Architecture:
sender → TCP:4444 → single ffmpeg process:
1. writes fMP4 to disk (c=copy)
2. relays UDP for live display (c=copy)
3. CUDA decode → scene filter → JPEG frames (real-time)
"""
import json
import logging
import os
import re
import time
from queue import Queue, Empty
from threading import Thread
from cht.config import (
STREAM_HOST,
STREAM_PORT,
RELAY_PORT,
SCENE_THRESHOLD,
SESSIONS_DIR,
AUDIO_EXTRACT_INTERVAL,
AUDIO_SAFETY_MARGIN,
)
from cht.stream import ffmpeg as ff
log = logging.getLogger(__name__)
def list_sessions():
    """Enumerate recorded sessions as (session_id, session_dir), newest first.

    A directory counts as a session only when it contains a ``frames``
    subdirectory; anything else under SESSIONS_DIR is ignored.
    """
    if not SESSIONS_DIR.exists():
        return []
    candidates = sorted(SESSIONS_DIR.iterdir(), reverse=True)
    return [
        (entry.name, entry)
        for entry in candidates
        if entry.is_dir() and (entry / "frames").exists()
    ]
def delete_sessions(session_ids):
    """Remove the on-disk directories for the given session IDs.

    IDs that do not resolve to an existing directory are silently skipped.
    """
    import shutil

    for session_id in session_ids:
        target = SESSIONS_DIR / session_id
        if not (target.exists() and target.is_dir()):
            continue
        shutil.rmtree(target)
        log.info("Deleted session: %s", session_id)
class StreamManager:
def __init__(self, session_id=None):
if session_id is None:
session_id = time.strftime("%Y%m%d_%H%M%S")
self.session_id = session_id
self.session_dir = SESSIONS_DIR / session_id
self.stream_dir = self.session_dir / "stream"
self.frames_dir = self.session_dir / "frames"
self.transcript_dir = self.session_dir / "transcript"
self.audio_dir = self.session_dir / "audio"
self.agent_dir = self.session_dir / "agent"
self._procs = {}
self._threads = {}
self._stop_flags = set()
self._segment = 0
self._segment_offsets = {0: 0.0} # segment_index → global_offset
self.scene_threshold = SCENE_THRESHOLD
self.readonly = False # True when loaded from existing session
self.telemetry = None # set by window after start
log.info("Session: %s", session_id)
@classmethod
def from_existing(cls, session_id):
"""Load an existing session without starting any ffmpeg processes."""
from cht.session import rebuild_manifest
mgr = cls(session_id=session_id)
if not mgr.session_dir.exists():
raise FileNotFoundError(f"Session not found: {session_id}")
mgr.readonly = True
# Point _segment to last recording segment
segments = mgr.recording_segments
if segments:
mgr._segment = len(segments) - 1
mgr._rebuild_offsets()
rebuild_manifest(mgr.session_dir)
log.info("Loaded existing session: %s (%d segments, %d frames)",
session_id, len(segments), mgr.frame_count)
return mgr
@property
def current_global_offset(self) -> float:
"""Global time offset for the current recording segment."""
return self._segment_offsets.get(self._segment, 0.0)
def _rebuild_offsets(self):
"""Compute global offsets from all segments on disk."""
from cht.session import probe_duration
offset = 0.0
self._segment_offsets = {}
for i, seg in enumerate(self.recording_segments):
self._segment_offsets[i] = offset
offset += probe_duration(seg)
def _advance_segment_offset(self, completed_segment_path):
"""Update offsets after a segment completes and a new one begins."""
from cht.session import probe_duration
dur = probe_duration(completed_segment_path)
prev_offset = self._segment_offsets.get(self._segment, 0.0)
self._segment_offsets[self._segment + 1] = prev_offset + dur
log.info("Segment %d completed (%.1fs), next offset: %.1fs",
self._segment, dur, prev_offset + dur)
@property
def frame_count(self):
index_path = self.frames_dir / "index.json"
if index_path.exists():
try:
return len(json.loads(index_path.read_text()))
except Exception:
pass
return 0
def total_duration(self):
"""Probe total duration across all segments (for completed sessions)."""
total = 0.0
for seg in self.recording_segments:
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(seg))
dur = float(info.get("format", {}).get("duration", 0))
if dur <= 0:
for s in info.get("streams", []):
sdur = float(s.get("duration", 0))
if sdur > 0:
dur = sdur
break
if dur <= 0:
dur = seg.stat().st_size / 65_000
total += dur
except Exception:
total += seg.stat().st_size / 65_000
return total
def setup_dirs(self):
for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.audio_dir, self.agent_dir):
d.mkdir(parents=True, exist_ok=True)
@property
def stream_url(self):
return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"
@property
def relay_url(self):
return f"udp://127.0.0.1:{RELAY_PORT}"
@property
def recording_path(self):
"""Current recording segment path."""
return self.stream_dir / f"recording_{self._segment:03d}.mp4"
@property
def recording_segments(self):
"""All recording segments in order."""
return sorted(self.stream_dir.glob("recording_*.mp4"))
# -- Recording --
def start_recorder(self):
"""Start ffmpeg to receive TCP stream, write to fMP4, and relay to UDP."""
# Start after existing segments (for resumed sessions)
existing = self.recording_segments
self._segment = len(existing)
self._rebuild_offsets()
self._launch_recorder()
def restart_recorder(self):
"""Restart recorder into a new segment. Session stays alive."""
old = self._procs.pop("recorder", None)
if old:
ff.stop_proc(old)
completed_path = self.recording_path
self._advance_segment_offset(completed_path)
self._segment += 1
log.info("Restarting recorder → segment %d (offset %.1fs)",
self._segment, self.current_global_offset)
self._launch_recorder()
def recorder_alive(self):
"""Check if the recorder process is still running."""
proc = self._procs.get("recorder")
return proc is not None and proc.poll() is None
def _launch_recorder(self):
start_number = self._next_frame_number()
node = ff.receive_record_relay_and_detect(
self.stream_url, self.recording_path, self.relay_url,
scene_threshold=self.scene_threshold,
)
proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True)
self._procs["recorder"] = proc
log.info("Recorder+scene: pid=%s%s (threshold=%.2f, start_number=%d)",
proc.pid, self.recording_path, self.scene_threshold, start_number)
self._start_scene_readers(proc, start_number)
# -- Scene Detection --
def start_scene_detector(self, on_new_frames=None):
"""Register callback for new scene frames.
Scene detection runs inside the recorder process (single ffmpeg).
The stderr reader thread parses showinfo lines and fires this callback.
"""
self._on_new_frames = on_new_frames
def update_scene_threshold(self, new_threshold):
"""Update scene threshold. Restarts the recorder to apply new filter."""
self.scene_threshold = new_threshold
log.info("Threshold changed → %.2f, restarting recorder", new_threshold)
self.restart_recorder()
def _next_frame_number(self):
"""Determine next frame number from the index (source of truth)."""
index_path = self.frames_dir / "index.json"
if index_path.exists():
index = json.loads(index_path.read_text())
return len(index) + 1
return 1
def _append_frame_index(self, entry):
"""Append a frame entry to index.json."""
index_path = self.frames_dir / "index.json"
index = json.loads(index_path.read_text()) if index_path.exists() else []
index.append(entry)
index_path.write_text(json.dumps(index, indent=2))
    def _start_scene_readers(self, proc, start_number):
        """Read scene frames from stdout (MJPEG pipe) and timestamps from stderr.

        Two daemon threads:
          - stderr: parses showinfo lines, queues pts_time values
          - stdout: reads JPEG frames from pipe, pairs each with the next
            queued timestamp, writes files to disk, fires callbacks

        showinfo fires before the JPEG encoder, so timestamps are always
        queued before the corresponding JPEG data arrives on stdout.
        """
        ts_queue = Queue()
        def _read_stderr():
            # Drain ffmpeg's stderr: plain lines go to DEBUG logging; each
            # showinfo line contributes one pts_time to ts_queue for the
            # stdout reader to pair with the next JPEG.
            for raw in proc.stderr:
                line = raw.decode("utf-8", errors="replace").rstrip()
                if not line:
                    continue
                if "showinfo" not in line:
                    log.debug("[recorder] %s", line)
                    continue
                pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
                if pts_match:
                    ts_queue.put(float(pts_match.group(1)))
            log.info("[recorder] stderr closed, exit=%s", proc.poll())
        def _read_stdout():
            frame_num = start_number
            # Captured once: every frame produced by this recorder process
            # belongs to the current segment, whose global offset is fixed
            # for the lifetime of the process.
            offset = self.current_global_offset
            buf = b""
            raw_fd = proc.stdout.fileno()
            while True:
                # os.read on the raw fd returns as soon as ANY data is available
                # (no Python buffered-IO blocking waiting to fill a buffer)
                chunk = os.read(raw_fd, 65536)
                if not chunk:
                    break
                buf += chunk
                # Split JPEG frames by SOI (0xFFD8) and EOI (0xFFD9) markers
                while True:
                    soi = buf.find(b"\xff\xd8")
                    if soi < 0:
                        buf = b""
                        break
                    eoi = buf.find(b"\xff\xd9", soi + 2)
                    if eoi < 0:
                        buf = buf[soi:]  # keep from SOI, need more data
                        break
                    jpeg_data = buf[soi:eoi + 2]
                    buf = buf[eoi + 2:]
                    # Get timestamp (showinfo fires before encode, so it's queued).
                    # 2s timeout guards against a lost/garbled showinfo line;
                    # the frame is then filed with pts 0.0 rather than dropped.
                    try:
                        pts_time = ts_queue.get(timeout=2.0)
                    except Empty:
                        log.warning("No timestamp for scene frame %d", frame_num)
                        pts_time = 0.0
                    frame_id = f"F{frame_num:04d}"
                    frame_path = self.frames_dir / f"{frame_id}.jpg"
                    frame_path.write_bytes(jpeg_data)
                    entry = {
                        "id": frame_id,
                        "timestamp": pts_time + offset,
                        "path": str(frame_path),
                        "sent_to_agent": False,
                    }
                    # NOTE(review): index.json is also appended from the
                    # capture_now() worker thread; both paths do an unlocked
                    # read-modify-write, so a concurrent manual capture could
                    # lose an entry — confirm whether that overlap can occur.
                    self._append_frame_index(entry)
                    log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)",
                             frame_id, entry["timestamp"], pts_time, offset)
                    # NOTE(review): _on_new_frames is only assigned by
                    # start_scene_detector(); if the recorder is launched
                    # before that call this raises AttributeError — confirm
                    # the startup ordering in the caller.
                    if self._on_new_frames:
                        self._on_new_frames([entry])
                    frame_num += 1
            log.info("[recorder] stdout closed")
        Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start()
        Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start()
def _probe_safe_duration(self):
"""Probe current recording duration via ffprobe. Returns seconds or None."""
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(self.recording_path))
dur = float(info.get("format", {}).get("duration", 0))
if dur > 0:
return dur
for stream in info.get("streams", []):
sdur = float(stream.get("duration", 0))
if sdur > 0:
return sdur
except Exception:
pass
try:
return self.recording_path.stat().st_size / 65_000
except Exception:
return None
def capture_now(self, on_new_frames=None):
"""Capture a single frame from the current recording position.
Grabs the latest available frame (safe_duration - 1s) and adds it
to the index. Runs in a thread to avoid blocking the UI.
"""
def _capture():
safe_duration = self._probe_safe_duration()
if not safe_duration or safe_duration < 1:
log.warning("capture_now: recording too short")
return
local_timestamp = safe_duration - 1
timestamp = local_timestamp + self.current_global_offset
frame_num = self._next_frame_number()
frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg"
try:
ff.extract_frame_at(self.recording_path, frame_path, local_timestamp)
except Exception as e:
log.error("capture_now failed: %s", e)
return
if not frame_path.exists():
log.warning("capture_now: frame not written")
return
entry = {
"id": frame_id,
"timestamp": timestamp,
"path": str(frame_path),
"sent_to_agent": False,
}
self._append_frame_index(entry)
log.info("Manual capture: %s at %.1fs", frame_id, timestamp)
if on_new_frames:
on_new_frames([entry])
Thread(target=_capture, daemon=True, name="capture_now").start()
# -- Audio Extraction --
    def start_audio_extractor(self, on_new_audio=None):
        """Periodically extract audio from the growing recording as WAV chunks.

        Same incremental pattern as the scene detector: a daemon thread polls
        the current recording segment, extracts the newly-available time
        range (minus a safety margin), and fires the callback. The loop runs
        until "stop" appears in self._stop_flags (set by stop_all()).

        Args:
            on_new_audio: callback invoked as
                on_new_audio(wav_path, global_start, chunk_duration,
                             segment_path=..., local_start=...)
                where global_start is in session time and local_start is the
                offset within the current segment file.
        """
        self._on_new_audio = on_new_audio
        self.audio_dir.mkdir(parents=True, exist_ok=True)
        def _extract():
            processed_time = 0.0  # seconds of the current segment already extracted
            chunk_num = 0
            current_segment = None
            while "stop" not in self._stop_flags:
                time.sleep(AUDIO_EXTRACT_INTERVAL)
                seg = self.recording_path
                if not seg.exists():
                    continue
                if seg != current_segment:
                    # Recorder rolled over to a new segment: restart local
                    # bookkeeping from zero for the new file.
                    # NOTE(review): chunk_num also resets, so chunk_0000.wav
                    # etc. are re-used (overwritten) for every segment —
                    # confirm downstream consumers are done with the earlier
                    # files before a segment switch.
                    current_segment = seg
                    processed_time = 0.0
                    chunk_num = 0
                    log.info("Audio extractor: switched to %s", seg.name)
                if seg.stat().st_size < 100_000:
                    continue  # too little data for a meaningful probe/extract
                safe_duration = self._probe_safe_duration()
                if safe_duration is None or safe_duration <= 0:
                    continue
                # Stay AUDIO_SAFETY_MARGIN behind the write head, and only
                # bother once at least ~1s of new audio has accumulated.
                process_to = safe_duration - AUDIO_SAFETY_MARGIN
                if process_to <= processed_time + 1.0:
                    continue
                chunk_duration = process_to - processed_time
                wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav"
                try:
                    ff.extract_audio_chunk(
                        seg, wav_path,
                        start_time=processed_time,
                        duration=chunk_duration,
                    )
                except Exception as e:
                    # Best-effort: retry the same range on the next poll.
                    log.error("Audio extraction failed: %s", e)
                    continue
                if wav_path.exists() and wav_path.stat().st_size > 100:
                    global_start = processed_time + self.current_global_offset
                    log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)",
                             wav_path.name, processed_time, process_to, global_start)
                    if self._on_new_audio:
                        self._on_new_audio(
                            wav_path, global_start, chunk_duration,
                            segment_path=seg, local_start=processed_time,
                        )
                    chunk_num += 1
                    processed_time = process_to
            log.info("Audio extractor stopped")
        t = Thread(target=_extract, daemon=True, name="audio_extractor")
        t.start()
        self._threads["audio_extractor"] = t
# -- Lifecycle --
    def stop_all(self):
        """Signal background loops to stop and terminate all ffmpeg processes."""
        log.info("Stopping all...")
        # The "stop" flag is polled by the audio extractor loop.
        self._stop_flags.add("stop")
        for name, proc in self._procs.items():
            log.info("Stopping %s", name)
            ff.stop_proc(proc)
        self._procs.clear()