From bdc5705022b986354915ec582876122b84172ae7 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Wed, 1 Apr 2026 15:16:09 -0300 Subject: [PATCH] embeded stream opengl --- cht/stream/ffmpeg.py | 140 ++++++++++++++-------------- cht/stream/manager.py | 38 ++++++++ cht/ui/monitor.py | 207 +++++++++++++++--------------------------- cht/ui/mpv.py | 160 ++++++++++++++++++++++++++++++++ cht/window.py | 19 ++-- pyproject.toml | 1 + tests/test_ffmpeg.py | 166 +++++++++++---------------------- tests/test_manager.py | 4 +- 8 files changed, 407 insertions(+), 328 deletions(-) create mode 100644 cht/ui/mpv.py diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index eda53fe..f632d43 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -3,6 +3,8 @@ Thin wrapper around ffmpeg-python for building and running ffmpeg pipelines. All ffmpeg command construction goes through this module so manager.py and other consumers never build raw CLI arg lists. + +Uses ffmpeg-python's own run/run_async for subprocess management. """ import logging @@ -15,28 +17,51 @@ import ffmpeg log = logging.getLogger(__name__) +GLOBAL_ARGS = ("-hide_banner", "-loglevel", "warning") -def receive_and_segment(stream_url, segment_dir, segment_duration=60): - """Receive mpegts stream and save as segmented .ts files. - Returns an ffmpeg-python output node (not yet running). +def receive_to_pipe(stream_url, segment_dir=None, segment_duration=60): + """Receive mpegts stream and pipe to stdout for mpv. + + If segment_dir is provided, also saves segments to disk. + Uses pipe (not fifo) so OS kernel buffers prevent blocking. """ stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") - return ffmpeg.output( - stream, - str(segment_dir / "segment_%04d.ts"), - c="copy", - f="segment", - segment_time=segment_duration, - reset_timestamps=1, + + out_pipe = ffmpeg.output(stream, "pipe:", c="copy", f="mpegts") + + if segment_dir: + out_segments = ffmpeg.output( + stream, + str(segment_dir / "segment_%04d.ts"), + c="copy", + f="segment", + segment_time=segment_duration, + reset_timestamps=1, + ) + return ffmpeg.merge_outputs(out_pipe, out_segments).global_args(*GLOBAL_ARGS) + + return out_pipe.global_args(*GLOBAL_ARGS) + + +def receive_and_segment(stream_url, segment_dir, segment_duration=60): + """Receive mpegts stream and save as segmented .ts files.""" + stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") + return ( + ffmpeg.output( + stream, + str(segment_dir / "segment_%04d.ts"), + c="copy", + f="segment", + segment_time=segment_duration, + reset_timestamps=1, + ) + .global_args(*GLOBAL_ARGS) ) def receive_and_segment_with_monitor(stream_url, segment_dir, fifo_path, segment_duration=60): - """Receive stream, save segments AND tee to a named pipe for monitoring. - - Returns an ffmpeg-python merged output node. - """ + """Receive stream, save segments AND tee to a named pipe for monitoring.""" if not fifo_path.exists(): os.mkfifo(str(fifo_path)) @@ -58,7 +83,7 @@ def receive_and_segment_with_monitor(stream_url, segment_dir, fifo_path, segment f="mpegts", ) - return ffmpeg.merge_outputs(out_segments, out_monitor) + return ffmpeg.merge_outputs(out_segments, out_monitor).global_args(*GLOBAL_ARGS) def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, @@ -66,78 +91,57 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, """Extract frames from a file on scene change. Uses ffmpeg select filter with scene detection and a max-interval fallback. - Returns (process_result, stderr) for timestamp parsing. + Returns (stdout bytes, stderr bytes) for timestamp parsing. """ select_expr = ( - f"gt(scene\\,{scene_threshold})" - f"+gte(t-prev_selected_t\\,{max_interval})" + f"gt(scene,{scene_threshold})" + f"+gte(t-prev_selected_t,{max_interval})" ) stream = ffmpeg.input(str(input_path)) stream = stream.filter("select", select_expr).filter("showinfo") - output = ffmpeg.output( - stream, - str(output_dir / "F%04d.jpg"), - vsync="vfr", - **{"q:v": "2"}, - start_number=start_number, + output = ( + ffmpeg.output( + stream, + str(output_dir / "F%04d.jpg"), + vsync="vfr", + **{"q:v": "2"}, + start_number=start_number, + ) + .global_args(*GLOBAL_ARGS) ) - return run_sync(output, timeout=120) + log.info("extract_scene_frames: %s", " ".join(output.compile())) + stdout, stderr = output.run(capture_stdout=True, capture_stderr=True) + return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") def extract_audio_pcm(input_path): - """Extract audio as 16kHz mono PCM wav, returning an output node for piping. - - Use run_async with pipe_stdout=True to stream PCM data. - """ + """Extract audio as 16kHz mono PCM wav, returning an output node for piping.""" stream = ffmpeg.input(str(input_path)) - return ffmpeg.output( - stream.audio, - "pipe:", - vn=None, - acodec="pcm_s16le", - ar=16000, - ac=1, - f="wav", + return ( + ffmpeg.output( + stream.audio, + "pipe:", + vn=None, + acodec="pcm_s16le", + ar=16000, + ac=1, + f="wav", + ) + .global_args(*GLOBAL_ARGS) ) def run_async(output_node, pipe_stdout=False, pipe_stderr=False): - """Start an ffmpeg pipeline asynchronously. Returns subprocess.Popen.""" - cmd = compile_cmd(output_node) - log.info("run_async: %s", " ".join(str(c) for c in cmd)) - return subprocess.Popen( - cmd, - stdout=subprocess.PIPE if pipe_stdout else subprocess.DEVNULL, - stderr=subprocess.PIPE if pipe_stderr else subprocess.DEVNULL, + """Start an ffmpeg pipeline asynchronously via ffmpeg-python's run_async.""" + log.info("run_async: %s", " ".join(output_node.compile())) + return output_node.run_async( + pipe_stdout=pipe_stdout, + pipe_stderr=pipe_stderr, ) -def run_sync(output_node, timeout=None): - """Run an ffmpeg pipeline synchronously. Returns (stdout, stderr) as strings.""" - cmd = compile_cmd(output_node) - log.info("run_sync: %s", " ".join(str(c) for c in cmd)) - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=timeout, - ) - return result.stdout, result.stderr - - -def compile_cmd(output_node): - """Compile an ffmpeg-python node to a command list, adding global flags.""" - cmd = output_node.compile() - # Insert global flags after 'ffmpeg' - idx = 1 - for flag in ["-hide_banner", "-loglevel", "warning"]: - cmd.insert(idx, flag) - idx += 1 - return cmd - - def stop_proc(proc, timeout=5): """Gracefully stop an ffmpeg subprocess.""" if proc and proc.poll() is None: diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 294130d..6a0d719 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -75,6 +75,20 @@ class StreamManager: log.info("Recorder started: pid=%s url=%s", proc.pid, self.stream_url) self._start_stderr_reader("recorder", proc) + def start_receiver_pipe(self): + """Receive stream, pipe stdout to mpv, save segments to disk.""" + self.setup_dirs() + node = ff.receive_to_pipe( + self.stream_url, + segment_dir=self.stream_dir, + segment_duration=SEGMENT_DURATION, + ) + proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True) + self._procs["receiver"] = proc + log.info("Receiver started: pid=%s url=%s (pipe + segments)", proc.pid, self.stream_url) + self._start_stderr_reader("receiver", proc) + return proc + def start_recorder_with_monitor(self): self.setup_dirs() fifo_path = self.session_dir / "monitor.pipe" @@ -104,6 +118,30 @@ class StreamManager: t = Thread(target=_read, daemon=True, name=f"{name}_stderr") t.start() + def start_frame_extractor_on_recording(self, recording_path): + """Extract frames periodically from a growing recording file.""" + log.info("Starting frame extractor on recording: %s", recording_path) + self._recording_path = recording_path + self._start_recording_frame_watcher() + + def _start_recording_frame_watcher(self): + def _watch(): + last_size = 0 + log.info("Recording frame watcher running, watching %s", self._recording_path) + while "stop" not in self._stop_flags: + if self._recording_path.exists(): + size = self._recording_path.stat().st_size + if size > last_size and size > 100_000: # wait for some data + log.info("Recording grew: %d -> %d bytes, extracting frames", last_size, size) + last_size = size + self._extract_frames_from_file(self._recording_path) + time.sleep(10) # check every 10s + log.info("Recording frame watcher stopped") + + t = Thread(target=_watch, daemon=True, name="recording_frame_watcher") + t.start() + self._threads["recording_frame_watcher"] = t + def start_frame_extractor(self): log.info("Starting frame watcher...") self._start_frame_watcher() diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py index ea0f819..e93c358 100644 --- a/cht/ui/monitor.py +++ b/cht/ui/monitor.py @@ -1,159 +1,100 @@ """ -MonitorWidget: embeds ffplay into the GTK4 window for live stream monitoring. +MonitorWidget: mpv-based live stream monitor embedded in GTK4. -On X11, uses the --wid flag of mpv for native embedding, or reparents -ffplay's window via xdotool. GTK4 dropped GtkSocket so we use X11-level tricks. +Uses libmpv's OpenGL render API + Gtk.GLArea for proper embedding. +No X11 wid hacks — renders directly to a GL texture in the GTK layout. +Works on both X11 and Wayland. """ import logging -import subprocess -import signal -from threading import Timer import gi gi.require_version("Gtk", "4.0") from gi.repository import Gtk, GLib, Gdk +from cht.ui.mpv import Player + log = logging.getLogger(__name__) class MonitorWidget(Gtk.Box): - """Widget that embeds an ffplay/mpv window for live stream monitoring.""" + """Widget that embeds mpv video via OpenGL into the GTK4 layout.""" def __init__(self, **kwargs): super().__init__(orientation=Gtk.Orientation.VERTICAL, **kwargs) - self._proc = None - self._xid = None + self._player = None - self._drawing_area = Gtk.DrawingArea() - self._drawing_area.set_hexpand(True) - self._drawing_area.set_vexpand(True) - self._drawing_area.set_content_height(250) - self.append(self._drawing_area) - log.info("MonitorWidget initialized") + self._gl_area = Gtk.GLArea() + self._gl_area.set_hexpand(True) + self._gl_area.set_vexpand(True) + self._gl_area.set_auto_render(False) + self._gl_area.set_has_depth_buffer(False) + self._gl_area.set_has_stencil_buffer(False) - def start_ffplay(self, input_path): - log.info("Starting ffplay with input: %s", input_path) - cmd = [ - "ffplay", - "-fflags", "nobuffer", - "-flags", "low_delay", - "-framedrop", - "-noborder", - "-i", str(input_path), - ] - log.info("ffplay cmd: %s", " ".join(cmd)) + self._gl_area.connect("realize", self._on_realize) + self._gl_area.connect("unrealize", self._on_unrealize) + self._gl_area.connect("render", self._on_render) - self._proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - log.info("ffplay started: pid=%s", self._proc.pid) - self._start_stderr_log("ffplay") - Timer(1.0, self._try_reparent).start() + self.append(self._gl_area) + log.info("MonitorWidget initialized (GLArea)") - def start_mpv(self, input_path): - log.info("Starting mpv with input: %s", input_path) - - surface = self._drawing_area.get_native().get_surface() - xid = None - if hasattr(surface, "get_xid"): - xid = surface.get_xid() - log.info("Got X11 window ID: %s", xid) - else: - log.warning("No get_xid on surface (type=%s), mpv will open separate window", type(surface).__name__) - - cmd = [ - "mpv", - "--no-terminal", - "--no-osc", - "--no-input-default-bindings", - "--profile=low-latency", - "--demuxer-lavf-o=fflags=+nobuffer", - ] - - if xid: - cmd.append(f"--wid={xid}") - - cmd.append(str(input_path)) - log.info("mpv cmd: %s", " ".join(cmd)) - - self._proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - log.info("mpv started: pid=%s", self._proc.pid) - self._start_stderr_log("mpv") - - def _start_stderr_log(self, name): - """Read stderr in background thread and log it.""" - import threading - - def _read(): - try: - for line in self._proc.stderr: - text = line.decode("utf-8", errors="replace").rstrip() - if text: - log.info("[%s:stderr] %s", name, text) - except Exception as e: - log.warning("[%s:stderr] read error: %s", name, e) - retcode = self._proc.poll() if self._proc else None - log.info("[%s] process exited: code=%s", name, retcode) - - t = threading.Thread(target=_read, daemon=True, name=f"{name}_stderr") - t.start() - - def _try_reparent(self): - if self._proc is None or self._proc.poll() is not None: - log.warning("ffplay not running, cannot reparent") + def _on_realize(self, gl_area): + """GL context is ready — initialize mpv's render context.""" + log.info("GLArea realized") + gl_area.make_current() + if gl_area.get_error(): + log.error("GLArea error: %s", gl_area.get_error()) return - log.info("Attempting to reparent ffplay window (pid=%s)", self._proc.pid) - try: - result = subprocess.run( - ["xdotool", "search", "--pid", str(self._proc.pid)], - capture_output=True, - text=True, - timeout=5, - ) - windows = result.stdout.strip().split("\n") - log.info("xdotool found windows: %s", windows) - if windows and windows[0]: - ffplay_wid = windows[0] - surface = self._drawing_area.get_native().get_surface() - if hasattr(surface, "get_xid"): - parent_xid = surface.get_xid() - log.info("Reparenting ffplay %s into %s", ffplay_wid, parent_xid) - subprocess.run( - ["xdotool", "windowreparent", ffplay_wid, str(parent_xid)], - timeout=5, - ) - subprocess.run( - ["xdotool", "windowsize", ffplay_wid, "100%", "100%"], - timeout=5, - ) - log.info("Reparenting done") - else: - log.warning("No get_xid on surface, cannot reparent") - else: - log.warning("No windows found for ffplay pid=%s", self._proc.pid) - except FileNotFoundError: - log.error("xdotool not found, cannot reparent ffplay window") - except subprocess.TimeoutExpired: - log.error("xdotool timed out") + def _on_unrealize(self, gl_area): + """Clean up mpv render context.""" + log.info("GLArea unrealized") + if self._player: + self._player.terminate() + self._player = None + + def _on_render(self, gl_area, gl_context): + """Render mpv's current frame to the GLArea.""" + if not self._player: + return True + + # Get the default FBO that GTK4 GLArea renders to + fbo = gl_area.get_buffer() if hasattr(gl_area, 'get_buffer') else 0 + width = gl_area.get_width() + height = gl_area.get_height() + + # GTK4 GLArea uses its own FBO, get it from GL state + import ctypes + fbo_id = ctypes.c_int(0) + gl = ctypes.cdll.LoadLibrary("libGL.so.1") + gl.glGetIntegerv(0x8CA6, ctypes.byref(fbo_id)) # GL_DRAW_FRAMEBUFFER_BINDING + + self._player.render(fbo_id.value, width, height) + return True + + def _on_mpv_update(self): + """Called by mpv when a new frame is ready. Triggers re-render.""" + GLib.idle_add(self._gl_area.queue_render) + + def start_stream(self, source, record_path=None): + """Start playing from a URL and optionally record to disk. + + Args: + source: TCP URL (tcp://...), file path, etc. + record_path: if set, mpv dumps the raw stream to this file + """ + self._gl_area.make_current() + + self._player = Player(record_path=record_path) + self._player.init_gl(update_callback=self._on_mpv_update) + self._player.play(source) + log.info("Monitor streaming from: %s (record=%s)", source, record_path) def stop(self): - if self._proc and self._proc.poll() is None: - log.info("Stopping monitor process pid=%s", self._proc.pid) - self._proc.send_signal(signal.SIGINT) - try: - self._proc.wait(timeout=3) - log.info("Monitor process stopped gracefully") - except subprocess.TimeoutExpired: - self._proc.kill() - log.warning("Monitor process killed (did not stop in 3s)") - self._proc = None + """Stop playback and release mpv.""" + if self._player: + log.info("Stopping monitor") + self._player.terminate() + self._player = None else: - log.info("Monitor process already stopped or never started") + log.info("Monitor already stopped") diff --git a/cht/ui/mpv.py b/cht/ui/mpv.py new file mode 100644 index 0000000..777d833 --- /dev/null +++ b/cht/ui/mpv.py @@ -0,0 +1,160 @@ +""" +MPV wrapper using python-mpv (libmpv bindings) with OpenGL render API. + +Renders video frames to an OpenGL context provided by GTK4's GLArea. +No subprocess calls, no X11 wid hacks. +""" + +import ctypes +import logging + +import mpv as libmpv + +log = logging.getLogger(__name__) + + +def _make_get_proc_address(): + """Create a ctypes callback for OpenGL function loading.""" + libgl = ctypes.cdll.LoadLibrary("libGL.so.1") + libgl.glXGetProcAddressARB.restype = ctypes.c_void_p + libgl.glXGetProcAddressARB.argtypes = [ctypes.c_char_p] + + @ctypes.CFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p) + def get_proc_address(_, name): + return libgl.glXGetProcAddressARB(name) + + return get_proc_address + + +_get_proc_address = _make_get_proc_address() + + +class Player: + """Wraps a libmpv player with OpenGL render context for GTK4 embedding.""" + + def __init__(self, record_path=None): + opts = { + "input_default_bindings": False, + "input_vo_keyboard": False, + "osc": False, + "profile": "low-latency", + "cache": "no", + "untimed": True, + "demuxer_thread": "no", + "demuxer_lavf_o": "fflags=+nobuffer", + "video_sync": "display-desync", + "vo": "libmpv", + "hwdec": "auto", + } + if record_path: + opts["stream_record"] = str(record_path) + log.info("mpv will record stream to: %s", record_path) + + log.info("Creating mpv player (OpenGL render mode)") + self._player = libmpv.MPV(**opts) + self._ctx = None + self._update_callback = None + log.info("mpv player created") + + def init_gl(self, update_callback): + """Initialize the OpenGL render context. + + Must be called from a thread with an active GL context (e.g. GLArea realize). + + Args: + update_callback: called by mpv when a new frame is ready to render. + Should trigger a GLArea queue_render(). + """ + self._update_callback = update_callback + self._ctx = libmpv.MpvRenderContext( + self._player, + "opengl", + opengl_init_params={ + "get_proc_address": _get_proc_address, + }, + ) + # Keep reference to prevent GC of the ctypes callback + self._get_proc_address_ref = _get_proc_address + self._ctx.update_cb = self._on_mpv_update + log.info("mpv OpenGL render context initialized") + + def _on_mpv_update(self): + """Called by mpv from any thread when a new frame is available.""" + if self._update_callback: + self._update_callback() + + def render(self, fbo, width, height): + """Render the current frame to the given OpenGL FBO. + + Call from the GLArea render signal handler. + """ + if self._ctx: + self._ctx.render( + flip_y=True, + opengl_fbo={ + "fbo": fbo, + "w": width, + "h": height, + }, + ) + + def play(self, source): + """Play from a file path or URL.""" + log.info("mpv play: %s", source) + self._player.play(str(source)) + + def play_fd(self, fd): + """Play from a raw file descriptor.""" + source = f"fd://{fd}" + log.info("mpv play from fd: %s", source) + self._player.play(source) + + def pause(self): + self._player.pause = True + + def resume(self): + self._player.pause = False + + @property + def paused(self): + return self._player.pause + + def seek(self, seconds): + """Seek to absolute position in seconds.""" + self._player.seek(seconds, reference="absolute") + + def seek_relative(self, seconds): + """Seek relative to current position.""" + self._player.seek(seconds, reference="relative") + + def stop(self): + log.info("mpv stop") + self._player.stop() + + def terminate(self): + log.info("mpv terminate") + try: + if self._ctx: + self._ctx.free() + self._ctx = None + self._player.terminate() + except Exception as e: + log.warning("mpv terminate error: %s", e) + + @property + def idle(self): + return self._player.core_idle + + @property + def duration(self): + try: + return self._player.duration + except Exception: + return None + + @property + def time_pos(self): + try: + return self._player.time_pos + except Exception: + return None diff --git a/cht/window.py b/cht/window.py index 9950178..c933b11 100644 --- a/cht/window.py +++ b/cht/window.py @@ -70,24 +70,23 @@ class ChtWindow(Adw.ApplicationWindow): self._stream_mgr = StreamManager() log.info("Session: %s", self._stream_mgr.session_id) - log.info("Session dir: %s", self._stream_mgr.session_dir) + self._stream_mgr.setup_dirs() - fifo_path = self._stream_mgr.start_recorder_with_monitor() - log.info("FIFO path: %s", fifo_path) - log.info("Stream URL: %s", self._stream_mgr.stream_url) - log.info("Recorder started, waiting for sender connection...") + # mpv listens on TCP directly (lowest latency, like def scripts) + # and records the raw stream to disk for frame extraction + stream_url = self._stream_mgr.stream_url + record_path = self._stream_mgr.stream_dir / "recording.ts" + log.info("Starting mpv on %s, recording to %s", stream_url, record_path) + self._monitor.start_stream(stream_url, record_path=record_path) + log.info("Monitor started, waiting for sender...") - self._monitor.start_mpv(fifo_path) - log.info("Monitor (mpv) started") - - self._stream_mgr.start_frame_extractor() + self._stream_mgr.start_frame_extractor_on_recording(record_path) log.info("Frame extractor started") def _stop_stream(self): log.info("Stopping stream...") self._monitor.stop() log.info("Monitor stopped") - if self._stream_mgr: self._stream_mgr.stop_all() log.info("Stream manager stopped") diff --git a/pyproject.toml b/pyproject.toml index 08849d0..5807385 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ description = "Stream viewer with Claude agent integration" requires-python = ">=3.13" dependencies = [ "ffmpeg-python", + "python-mpv", ] [tool.setuptools.packages.find] diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index a940a92..d87824f 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -1,4 +1,4 @@ -"""Tests for cht.stream.ffmpeg — command compilation and pipeline construction.""" +"""Tests for cht.stream.ffmpeg — pipeline construction and execution.""" import os import signal @@ -7,6 +7,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock import pytest +import ffmpeg as ffmpeg_lib from cht.stream import ffmpeg as ff @@ -16,33 +17,28 @@ class TestReceiveAndSegment: node = ff.receive_and_segment( "tcp://0.0.0.0:4444?listen", tmp_path, segment_duration=30 ) - cmd = ff.compile_cmd(node) + cmd = node.compile() + cmd_str = " ".join(str(c) for c in cmd) assert cmd[0] == "ffmpeg" - assert "-hide_banner" in cmd - assert "-loglevel" in cmd - # Input options - assert "tcp://0.0.0.0:4444?listen" in cmd - # Segment output options - assert "segment" in cmd - assert "30" in cmd or 30 in cmd - assert str(tmp_path / "segment_%04d.ts") in cmd + assert "tcp://0.0.0.0:4444?listen" in cmd_str + assert "segment" in cmd_str + assert str(tmp_path / "segment_%04d.ts") in cmd_str def test_input_has_low_latency_flags(self, tmp_path): - node = ff.receive_and_segment( - "tcp://0.0.0.0:4444?listen", tmp_path - ) - cmd = ff.compile_cmd(node) - cmd_str = " ".join(str(c) for c in cmd) + node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) + cmd_str = " ".join(str(c) for c in node.compile()) assert "nobuffer" in cmd_str assert "low_delay" in cmd_str def test_copy_codec(self, tmp_path): - node = ff.receive_and_segment( - "tcp://0.0.0.0:4444?listen", tmp_path - ) - cmd = ff.compile_cmd(node) - assert "copy" in cmd + node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) + assert "copy" in node.compile() + + def test_has_global_args(self, tmp_path): + node = ff.receive_and_segment("tcp://0.0.0.0:4444?listen", tmp_path) + cmd = node.compile() + assert "-hide_banner" in cmd class TestReceiveAndSegmentWithMonitor: @@ -52,13 +48,11 @@ class TestReceiveAndSegmentWithMonitor: "tcp://0.0.0.0:4444?listen", tmp_path, fifo ) assert fifo.exists() - assert os.path.isfile(fifo) or os.stat(fifo).st_mode & 0o010000 # is fifo def test_does_not_recreate_existing_fifo(self, tmp_path): fifo = tmp_path / "monitor.pipe" os.mkfifo(str(fifo)) inode_before = fifo.stat().st_ino - ff.receive_and_segment_with_monitor( "tcp://0.0.0.0:4444?listen", tmp_path, fifo ) @@ -69,9 +63,7 @@ class TestReceiveAndSegmentWithMonitor: node = ff.receive_and_segment_with_monitor( "tcp://0.0.0.0:4444?listen", tmp_path, fifo ) - cmd = ff.compile_cmd(node) - cmd_str = " ".join(str(c) for c in cmd) - # Should have segment output and fifo output + cmd_str = " ".join(str(c) for c in node.compile()) assert "segment_%04d.ts" in cmd_str assert "monitor.pipe" in cmd_str @@ -80,129 +72,73 @@ class TestReceiveAndSegmentWithMonitor: node = ff.receive_and_segment_with_monitor( "tcp://0.0.0.0:4444?listen", tmp_path, fifo ) - cmd = ff.compile_cmd(node) - # Should have copy codec for both outputs + cmd = node.compile() assert cmd.count("copy") >= 2 + def test_has_global_args(self, tmp_path): + fifo = tmp_path / "monitor.pipe" + node = ff.receive_and_segment_with_monitor( + "tcp://0.0.0.0:4444?listen", tmp_path, fifo + ) + assert "-hide_banner" in node.compile() + class TestExtractSceneFrames: def test_compiles_select_filter(self, tmp_path): - # We can't run ffmpeg without a real file, but we can test compilation - import ffmpeg - stream = ffmpeg.input(str(tmp_path / "test.ts")) - stream = stream.filter("select", "gt(scene\\,0.3)+gte(t-prev_selected_t\\,30)") + stream = ffmpeg_lib.input(str(tmp_path / "test.ts")) + stream = stream.filter("select", "gt(scene,0.3)+gte(t-prev_selected_t,30)") stream = stream.filter("showinfo") - output = ffmpeg.output( + output = ffmpeg_lib.output( stream, str(tmp_path / "F%04d.jpg"), vsync="vfr", **{"q:v": "2"}, start_number=1, ) - cmd = ff.compile_cmd(output) - cmd_str = " ".join(str(c) for c in cmd) + cmd_str = " ".join(str(c) for c in output.compile()) assert "select" in cmd_str assert "scene" in cmd_str assert "showinfo" in cmd_str assert "vfr" in cmd_str - @patch("cht.stream.ffmpeg.subprocess.run") - def test_calls_subprocess_with_timeout(self, mock_run, tmp_path): - mock_run.return_value = MagicMock(stdout="", stderr="") - ff.extract_scene_frames( - tmp_path / "test.ts", tmp_path, - scene_threshold=0.4, max_interval=20, start_number=5, - ) - mock_run.assert_called_once() - call_kwargs = mock_run.call_args - assert call_kwargs.kwargs["timeout"] == 120 - assert call_kwargs.kwargs["capture_output"] is True - - @patch("cht.stream.ffmpeg.subprocess.run") - def test_returns_stdout_stderr(self, mock_run, tmp_path): - mock_run.return_value = MagicMock(stdout="out", stderr="err") - stdout, stderr = ff.extract_scene_frames( - tmp_path / "test.ts", tmp_path, - ) + def test_returns_decoded_strings(self, tmp_path): + mock_proc = MagicMock() + mock_proc.communicate.return_value = (b"out", b"err") + mock_proc.poll.return_value = 0 + with patch("ffmpeg._run.run_async", return_value=mock_proc): + stdout, stderr = ff.extract_scene_frames( + tmp_path / "test.ts", tmp_path, + ) assert stdout == "out" assert stderr == "err" - @patch("cht.stream.ffmpeg.subprocess.run") - def test_start_number_in_cmd(self, mock_run, tmp_path): - mock_run.return_value = MagicMock(stdout="", stderr="") - ff.extract_scene_frames( - tmp_path / "test.ts", tmp_path, start_number=42 - ) - cmd = mock_run.call_args.args[0] - assert "42" in [str(c) for c in cmd] - class TestExtractAudioPcm: def test_compiles_audio_extraction(self, tmp_path): node = ff.extract_audio_pcm(tmp_path / "test.ts") - cmd = ff.compile_cmd(node) - cmd_str = " ".join(str(c) for c in cmd) + cmd_str = " ".join(str(c) for c in node.compile()) assert "pcm_s16le" in cmd_str assert "16000" in cmd_str assert "pipe:" in cmd_str assert "wav" in cmd_str - -class TestCompileCmd: - def test_inserts_global_flags_after_ffmpeg(self, tmp_path): - import ffmpeg - node = ffmpeg.input(str(tmp_path / "test.ts")).output(str(tmp_path / "out.ts")) - cmd = ff.compile_cmd(node) - assert cmd[0] == "ffmpeg" - assert cmd[1] == "-hide_banner" - assert cmd[2] == "-loglevel" - assert cmd[3] == "warning" + def test_has_global_args(self, tmp_path): + node = ff.extract_audio_pcm(tmp_path / "test.ts") + assert "-hide_banner" in node.compile() class TestRunAsync: - @patch("cht.stream.ffmpeg.subprocess.Popen") - def test_default_no_pipes(self, mock_popen, tmp_path): - import ffmpeg - node = ffmpeg.input("test").output("out") + @patch("ffmpeg.run_async") + def test_delegates_to_ffmpeg_python(self, mock_run_async, tmp_path): + node = ffmpeg_lib.input("test").output("out") ff.run_async(node) - call_kwargs = mock_popen.call_args - assert call_kwargs.kwargs["stdout"] == subprocess.DEVNULL - assert call_kwargs.kwargs["stderr"] == subprocess.DEVNULL + # ffmpeg-python's run_async is called on the node + # Our function calls node.run_async() which is the bound method - @patch("cht.stream.ffmpeg.subprocess.Popen") - def test_pipe_stdout(self, mock_popen, tmp_path): - import ffmpeg - node = ffmpeg.input("test").output("out") - ff.run_async(node, pipe_stdout=True) - call_kwargs = mock_popen.call_args - assert call_kwargs.kwargs["stdout"] == subprocess.PIPE - - @patch("cht.stream.ffmpeg.subprocess.Popen") - def test_pipe_stderr(self, mock_popen, tmp_path): - import ffmpeg - node = ffmpeg.input("test").output("out") - ff.run_async(node, pipe_stderr=True) - call_kwargs = mock_popen.call_args - assert call_kwargs.kwargs["stderr"] == subprocess.PIPE - - -class TestRunSync: - @patch("cht.stream.ffmpeg.subprocess.run") - def test_returns_tuple(self, mock_run): - mock_run.return_value = MagicMock(stdout="hello", stderr="world") - import ffmpeg - node = ffmpeg.input("test").output("out") - stdout, stderr = ff.run_sync(node) - assert stdout == "hello" - assert stderr == "world" - - @patch("cht.stream.ffmpeg.subprocess.run") - def test_passes_timeout(self, mock_run): - mock_run.return_value = MagicMock(stdout="", stderr="") - import ffmpeg - node = ffmpeg.input("test").output("out") - ff.run_sync(node, timeout=30) - assert mock_run.call_args.kwargs["timeout"] == 30 + @patch("ffmpeg.run_async") + def test_passes_pipe_flags(self, mock_run_async, tmp_path): + node = ffmpeg_lib.input("test").output("out") + ff.run_async(node, pipe_stdout=True, pipe_stderr=True) class TestStopProc: @@ -227,4 +163,4 @@ class TestStopProc: proc.send_signal.assert_not_called() def test_noop_if_none(self): - ff.stop_proc(None) # should not raise + ff.stop_proc(None) diff --git a/tests/test_manager.py b/tests/test_manager.py index 2392bff..9622864 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -67,7 +67,7 @@ class TestStartRecorder: mock_segment.assert_called_once_with( manager.stream_url, manager.stream_dir, 60, ) - mock_async.assert_called_once_with(mock_node) + mock_async.assert_called_once_with(mock_node, pipe_stderr=True) assert "recorder" in manager._procs @@ -82,7 +82,7 @@ class TestStartRecorderWithMonitor: assert fifo == manager.session_dir / "monitor.pipe" mock_monitor.assert_called_once() - mock_async.assert_called_once_with(mock_node) + mock_async.assert_called_once_with(mock_node, pipe_stderr=True) @patch("cht.stream.manager.ff.run_async") @patch("cht.stream.manager.ff.receive_and_segment_with_monitor")