"""
StreamManager: orchestrates ffmpeg pipelines for receiving, recording,
frame extraction, and audio extraction from a muxed mpegts/TCP stream.
(Audio extraction is planned but not implemented yet.)

All data goes to disk; the UI reads from disk.
All ffmpeg commands go through the cht.stream.ffmpeg module.
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from threading import Thread
|
|
|
|
from cht.config import (
|
|
STREAM_HOST,
|
|
STREAM_PORT,
|
|
SEGMENT_DURATION,
|
|
SCENE_THRESHOLD,
|
|
MAX_FRAME_INTERVAL,
|
|
SESSIONS_DIR,
|
|
)
|
|
from cht.stream import ffmpeg as ff
|
|
|
|
# Module-level logger, named after this module's dotted import path.
log = logging.getLogger(name=__name__)
|
|
|
|
|
|
class StreamManager:
    """Run and supervise the ffmpeg pipelines for one capture session.

    Every artifact lives under ``SESSIONS_DIR / session_id``:

    * ``stream/``     - received stream segments (``segment_*.ts``)
    * ``frames/``     - extracted frames (``F0001.jpg``, ...) and ``index.json``
    * ``transcript/`` - transcript output
    * ``agent/``      - agent output

    Processes are tracked in ``_procs`` and background threads in
    ``_threads``. ``stop_all()`` stops the processes and signals the watcher
    threads to exit.
    """

    # Extracts the pts_time value from an ffmpeg showinfo log line. The
    # optional minus sign matters: a line with a negative timestamp used to be
    # skipped without advancing the frame counter, which shifted every later
    # timestamp onto the wrong frame ID.
    _PTS_TIME_RE = re.compile(r"pts_time:\s*(-?[\d.]+)")

    def __init__(self, session_id=None):
        """Prepare paths for a session; no directories or processes are created.

        Args:
            session_id: Session directory name. Defaults to the current local
                time formatted as ``YYYYmmdd_HHMMSS``.
        """
        if session_id is None:
            session_id = time.strftime("%Y%m%d_%H%M%S")
        self.session_id = session_id
        self.session_dir = SESSIONS_DIR / session_id
        self.stream_dir = self.session_dir / "stream"
        self.frames_dir = self.session_dir / "frames"
        self.transcript_dir = self.session_dir / "transcript"
        self.agent_dir = self.session_dir / "agent"

        self._procs = {}             # name -> process handle from ff.run_async
        self._threads = {}           # name -> background Thread
        self._stop_flags = set()     # holds "stop" once stop_all() has run
        self._recording_path = None  # set by start_frame_extractor_on_recording
        log.info("StreamManager created: session=%s dir=%s", session_id, self.session_dir)

    def setup_dirs(self):
        """Create the stream/frames/transcript/agent directories (idempotent)."""
        for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir):
            d.mkdir(parents=True, exist_ok=True)
        log.info("Session directories created")

    @property
    def stream_url(self):
        """TCP URL ffmpeg binds to; ``?listen`` makes ffmpeg accept the connection."""
        return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen"

    def start_all(self):
        """Create the session directories, then start the recorder and frame watcher."""
        self.setup_dirs()
        self.start_recorder()
        self.start_frame_extractor()

    def stop_all(self):
        """Signal watcher threads to exit and stop every managed process.

        A failure while stopping one process is logged and does not prevent
        the remaining processes from being stopped. Threads are not joined.
        """
        log.info("Stopping all processes...")
        self._stop_flags.add("stop")
        for name, proc in list(self._procs.items()):
            log.info("Stopping %s (pid=%s)", name, proc.pid if proc else "?")
            try:
                ff.stop_proc(proc)
            except Exception:
                log.exception("Failed to stop %s", name)
        self._procs.clear()
        log.info("All processes stopped")

    def start_recorder(self):
        """Start ``ff.receive_and_segment`` writing segments into ``stream_dir``.

        Unlike the other ``start_*`` entry points this does not create the
        session directories; ``start_all`` does that first.
        """
        node = ff.receive_and_segment(
            self.stream_url, self.stream_dir, SEGMENT_DURATION,
        )
        proc = ff.run_async(node, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder started: pid=%s url=%s", proc.pid, self.stream_url)
        self._start_stderr_reader("recorder", proc)

    def start_receiver_pipe(self):
        """Receive stream, pipe stdout to mpv, save segments to disk.

        Returns:
            The process handle; its stdout carries the piped stream.
        """
        self.setup_dirs()
        node = ff.receive_to_pipe(
            self.stream_url,
            segment_dir=self.stream_dir,
            segment_duration=SEGMENT_DURATION,
        )
        proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True)
        self._procs["receiver"] = proc
        log.info("Receiver started: pid=%s url=%s (pipe + segments)", proc.pid, self.stream_url)
        self._start_stderr_reader("receiver", proc)
        return proc

    def start_recorder_with_monitor(self):
        """Start the segment recorder with an extra monitor output.

        Returns:
            ``session_dir / "monitor.pipe"``, the path handed to
            ``ff.receive_and_segment_with_monitor`` for the monitor output
            (the same path ``get_ffplay_cmd`` plays from).
        """
        self.setup_dirs()
        fifo_path = self.session_dir / "monitor.pipe"

        node = ff.receive_and_segment_with_monitor(
            self.stream_url, self.stream_dir, fifo_path, SEGMENT_DURATION,
        )
        proc = ff.run_async(node, pipe_stderr=True)
        self._procs["recorder"] = proc
        log.info("Recorder+monitor started: pid=%s url=%s fifo=%s", proc.pid, self.stream_url, fifo_path)
        self._start_stderr_reader("recorder", proc)
        return fifo_path

    def _start_stderr_reader(self, name, proc):
        """Read stderr from a process in a thread and log it."""

        def _read():
            try:
                for line in proc.stderr:
                    text = line.decode("utf-8", errors="replace").rstrip()
                    if text:
                        log.info("[%s:stderr] %s", name, text)
            except Exception as e:
                log.warning("[%s:stderr] read error: %s", name, e)
            # stderr reaches EOF when the process closes it on exit, but the
            # process may not be reaped yet, so poll() could still return None.
            # wait() blocks until the real exit code is available.
            retcode = proc.wait()
            log.info("[%s] process exited: code=%s", name, retcode)

        t = Thread(target=_read, daemon=True, name=f"{name}_stderr")
        t.start()
        self._threads[f"{name}_stderr"] = t

    def start_frame_extractor_on_recording(self, recording_path):
        """Extract frames periodically from a growing recording file.

        Args:
            recording_path: Path (or path string) of the recording to watch.
        """
        log.info("Starting frame extractor on recording: %s", recording_path)
        # Coerce so callers may pass a str; the watcher calls .stat() on it.
        self._recording_path = Path(recording_path)
        self._start_recording_frame_watcher()

    def _start_recording_frame_watcher(self):
        """Every 10 s, re-run frame extraction if the recording has grown.

        Extraction only starts once the file exceeds 100 000 bytes.
        """
        # NOTE(review): each pass runs extraction over the WHOLE recording, so
        # frames found by earlier passes are extracted again under new IDs.
        # Confirm whether ff.extract_scene_frames can start at an offset.

        def _watch():
            last_size = 0
            log.info("Recording frame watcher running, watching %s", self._recording_path)
            while "stop" not in self._stop_flags:
                try:
                    size = self._recording_path.stat().st_size
                except FileNotFoundError:
                    size = 0  # not created yet (or removed): nothing to do
                if size > last_size and size > 100_000:  # wait for some data
                    log.info("Recording grew: %d -> %d bytes, extracting frames", last_size, size)
                    last_size = size
                    self._extract_frames_from_file(self._recording_path)
                time.sleep(10)  # check every 10s
            log.info("Recording frame watcher stopped")

        t = Thread(target=_watch, daemon=True, name="recording_frame_watcher")
        t.start()
        self._threads["recording_frame_watcher"] = t

    def start_frame_extractor(self):
        """Start the background thread that extracts frames from stream segments."""
        log.info("Starting frame watcher...")
        self._start_frame_watcher()

    @staticmethod
    def _completed_segments(stream_dir, seen, include_newest=False):
        """Return unseen, non-empty ``segment_*.ts`` files ready for extraction.

        While recording, the most recently modified segment is still being
        appended to. Extracting from it early would capture only part of its
        frames, and because it is then marked seen, the rest would never be
        extracted. It is therefore skipped unless ``include_newest`` is true.

        Args:
            stream_dir: Directory holding the segments.
            seen: Names of segments that were already processed.
            include_newest: Also return the most recently modified segment.

        Returns:
            ``(path, size_in_bytes)`` pairs, sorted by path.
        """
        stats = {}
        for seg in stream_dir.glob("segment_*.ts"):
            try:
                stats[seg] = seg.stat()
            except FileNotFoundError:
                continue  # removed between glob() and stat()
        if not stats:
            return []
        newest = max(stats, key=lambda p: stats[p].st_mtime_ns)
        return [
            (seg, st.st_size)
            for seg, st in sorted(stats.items())
            if seg.name not in seen
            and st.st_size > 0
            and (include_newest or seg != newest)
        ]

    def _start_frame_watcher(self):
        """Poll ``stream_dir`` every 2 s and extract frames from finished segments."""

        def _watch():
            seen = set()
            log.info("Frame watcher running, watching %s", self.stream_dir)

            def _drain(include_newest):
                for seg, size in self._completed_segments(
                    self.stream_dir, seen, include_newest=include_newest
                ):
                    seen.add(seg.name)
                    log.info("New segment found: %s (%d bytes)", seg.name, size)
                    self._extract_frames_from_file(seg)

            while "stop" not in self._stop_flags:
                _drain(include_newest=False)
                time.sleep(2)
            # After stop, the newest segment is no longer growing (best effort:
            # stop_all shuts the recorder down concurrently). Give it one final
            # pass instead of dropping it.
            _drain(include_newest=True)
            log.info("Frame watcher stopped")

        t = Thread(target=_watch, daemon=True, name="frame_watcher")
        t.start()
        self._threads["frame_watcher"] = t

    def _extract_frames_from_file(self, segment_path):
        """Run scene-frame extraction on ``segment_path`` and index the new frames.

        New frames are numbered after the ``F*.jpg`` files already in
        ``frames_dir``. Any failure is logged with its traceback rather than
        raised, so the calling watcher thread keeps running.
        """
        existing = list(self.frames_dir.glob("F*.jpg"))
        start_num = len(existing) + 1
        log.info("Extracting frames from %s (start_num=%d)", segment_path.name, start_num)

        try:
            _stdout, stderr = ff.extract_scene_frames(
                segment_path,
                self.frames_dir,
                scene_threshold=SCENE_THRESHOLD,
                max_interval=MAX_FRAME_INTERVAL,
                start_number=start_num,
            )
            # Defensive: the parser below needs text. If the ff helper ever
            # hands back raw bytes, decode them the same way the stderr reader does.
            if isinstance(stderr, bytes):
                stderr = stderr.decode("utf-8", errors="replace")
            if stderr:
                for line in stderr.splitlines()[:10]:
                    log.debug("[frame_extract:stderr] %s", line)
                self._parse_frame_timestamps(stderr, start_num)
            new_frames = list(self.frames_dir.glob("F*.jpg"))
            log.info("Frame extraction done: %d new frames", len(new_frames) - len(existing))
        except Exception as e:
            log.exception("Frame extraction failed for %s: %s", segment_path.name, e)

    def _parse_frame_timestamps(self, stderr_output, start_num):
        """Append showinfo timestamps for newly written frames to ``index.json``.

        The Nth showinfo line carrying ``pts_time`` is matched to frame
        ``F{start_num + N:04d}.jpg``. Only frames whose file exists are
        indexed, but the counter advances for every matching line.

        Args:
            stderr_output: ffmpeg stderr text containing showinfo lines.
            start_num: Number of the first frame written by this extraction.
        """
        index_path = self.frames_dir / "index.json"
        if index_path.exists():
            with open(index_path, encoding="utf-8") as f:
                index = json.load(f)
        else:
            index = []

        frame_num = start_num
        for line in stderr_output.splitlines():
            if "showinfo" not in line:
                continue
            pts_match = self._PTS_TIME_RE.search(line)
            if not pts_match:
                continue
            pts_time = float(pts_match.group(1))
            frame_id = f"F{frame_num:04d}"
            frame_path = self.frames_dir / f"{frame_id}.jpg"
            if frame_path.exists():
                index.append({
                    "id": frame_id,
                    "timestamp": pts_time,
                    "path": str(frame_path),
                    "sent_to_agent": False,
                })
                log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time)
            frame_num += 1

        # The UI reads index.json from disk. Write a temp file and swap it in
        # with os.replace (atomic on POSIX), so a reader never sees a
        # half-written document and a crash mid-write cannot corrupt the index.
        tmp_path = index_path.with_name(index_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(index, f, indent=2)
        os.replace(tmp_path, index_path)

    def start_audio_extractor(self):
        """Will be implemented in Phase 3."""
        pass

    def get_ffplay_cmd(self):
        """Build the ffplay command that plays the monitor FIFO.

        Returns:
            ``(argv, fifo_path)`` where ``fifo_path`` is
            ``session_dir / "monitor.pipe"``.
        """
        fifo_path = self.session_dir / "monitor.pipe"
        return [
            "ffplay",
            "-hwaccel", "cuda",
            "-fflags", "nobuffer",
            "-flags", "low_delay",
            "-framedrop",
            "-i", str(fifo_path),
        ], fifo_path
|