Working on the player: record to MKV, relay the live stream over UDP, make scene detection incremental, and add RecordingTracker.

This commit is contained in:
2026-04-01 19:23:17 -03:00
parent 68802db15c
commit 0f7e4424bc
13 changed files with 1013 additions and 571 deletions

View File

@@ -13,40 +13,78 @@ import ffmpeg
log = logging.getLogger(__name__)

# Note: scene detection needs -loglevel info for showinfo filter output.
# Individual pipelines can override with .global_args()
GLOBAL_ARGS = ("-hide_banner",)
# Args for pipelines that should stay quiet (recording, relaying).
QUIET_ARGS = ("-hide_banner", "-loglevel", "warning")
def receive_and_record(stream_url, output_path):
    """Receive mpegts stream and write to MKV file.

    mpv reads this file for DVR-style playback.
    ffmpeg scene detection runs on this file for frame extraction.
    Audio is preserved in the recording (muxed mpegts).

    MKV (Matroska) is used because:
    - Handles incomplete writes gracefully (like OBS default)
    - Proper timestamps for seeking and duration detection
    - mpv plays growing MKV files better than mpegts

    Args:
        stream_url: input URL (e.g. tcp://host:port?listen).
        output_path: destination file path; coerced to str for ffmpeg.

    Returns:
        An un-run ffmpeg-python output node; the caller starts it
        (e.g. via run_async).
    """
    # nobuffer/low_delay minimize input-side latency on the live stream.
    stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")
    return (
        ffmpeg.output(
            stream, str(output_path),
            c="copy",         # stream copy: no re-encode
            f="matroska",
            flush_packets=1,  # keep the on-disk tail fresh for readers
        )
        .global_args(*QUIET_ARGS)
    )
def extract_scene_frames(input_path, output_dir, scene_threshold=0.3,
max_interval=30, start_number=1, start_time=0.0):
"""Extract frames from a file on scene change.
def receive_record_and_relay(stream_url, output_path, relay_url):
    """Receive TCP stream, write to MKV, and relay to UDP loopback for live display.

    Uses ffmpeg tee via merge_outputs: one ffmpeg process handles both outputs
    from the same decoded input, keeping them in sync with identical timestamps.

    Args:
        stream_url: input URL (e.g. tcp://host:port?listen).
        output_path: destination MKV file path.
        relay_url: loopback relay target (e.g. udp://127.0.0.1:PORT).

    Returns:
        An un-run ffmpeg-python node with both outputs merged; the caller
        starts it (e.g. via run_async).
    """
    stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")
    # File leg: same MKV settings as receive_and_record.
    file_out = ffmpeg.output(
        stream, str(output_path),
        c="copy", f="matroska", flush_packets=1,
    )
    # Relay leg: mpegts over UDP for a live viewer.
    relay_out = ffmpeg.output(
        stream, relay_url,
        c="copy", f="mpegts",
    )
    return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS)
def extract_scene_frames(input_path, output_dir, scene_threshold=0.10,
start_number=1, start_time=0.0, duration=None):
"""Extract frames from a file on scene change only (no interval fallback).
Frames are a chronological storyboard — captured whenever content changes
meaningfully vs the previous frame. No periodic fallback so static content
produces no spurious frames.
start_time/duration: applied via the select filter expression (NOT as -ss/-t
input options, which break h264 scene detection on MKV).
Returns (stdout, stderr) as decoded strings for timestamp parsing.
"""
select_expr = (
f"gt(scene,{scene_threshold})"
f"+gte(t-prev_selected_t,{max_interval})"
)
input_opts = {}
if start_time > 0:
input_opts["ss"] = str(start_time)
scene_expr = f"gt(scene,{scene_threshold})"
stream = ffmpeg.input(str(input_path), **input_opts)
# Add time range filter if specified (incremental processing)
time_conditions = []
if start_time > 0:
time_conditions.append(f"gte(t,{start_time})")
if duration is not None:
time_conditions.append(f"lte(t,{start_time + duration})")
if time_conditions:
time_filter = "*".join(time_conditions)
select_expr = f"({scene_expr})*{time_filter}"
else:
select_expr = scene_expr
stream = ffmpeg.input(str(input_path))
stream = stream.filter("select", select_expr).filter("showinfo")
output = (
@@ -61,7 +99,14 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.3,
)
log.info("extract_scene_frames: %s", " ".join(output.compile()))
stdout, stderr = output.run(capture_stdout=True, capture_stderr=True)
try:
stdout, stderr = output.run(capture_stdout=True, capture_stderr=True)
except ffmpeg.Error as e:
# ffmpeg may exit non-zero on growing files (corrupt tail) but still
# produce valid frames. Return the stderr for parsing anyway.
log.debug("ffmpeg exited with error (may still have valid frames)")
stdout = e.stdout or b""
stderr = e.stderr or b""
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")

View File

@@ -1,11 +1,10 @@
"""
StreamManager: orchestrates ffmpeg pipelines for receiving, recording,
and frame extraction from a muxed mpegts/TCP stream.
StreamManager: orchestrates ffmpeg for recording and scene detection.
Architecture:
sender → TCP:4444 → ffmpeg (writes growing recording.ts)
└→ mpv plays recording.ts (DVR: live edge + scrub)
→ ffmpeg scene detection (periodic on recording)
sender → TCP:4444 → ffmpeg (writes recording.ts)
recording.ts → mpv (plays via Timeline)
recording.ts → ffmpeg scene detection (periodic, incremental)
"""
import json
@@ -18,8 +17,8 @@ from threading import Thread
from cht.config import (
STREAM_HOST,
STREAM_PORT,
RELAY_PORT,
SCENE_THRESHOLD,
MAX_FRAME_INTERVAL,
SESSIONS_DIR,
)
from cht.stream import ffmpeg as ff
@@ -41,71 +40,83 @@ class StreamManager:
self._procs = {}
self._threads = {}
self._stop_flags = set()
log.info("StreamManager created: session=%s dir=%s", session_id, self.session_dir)
log.info("Session: %s", session_id)
def setup_dirs(self):
    """Create every per-session directory, tolerating pre-existing ones."""
    required = (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir)
    for directory in required:
        directory.mkdir(parents=True, exist_ok=True)
    log.info("Session directories created")
@property
def stream_url(self):
    """TCP listen URL the sender connects to."""
    return "tcp://{}:{}?listen".format(STREAM_HOST, STREAM_PORT)
@property
def relay_url(self):
    """UDP loopback URL the live-display leg relays to."""
    return "udp://127.0.0.1:{}".format(RELAY_PORT)
@property
def recording_path(self):
    """Path of the (growing) MKV recording inside the session's stream dir."""
    return self.stream_dir / "recording.mkv"
# -- Recording --
def start_recorder(self):
    """Start ffmpeg to receive TCP stream, write to MKV, and relay to UDP.

    Stores the process under self._procs["recorder"] and attaches a
    daemon stderr-reader thread so the pipe never fills and blocks ffmpeg.
    """
    node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url)
    proc = ff.run_async(node, pipe_stderr=True)
    self._procs["recorder"] = proc
    log.info("Recorder: pid=%s → %s", proc.pid, self.recording_path)
    self._start_stderr_reader("recorder", proc)
# -- Scene detection --
# -- Scene Detection --
def start_scene_detector(self):
"""Periodically run ffmpeg scene detection on the growing recording.
def start_scene_detector(self, on_new_frames=None):
"""Periodically run scene detection on new portions of the recording.
Tracks how far we've processed to avoid re-scanning from the start.
Args:
on_new_frames: callback(list of {id, timestamp, path}) for new frames
"""
log.info("Starting scene detector (threshold=%.2f, interval=%ds)",
SCENE_THRESHOLD, MAX_FRAME_INTERVAL)
self._on_new_frames = on_new_frames
def _detect():
last_processed_size = 0
processed_duration = 0.0 # seconds already processed
processed_time = 0.0
frame_count = 0
while "stop" not in self._stop_flags:
time.sleep(10)
time.sleep(5)
if not self.recording_path.exists():
continue
size = self.recording_path.stat().st_size
if size <= last_processed_size or size < 100_000:
if size < 100_000:
continue
log.info("Recording grew: %d%d bytes, scanning from %.1fs",
last_processed_size, size, processed_duration)
last_processed_size = size
# Get current duration. Use a 6s safety margin — MKV tail can
# be corrupt for several seconds after the last flush, causing
# ffmpeg to crash even with a 3s margin.
safe_duration = self._estimate_safe_duration()
if safe_duration is None or safe_duration <= processed_time + 8:
continue
try:
new_count, new_duration = self._extract_new_frames(
self.recording_path,
start_time=processed_duration,
start_number=frame_count + 1,
)
if new_count > 0:
frame_count += new_count
log.info("Found %d new frames (total: %d)", new_count, frame_count)
if new_duration > processed_duration:
processed_duration = new_duration
except Exception as e:
log.error("Scene detection failed: %s", e)
# Process from last checkpoint to safe point
process_to = safe_duration - 6 # 6s safety margin for MKV tail
if process_to <= processed_time:
continue
log.info("Scene detection: %.1fs → %.1fs", processed_time, process_to)
new_frames = self._detect_scenes(
start_time=processed_time,
end_time=process_to,
start_number=frame_count + 1,
)
if new_frames:
frame_count += len(new_frames)
log.info("Found %d new scene frames (total: %d)", len(new_frames), frame_count)
if self._on_new_frames:
self._on_new_frames(new_frames)
processed_time = process_to
log.info("Scene detector stopped")
@@ -113,39 +124,46 @@ class StreamManager:
t.start()
self._threads["scene_detector"] = t
def _extract_new_frames(self, path, start_time=0.0, start_number=1):
"""Extract scene-change frames starting from a given timestamp.
def _estimate_safe_duration(self):
    """Estimate recording duration. Uses ffprobe, falls back to file size.

    Returns:
        Duration in seconds (float), or None when the file cannot be
        probed or stat'ed (e.g. not created yet).
    """
    try:
        # Local import keeps the third-party dependency out of module scope.
        import ffmpeg as ffmpeg_lib
        info = ffmpeg_lib.probe(str(self.recording_path))
        dur = float(info.get("format", {}).get("duration", 0))
        if dur > 0:
            return dur
    except Exception:
        # ffprobe often fails on a still-growing file; fall through.
        pass
    # Fallback: rough estimate from file size (~500kbit/s typical for this stream)
    try:
        size = self.recording_path.stat().st_size
        return size / 65_000  # ~500kbps ≈ 62.5 KB/s — TODO confirm divisor
    except Exception:
        return None
def _detect_scenes(self, start_time, end_time, start_number):
"""Run ffmpeg scene detection on a time range. Returns list of new frame entries."""
duration = end_time - start_time
existing_before = set(f.name for f in self.frames_dir.glob("F*.jpg"))
try:
_stdout, stderr = ff.extract_scene_frames(
path,
self.recording_path,
self.frames_dir,
scene_threshold=SCENE_THRESHOLD,
max_interval=MAX_FRAME_INTERVAL,
start_number=start_number,
start_time=start_time,
duration=duration,
)
except Exception as e:
log.error("ffmpeg scene extraction error: %s", e)
return 0, start_time
log.error("Scene detection failed: %s", e)
return []
if stderr:
for line in stderr.splitlines()[:5]:
log.debug("[scene_detect:stderr] %s", line)
# Parse timestamps and update index
max_ts = start_time
new_count = 0
# Parse new frames from showinfo output
new_frames = []
index_path = self.frames_dir / "index.json"
if index_path.exists():
with open(index_path) as f:
index = json.load(f)
else:
index = []
index = json.loads(index_path.read_text()) if index_path.exists() else []
frame_num = start_number
for line in stderr.splitlines():
@@ -157,45 +175,35 @@ class StreamManager:
frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg"
if frame_path.exists() and frame_path.name not in existing_before:
index.append({
entry = {
"id": frame_id,
"timestamp": pts_time,
"path": str(frame_path),
"sent_to_agent": False,
})
log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time)
new_count += 1
if pts_time > max_ts:
max_ts = pts_time
}
index.append(entry)
new_frames.append(entry)
frame_num += 1
with open(index_path, "w") as f:
json.dump(index, f, indent=2)
return new_count, max_ts
index_path.write_text(json.dumps(index, indent=2))
return new_frames
# -- Lifecycle --
def stop_all(self):
    """Signal worker threads to stop and terminate every managed ffmpeg process."""
    log.info("Stopping all...")
    # Worker loops (e.g. the scene detector) poll this flag set.
    self._stop_flags.add("stop")
    for name, proc in self._procs.items():
        log.info("Stopping %s", name)
        ff.stop_proc(proc)
    self._procs.clear()
    log.info("All processes stopped")
def _start_stderr_reader(self, name, proc):
    """Drain *proc*'s stderr on a daemon thread so the pipe never blocks ffmpeg.

    Each non-empty line is logged at DEBUG under the given name; the
    process exit code is logged once the stream closes.
    """
    def _read():
        try:
            for line in proc.stderr:
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    log.debug("[%s] %s", name, text)
        except Exception as e:
            # Reading the pipe of a killed process can raise; keep the
            # reader thread from dying noisily.
            log.warning("[%s] stderr read error: %s", name, e)
        log.info("[%s] exited: %s", name, proc.poll())

    Thread(target=_read, daemon=True, name=f"{name}_stderr").start()

83
cht/stream/tracker.py Normal file
View File

@@ -0,0 +1,83 @@
"""
RecordingTracker: monitors the growing recording file and estimates duration.
Polls file size periodically. Uses ffprobe occasionally for accurate
duration calibration. Feeds duration updates to the Timeline.
"""
import json
import logging
import subprocess
import time
from pathlib import Path
from threading import Thread
import ffmpeg as ffmpeg_lib
log = logging.getLogger(__name__)
class RecordingTracker:
"""Tracks a growing recording file and estimates its duration."""
def __init__(self, recording_path, on_duration_update=None):
self._path = recording_path
self._on_duration = on_duration_update
self._duration = 0.0
self._avg_bitrate = None # bytes per second, calibrated by ffprobe
self._stop = False
self._thread = None
@property
def duration(self):
return self._duration
def start(self):
self._stop = False
self._thread = Thread(target=self._poll_loop, daemon=True, name="rec_tracker")
self._thread.start()
log.info("RecordingTracker started: %s", self._path)
def stop(self):
self._stop = True
log.info("RecordingTracker stopped")
def _poll_loop(self):
probe_interval = 0 # probe on first data
cycles = 0
while not self._stop:
time.sleep(2)
if not self._path.exists():
continue
size = self._path.stat().st_size
if size < 10_000:
continue
# Calibrate with ffprobe every ~30s or on first data
cycles += 1
if self._avg_bitrate is None or cycles % 15 == 0:
probed = self._probe_duration()
if probed and probed > 0 and size > 0:
self._avg_bitrate = size / probed
self._duration = probed
log.info("Probed duration: %.1fs (bitrate: %.0f B/s)",
probed, self._avg_bitrate)
elif self._avg_bitrate:
# Estimate from file size between probes
self._duration = size / self._avg_bitrate
if self._on_duration and self._duration > 0:
self._on_duration(self._duration)
def _probe_duration(self):
"""Use ffprobe to get accurate duration of the recording."""
try:
info = ffmpeg_lib.probe(str(self._path))
duration = float(info.get("format", {}).get("duration", 0))
return duration
except Exception as e:
log.debug("ffprobe failed (file still growing): %s", e)
return None