154 lines
5.3 KiB
Python
154 lines
5.3 KiB
Python
"""
|
|
Thin wrapper around ffmpeg-python for building and running ffmpeg pipelines.

All ffmpeg command construction goes through this module.
Uses ffmpeg-python's own run/run_async for subprocess management.
|
|
"""
|
|
|
|
import logging
|
|
import signal
|
|
import subprocess
|
|
|
|
import ffmpeg
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default global flags: only the startup banner is suppressed and the log
# level is left untouched.
GLOBAL_ARGS = ("-hide_banner",)
# Note: scene detection needs -loglevel info for showinfo filter output.
# Individual pipelines can override with .global_args()
# Quiet variant: banner hidden and log output limited to warnings and above.
QUIET_ARGS = ("-hide_banner", "-loglevel", "warning")
|
|
|
|
|
|
def receive_and_record(stream_url, output_path):
    """Build a pipeline that records an incoming mpegts stream to an MKV file.

    The input is opened with ``fflags=nobuffer`` / ``flags=low_delay`` and
    stream-copied (no re-encode) into a Matroska container with packets
    flushed as they are written.

    Why MKV (Matroska):
    - Handles incomplete writes gracefully (like OBS default)
    - Proper timestamps for seeking and duration detection
    - mpv plays growing MKV files better than mpegts

    Args:
        stream_url: URL of the stream to read from.
        output_path: Destination file; converted with ``str()``.

    Returns:
        The ffmpeg-python output node with the quiet global args applied.
        Nothing is started here; the caller decides how to run it.
    """
    source = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")
    mkv_options = {
        "c": "copy",
        "f": "matroska",
        "flush_packets": 1,
    }
    recording = ffmpeg.output(source, str(output_path), **mkv_options)
    return recording.global_args(*QUIET_ARGS)
|
|
|
|
|
|
def receive_record_and_relay(stream_url, output_path, relay_url):
    """Build a pipeline that records a stream to fragmented MP4 and relays it.

    The same input feeds two stream-copied outputs:

    * a fragmented MP4 file at ``output_path``
      (``frag_keyframe+empty_moov+default_base_moof``), and
    * an mpegts relay sent to ``relay_url`` (a UDP loopback address in the
      intended setup).

    Fragmented MP4 avoids MKV tail corruption: each keyframe boundary closes
    a self-contained fragment, so the file is always valid up to the last
    complete fragment (~1 keyframe interval ≈ 2s). This allows the scene
    detector to use a 2s safety margin instead of 6s.

    Both outputs are combined with ``merge_outputs`` so they run in one
    ffmpeg process and share identical timestamps.

    Args:
        stream_url: URL of the stream to read from.
        output_path: Destination MP4 file; converted with ``str()``.
        relay_url: Target URL for the mpegts relay.

    Returns:
        The merged ffmpeg-python output node with the quiet global args
        applied. Nothing is started here.
    """
    source = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")

    recording_options = {
        "c": "copy",
        "f": "mp4",
        "movflags": "frag_keyframe+empty_moov+default_base_moof",
        "flush_packets": 1,
        "bsf:a": "aac_adtstoasc",
    }
    recording = ffmpeg.output(source, str(output_path), **recording_options)
    relay = ffmpeg.output(source, relay_url, c="copy", f="mpegts")

    combined = ffmpeg.merge_outputs(recording, relay)
    return combined.global_args(*QUIET_ARGS)
|
|
|
|
|
|
def _scene_select_expr(scene_threshold, start_time, duration):
    """Return the select-filter expression: scene change AND optional time window."""
    scene_expr = f"gt(scene,{scene_threshold})"

    # Time range conditions (incremental processing); multiplied together
    # they act as a logical AND inside the filter expression.
    time_conditions = []
    if start_time > 0:
        time_conditions.append(f"gte(t,{start_time})")
    if duration is not None:
        time_conditions.append(f"lte(t,{start_time + duration})")

    if not time_conditions:
        return scene_expr
    time_filter = "*".join(time_conditions)
    return f"({scene_expr})*{time_filter}"


def extract_scene_frames(input_path, output_dir, scene_threshold=0.10,
                         start_number=1, start_time=0.0, duration=None):
    """Extract frames from a file on scene change only (no interval fallback).

    Frames are a chronological storyboard — captured whenever content changes
    meaningfully vs the previous frame. No periodic fallback so static content
    produces no spurious frames.

    Args:
        input_path: Source media file; converted with ``str()``.
        output_dir: Directory receiving ``F%04d.jpg`` frames. Accepts a
            ``str`` or any path-like object.
        scene_threshold: Scene score a frame must exceed to be selected.
        start_number: Number used for the first written frame file.
        start_time: Lower bound (seconds) on frame time; only applied when > 0.
        duration: Optional window length; frames after
            ``start_time + duration`` are rejected.

    start_time/duration are applied via the select filter expression (NOT as
    -ss/-t input options, which break h264 scene detection on MKV).

    Returns:
        ``(stdout, stderr)`` as decoded strings for timestamp parsing. The
        pair is returned even when ffmpeg exits non-zero.
    """
    # Local import keeps this block self-contained; the module's import
    # header does not bring in pathlib.
    from pathlib import Path

    select_expr = _scene_select_expr(scene_threshold, start_time, duration)
    # Path() coercion lets callers pass a plain string directory, matching
    # how the other path arguments are handled.
    frame_pattern = Path(output_dir) / "F%04d.jpg"

    stream = ffmpeg.input(str(input_path))
    stream = stream.filter("select", select_expr).filter("showinfo")

    output = (
        ffmpeg.output(
            stream,
            str(frame_pattern),
            vsync="vfr",
            **{"q:v": "2"},
            start_number=start_number,
        )
        .global_args(*GLOBAL_ARGS)
    )

    log.info("extract_scene_frames: %s", " ".join(output.compile()))
    run_failed = False
    try:
        raw_out, raw_err = output.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        # ffmpeg may exit non-zero on growing files (corrupt tail) but still
        # produce valid frames. Return the stderr for parsing anyway.
        run_failed = True
        raw_out = e.stdout or b""
        raw_err = e.stderr or b""

    out_text = raw_out.decode("utf-8", errors="replace")
    err_text = raw_err.decode("utf-8", errors="replace")

    if run_failed:
        # Log the last meaningful (non-indented, non-blank) line so the real
        # cause is visible.
        for line in reversed(err_text.splitlines()):
            if line.strip() and not line.startswith(" "):
                log.debug("ffmpeg scene error: %s", line.strip())
                break

    return out_text, err_text
|
|
|
|
|
|
def extract_frame_at(input_path, output_path, timestamp):
    """Write the single frame found at ``timestamp`` to ``output_path``.

    ``timestamp`` is passed as the ``ss`` option on the input, exactly one
    video frame is written (``vframes=1``), and an existing output file is
    overwritten. The compiled command line is logged at info level before
    running. Errors raised by ``run()`` are not caught here.
    """
    source = ffmpeg.input(str(input_path), ss=timestamp)
    single_frame = ffmpeg.output(
        source, str(output_path), vframes=1, **{"q:v": "2"}
    )
    command = single_frame.overwrite_output().global_args(*QUIET_ARGS)

    log.info("extract_frame_at: %s", " ".join(command.compile()))
    command.run(capture_stdout=True, capture_stderr=True)
|
|
|
|
|
|
def run_async(output_node, pipe_stdout=False, pipe_stderr=False):
    """Launch an ffmpeg pipeline in the background.

    Logs the compiled command line at info level, then delegates to the
    node's own ``run_async``.

    Args:
        output_node: ffmpeg-python output node to start.
        pipe_stdout: Forwarded unchanged to ``run_async``.
        pipe_stderr: Forwarded unchanged to ``run_async``.

    Returns:
        Whatever ``output_node.run_async`` returns (the started process).
    """
    command_line = " ".join(output_node.compile())
    log.info("run_async: %s", command_line)
    process = output_node.run_async(
        pipe_stdout=pipe_stdout,
        pipe_stderr=pipe_stderr,
    )
    return process
|
|
|
|
|
|
def stop_proc(proc, timeout=5):
    """Gracefully stop an ffmpeg subprocess.

    Sends SIGINT and waits up to ``timeout`` seconds for the process to
    exit. If it is still running after that, it is killed and then reaped.

    Args:
        proc: The subprocess to stop. A falsy value or an already-exited
            process is a no-op.
        timeout: Seconds to wait after SIGINT before killing.
    """
    if not proc or proc.poll() is not None:
        return

    proc.send_signal(signal.SIGINT)
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child: without this it lingers as a zombie and
        # proc.returncode stays None.
        proc.wait()
|