Files
mitus/tests/test_ffmpeg.py
2026-04-01 13:59:31 -03:00

231 lines
7.9 KiB
Python

"""Tests for cht.stream.ffmpeg — command compilation and pipeline construction."""
import os
import signal
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from cht.stream import ffmpeg as ff
class TestReceiveAndSegment:
    """Command-compilation checks for ``ff.receive_and_segment``."""

    def test_compiles_to_valid_cmd(self, tmp_path):
        """The compiled argv holds the global flags, the input URL and the segment options."""
        graph = ff.receive_and_segment(
            "tcp://0.0.0.0:4444?listen", tmp_path, segment_duration=30
        )
        argv = ff.compile_cmd(graph)

        # The executable comes first; the global flags only need to be present.
        assert argv[0] == "ffmpeg"
        for flag in ("-hide_banner", "-loglevel"):
            assert flag in argv

        # Input options
        assert "tcp://0.0.0.0:4444?listen" in argv

        # Segment output options; the duration may be rendered as str or int.
        assert "segment" in argv
        assert any(duration in argv for duration in ("30", 30))
        expected_pattern = str(tmp_path / "segment_%04d.ts")
        assert expected_pattern in argv

    def test_input_has_low_latency_flags(self, tmp_path):
        """Both low-latency markers appear somewhere in the rendered command."""
        graph = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path)
        rendered = " ".join(map(str, ff.compile_cmd(graph)))
        for marker in ("nobuffer", "low_delay"):
            assert marker in rendered

    def test_copy_codec(self, tmp_path):
        """The compiled argv contains a ``copy`` token."""
        graph = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path)
        argv = ff.compile_cmd(graph)
        assert "copy" in argv
class TestReceiveAndSegmentWithMonitor:
    """Tests for ``ff.receive_and_segment_with_monitor``: FIFO handling and dual outputs."""

    def test_creates_fifo(self, tmp_path):
        """A missing monitor path must exist as a named pipe after the call."""
        fifo = tmp_path / "monitor.pipe"
        ff.receive_and_segment_with_monitor(
            "tcp://0.0.0.0:4444?listen", tmp_path, fifo
        )
        assert fifo.exists()
        # Only a real named pipe may satisfy this test; a regular file left at
        # the same path must fail it.
        assert fifo.is_fifo()

    def test_does_not_recreate_existing_fifo(self, tmp_path):
        """A FIFO that already exists keeps its inode across the call."""
        fifo = tmp_path / "monitor.pipe"
        os.mkfifo(str(fifo))
        inode_before = fifo.stat().st_ino
        ff.receive_and_segment_with_monitor(
            "tcp://0.0.0.0:4444?listen", tmp_path, fifo
        )
        # An unchanged inode means the path was not unlinked and recreated.
        assert fifo.stat().st_ino == inode_before

    def test_has_two_outputs(self, tmp_path):
        """The rendered command mentions both the segment pattern and the FIFO."""
        fifo = tmp_path / "monitor.pipe"
        node = ff.receive_and_segment_with_monitor(
            "tcp://0.0.0.0:4444?listen", tmp_path, fifo
        )
        cmd = ff.compile_cmd(node)
        cmd_str = " ".join(str(c) for c in cmd)
        # Should have segment output and fifo output
        assert "segment_%04d.ts" in cmd_str
        assert "monitor.pipe" in cmd_str

    def test_both_outputs_use_copy(self, tmp_path):
        """``copy`` appears at least twice in the argv (once per output)."""
        fifo = tmp_path / "monitor.pipe"
        node = ff.receive_and_segment_with_monitor(
            "tcp://0.0.0.0:4444?listen", tmp_path, fifo
        )
        cmd = ff.compile_cmd(node)
        # Should have copy codec for both outputs
        assert cmd.count("copy") >= 2
class TestExtractSceneFrames:
    """Scene-frame extraction: filter-graph compilation and subprocess invocation."""

    def test_compiles_select_filter(self, tmp_path):
        """A select + showinfo graph built with ffmpeg-python compiles via ``ff.compile_cmd``."""
        # We can't run ffmpeg without a real file, but we can test compilation
        import ffmpeg

        filtered = (
            ffmpeg.input(str(tmp_path / "test.ts"))
            .filter("select", "gt(scene\\,0.3)+gte(t-prev_selected_t\\,30)")
            .filter("showinfo")
        )
        # "q:v" is not a valid Python identifier, so all options go through a dict.
        output_options = {"vsync": "vfr", "q:v": "2", "start_number": 1}
        sink = ffmpeg.output(filtered, str(tmp_path / "F%04d.jpg"), **output_options)

        rendered = " ".join(map(str, ff.compile_cmd(sink)))
        for fragment in ("select", "scene", "showinfo", "vfr"):
            assert fragment in rendered

    @patch("cht.stream.ffmpeg.subprocess.run")
    def test_calls_subprocess_with_timeout(self, mock_run, tmp_path):
        """``subprocess.run`` is invoked once with timeout=120 and capture_output=True."""
        mock_run.return_value = MagicMock(stdout="", stderr="")
        ff.extract_scene_frames(
            tmp_path / "test.ts",
            tmp_path,
            scene_threshold=0.4,
            max_interval=20,
            start_number=5,
        )
        mock_run.assert_called_once()
        run_kwargs = mock_run.call_args.kwargs
        assert run_kwargs["timeout"] == 120
        assert run_kwargs["capture_output"] is True

    @patch("cht.stream.ffmpeg.subprocess.run")
    def test_returns_stdout_stderr(self, mock_run, tmp_path):
        """The call returns the completed process's stdout and stderr, in that order."""
        mock_run.return_value = MagicMock(stdout="out", stderr="err")
        captured_out, captured_err = ff.extract_scene_frames(
            tmp_path / "test.ts", tmp_path
        )
        assert captured_out == "out"
        assert captured_err == "err"

    @patch("cht.stream.ffmpeg.subprocess.run")
    def test_start_number_in_cmd(self, mock_run, tmp_path):
        """The requested start number shows up as a token of the executed argv."""
        mock_run.return_value = MagicMock(stdout="", stderr="")
        ff.extract_scene_frames(tmp_path / "test.ts", tmp_path, start_number=42)
        # The argv is the first positional argument handed to subprocess.run.
        argv = mock_run.call_args.args[0]
        assert any(str(token) == "42" for token in argv)
class TestExtractAudioPcm:
    """Command-compilation check for ``ff.extract_audio_pcm``."""

    def test_compiles_audio_extraction(self, tmp_path):
        """The rendered command names the PCM codec, the 16000 rate, a pipe target and wav."""
        graph = ff.extract_audio_pcm(tmp_path / "test.ts")
        rendered = " ".join(map(str, ff.compile_cmd(graph)))
        for fragment in ("pcm_s16le", "16000", "pipe:", "wav"):
            assert fragment in rendered
class TestCompileCmd:
    """Checks the fixed prefix that ``ff.compile_cmd`` puts at the front of the argv."""

    def test_inserts_global_flags_after_ffmpeg(self, tmp_path):
        """The argv starts with ``ffmpeg -hide_banner -loglevel warning``."""
        import ffmpeg

        source = ffmpeg.input(str(tmp_path / "test.ts"))
        graph = source.output(str(tmp_path / "out.ts"))
        argv = ff.compile_cmd(graph)

        # Compare position by position so a mismatch points at the exact slot.
        expected_prefix = ("ffmpeg", "-hide_banner", "-loglevel", "warning")
        for position, token in enumerate(expected_prefix):
            assert argv[position] == token
class TestRunAsync:
    """Checks how ``ff.run_async`` wires stdout/stderr into ``subprocess.Popen``."""

    @staticmethod
    def _make_node():
        """Build a minimal ffmpeg-python input -> output graph for the Popen tests."""
        import ffmpeg

        return ffmpeg.input("test").output("out")

    @patch("cht.stream.ffmpeg.subprocess.Popen")
    def test_default_no_pipes(self, mock_popen, tmp_path):
        """With no pipe flags, both streams are sent to DEVNULL."""
        ff.run_async(self._make_node())
        popen_kwargs = mock_popen.call_args.kwargs
        assert popen_kwargs["stdout"] == subprocess.DEVNULL
        assert popen_kwargs["stderr"] == subprocess.DEVNULL

    @patch("cht.stream.ffmpeg.subprocess.Popen")
    def test_pipe_stdout(self, mock_popen, tmp_path):
        """``pipe_stdout=True`` hands ``subprocess.PIPE`` to Popen as stdout."""
        ff.run_async(self._make_node(), pipe_stdout=True)
        popen_kwargs = mock_popen.call_args.kwargs
        assert popen_kwargs["stdout"] == subprocess.PIPE

    @patch("cht.stream.ffmpeg.subprocess.Popen")
    def test_pipe_stderr(self, mock_popen, tmp_path):
        """``pipe_stderr=True`` hands ``subprocess.PIPE`` to Popen as stderr."""
        ff.run_async(self._make_node(), pipe_stderr=True)
        popen_kwargs = mock_popen.call_args.kwargs
        assert popen_kwargs["stderr"] == subprocess.PIPE
class TestRunSync:
    """Checks the return value and timeout forwarding of ``ff.run_sync``."""

    @patch("cht.stream.ffmpeg.subprocess.run")
    def test_returns_tuple(self, mock_run):
        """The result unpacks into the mocked process's stdout and stderr."""
        import ffmpeg

        mock_run.return_value = MagicMock(stdout="hello", stderr="world")
        graph = ffmpeg.input("test").output("out")
        captured_out, captured_err = ff.run_sync(graph)
        assert captured_out == "hello"
        assert captured_err == "world"

    @patch("cht.stream.ffmpeg.subprocess.run")
    def test_passes_timeout(self, mock_run):
        """The ``timeout`` argument reaches ``subprocess.run`` as a keyword."""
        import ffmpeg

        mock_run.return_value = MagicMock(stdout="", stderr="")
        graph = ffmpeg.input("test").output("out")
        ff.run_sync(graph, timeout=30)
        forwarded = mock_run.call_args.kwargs
        assert forwarded["timeout"] == 30
class TestStopProc:
    """Checks ``ff.stop_proc`` against mocked process handles."""

    def test_sends_sigint_then_waits(self):
        """A still-running process gets SIGINT and is then waited on with the timeout."""
        # poll() returning None marks the mocked process as still running.
        running = MagicMock(**{"poll.return_value": None})
        ff.stop_proc(running, timeout=3)
        running.send_signal.assert_called_once_with(signal.SIGINT)
        running.wait.assert_called_once_with(timeout=3)

    def test_kills_on_timeout(self):
        """If waiting times out, the process is killed exactly once."""
        stuck = MagicMock(
            **{
                "poll.return_value": None,
                "wait.side_effect": subprocess.TimeoutExpired("ffmpeg", 3),
            }
        )
        ff.stop_proc(stuck, timeout=3)
        stuck.kill.assert_called_once()

    def test_noop_if_already_exited(self):
        """A process whose poll() reports an exit code receives no signal."""
        finished = MagicMock(**{"poll.return_value": 0})
        ff.stop_proc(finished)
        finished.send_signal.assert_not_called()

    def test_noop_if_none(self):
        """Passing ``None`` instead of a process must not raise."""
        ff.stop_proc(None)  # should not raise