211 lines
7.3 KiB
Python
211 lines
7.3 KiB
Python
"""Tests for cht.stream.ffmpeg — pipeline construction and execution."""
|
|
|
|
import signal
|
|
import subprocess
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
import ffmpeg as ffmpeg_lib
|
|
|
|
from cht.stream import ffmpeg as ff
|
|
|
|
|
|
class TestReceiveAndRecord:
    """Inspect the argv that ``ff.receive_and_record`` compiles to."""

    @staticmethod
    def _argv_for(destination):
        """Return the compiled argument list for a recording to *destination*."""
        pipeline = ff.receive_and_record("tcp://0.0.0.0:4444?listen", destination)
        return pipeline.compile()

    @staticmethod
    def _flatten(argv):
        """Join an argument list into one space-separated string."""
        return " ".join(map(str, argv))

    def test_compiles_to_valid_cmd(self, tmp_path):
        """The command starts with ffmpeg and mentions both endpoints."""
        argv = self._argv_for(tmp_path / "recording.ts")
        flat = self._flatten(argv)
        assert argv[0] == "ffmpeg"
        assert "tcp://0.0.0.0:4444?listen" in flat
        assert "recording.ts" in flat

    def test_input_has_low_latency_flags(self, tmp_path):
        """Both low-latency flag names appear somewhere in the command."""
        flat = self._flatten(self._argv_for(tmp_path / "rec.ts"))
        assert "nobuffer" in flat
        assert "low_delay" in flat

    def test_copy_codec(self, tmp_path):
        """``copy`` is present as a standalone argument."""
        argv = self._argv_for(tmp_path / "rec.ts")
        assert "copy" in argv

    def test_has_global_args(self, tmp_path):
        """``-hide_banner`` is present as a standalone argument."""
        argv = self._argv_for(tmp_path / "rec.ts")
        assert "-hide_banner" in argv

    def test_matroska_format(self, tmp_path):
        """``matroska`` is present as a standalone argument for a .mkv target."""
        argv = self._argv_for(tmp_path / "rec.mkv")
        assert "matroska" in argv
|
|
|
|
|
class TestExtractSceneFrames:
    """Scene-frame extraction: filter graph shape and decoded process output."""

    def test_compiles_select_filter(self, tmp_path):
        """A select + showinfo graph compiles with the expected tokens.

        This test assembles the graph with ``ffmpeg_lib`` directly; it does
        not call into ``ff``.
        """
        filtered = (
            ffmpeg_lib.input(str(tmp_path / "test.ts"))
            .filter("select", "gt(scene,0.3)+gte(t-prev_selected_t,30)")
            .filter("showinfo")
        )
        frame_pattern = str(tmp_path / "F%04d.jpg")
        graph = ffmpeg_lib.output(
            filtered,
            frame_pattern,
            vsync="vfr",
            **{"q:v": "2"},
            start_number=1,
        )
        flat = " ".join(map(str, graph.compile()))
        for token in ("select", "scene", "showinfo", "vfr"):
            assert token in flat

    def test_returns_decoded_strings(self, tmp_path):
        """Bytes produced by the (mocked) process are expected back as ``str``."""
        fake_proc = MagicMock()
        fake_proc.poll.return_value = 0
        fake_proc.communicate.return_value = (b"out", b"err")
        # Swap out ffmpeg-python's launcher so no real process is spawned.
        with patch("ffmpeg._run.run_async", return_value=fake_proc):
            stdout, stderr = ff.extract_scene_frames(
                tmp_path / "test.ts",
                tmp_path,
            )
        assert stdout == "out"
        assert stderr == "err"
|
|
|
|
|
|
class TestRunAsync:
    """Sanity check on the argv compiled for the recording pipeline.

    NOTE(review): despite the class name, nothing here invokes a
    ``run_async`` helper; only ``compile()`` is exercised. Confirm whether
    an execution-level test is missing.
    """

    def test_compiles_valid_command(self, tmp_path):
        """The compiled command starts with ffmpeg and contains the source URL."""
        node = ff.receive_and_record("tcp://0.0.0.0:4444?listen", tmp_path / "rec.ts")
        cmd = node.compile()
        assert cmd[0] == "ffmpeg"
        # Stringify each argument before joining, as the other tests in this
        # file do, so a non-str argument (e.g. a Path) cannot make the join
        # itself raise TypeError instead of producing a clean assertion.
        assert "tcp://0.0.0.0:4444?listen" in " ".join(str(c) for c in cmd)
|
|
|
|
|
|
class TestReceiveRecordRelayAndDetect:
    """P0 regression: one process, three outputs, plus scene detection."""

    @staticmethod
    def _command_text(tmp_path, **options):
        """Compile the combined pipeline and return it as a single string.

        *options* are forwarded unchanged to
        ``ff.receive_record_relay_and_detect``.
        """
        pipeline = ff.receive_record_relay_and_detect(
            "tcp://0.0.0.0:4444?listen",
            tmp_path / "rec.mp4",
            "udp://127.0.0.1:4445",
            **options,
        )
        return " ".join(map(str, pipeline.compile()))

    def test_compiles_three_outputs(self, tmp_path):
        """Recording file, UDP relay and stdout pipe all appear in the command."""
        flat = self._command_text(tmp_path)
        expected_targets = (
            "rec.mp4",               # fMP4 recording
            "udp://127.0.0.1:4445",  # UDP relay
            "pipe:1",                # MJPEG pipe (scene detection output)
        )
        for target in expected_targets:
            assert target in flat

    def test_fmp4_flags(self, tmp_path):
        """Both fragmented-MP4 flag names are present."""
        flat = self._command_text(tmp_path)
        assert "frag_keyframe" in flat
        assert "empty_moov" in flat

    def test_scene_filter_uses_threshold(self, tmp_path):
        """A custom scene threshold shows up alongside the scene/showinfo filters."""
        flat = self._command_text(tmp_path, scene_threshold=0.25)
        for token in ("0.25", "scene", "showinfo"):
            assert token in flat

    def test_flush_expression_included_when_flush_frames_gt_0(self, tmp_path):
        """P0 regression: the flush trick must be there to push a real frame out."""
        flat = self._command_text(tmp_path, flush_frames=2)
        # Flush expression: eq(n,prev_selected_n+1)*mod(selected_n,N)
        assert "prev_selected_n" in flat
        assert "mod" in flat

    def test_no_flush_expression_when_flush_frames_zero(self, tmp_path):
        """P0 regression: flush=0 must leave a plain scene-only filter."""
        flat = self._command_text(tmp_path, flush_frames=0)
        assert "prev_selected_n" not in flat

    def test_flush_mod_value_matches_flush_frames(self, tmp_path):
        """P0 regression: the mod value must be flush_frames+1 to prevent chaining.

        ffmpeg-python escapes commas inside a filtergraph with a leading
        backslash, so the escaped spelling is what gets matched against the
        compiled command.

        NOTE(review): this case compiles ``ff.detect_scenes_from_pipe`` rather
        than the combined pipeline the rest of this class covers; confirm
        that is intended.
        """
        for flush_count in (1, 2, 3):
            pipeline = ff.detect_scenes_from_pipe(flush_frames=flush_count)
            flat = " ".join(map(str, pipeline.compile()))
            # Commas in filter expressions are escaped as \, in filtergraph
            needle = f"mod(selected_n\\,{flush_count + 1})"
            assert needle in flat
|
|
|
|
|
|
class TestDetectScenesFromPipe:
    """Inspect the command compiled from ``ff.detect_scenes_from_pipe``."""

    @staticmethod
    def _command_text(**options):
        """Compile the detector pipeline and return it as one string."""
        pipeline = ff.detect_scenes_from_pipe(**options)
        return " ".join(map(str, pipeline.compile()))

    def test_reads_from_stdin(self):
        """The command references ``pipe:0``."""
        flat = self._command_text()
        assert "pipe:0" in flat

    def test_writes_mjpeg_to_stdout(self):
        """The command references ``pipe:1`` and mentions mjpeg."""
        flat = self._command_text()
        assert "pipe:1" in flat
        assert "mjpeg" in flat

    def test_includes_flush_expression(self):
        """With flush_frames=2 the flush expression variable is present."""
        flat = self._command_text(flush_frames=2)
        assert "prev_selected_n" in flat

    def test_h264_input_format(self):
        """The command mentions h264."""
        flat = self._command_text()
        assert "h264" in flat
|
|
|
|
|
|
class TestStopProc:
    """Behaviour of ``ff.stop_proc`` against mocked process handles."""

    @staticmethod
    def _fake_proc(exit_code):
        """Return a mock process whose ``poll()`` reports *exit_code*."""
        handle = MagicMock()
        handle.poll.return_value = exit_code
        return handle

    def test_sends_sigint_then_waits(self):
        """A live process gets SIGINT once, then one wait with the timeout."""
        running = self._fake_proc(None)
        ff.stop_proc(running, timeout=3)
        running.send_signal.assert_called_once_with(signal.SIGINT)
        running.wait.assert_called_once_with(timeout=3)

    def test_kills_on_timeout(self):
        """When the wait times out, the process is killed exactly once."""
        stubborn = self._fake_proc(None)
        stubborn.wait.side_effect = subprocess.TimeoutExpired("ffmpeg", 3)
        ff.stop_proc(stubborn, timeout=3)
        stubborn.kill.assert_called_once()

    def test_noop_if_already_exited(self):
        """A process whose poll() returns 0 is never signalled."""
        finished = self._fake_proc(0)
        ff.stop_proc(finished)
        finished.send_signal.assert_not_called()

    def test_noop_if_none(self):
        """Passing None must simply not raise."""
        ff.stop_proc(None)
|