From b0bd02049a17333c1ba7fef5ca67fce810596ee9 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Fri, 3 Apr 2026 10:53:46 -0300 Subject: [PATCH] try 2 --- cht/stream/ffmpeg.py | 42 +++++++++------- cht/stream/manager.py | 109 ++++++++++++++++++++++++++---------------- 2 files changed, 93 insertions(+), 58 deletions(-) diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py index eed00da..4a951cc 100644 --- a/cht/stream/ffmpeg.py +++ b/cht/stream/ffmpeg.py @@ -64,21 +64,25 @@ def receive_record_and_relay(stream_url, output_path, relay_url): def receive_record_relay_and_detect(stream_url, output_path, relay_url, - frames_dir, scene_threshold=0.10, - start_number=1): + scene_threshold=0.10): """Single process: receive TCP → record fMP4 + relay UDP + scene detect. One ffmpeg process, three output branches from the same TCP input: - 1. File output — c=copy to fMP4 - 2. UDP relay — c=copy to mpegts for live display - 3. Scene frames — CUDA decode → select(scene) → showinfo → JPEG files + 1. File output — c=copy to fMP4 (raw packets, no decode) + 2. UDP relay — c=copy to mpegts for live display (raw packets) + 3. Scene frames — Vulkan decode + scdet_vulkan (GPU scene comparison, + sc_pass=1 drops non-scene frames on GPU) → hwdownload (only scene + frames hit CPU) → showinfo → MJPEG piped to stdout - The scene filter runs on decoded frames in-process, so detection latency - is near-zero (no polling, no file re-reading, no separate process). - Stderr must be read continuously to parse showinfo lines. + Scene frames are piped to stdout as image2pipe/mjpeg to avoid the image2 + muxer's one-frame buffering delay. The caller reads JPEG data from stdout + and writes files itself. Stderr carries showinfo lines with timestamps. + Both stdout and stderr must be read continuously. """ - stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay", - hwaccel="cuda") + stream = ffmpeg.input( + stream_url, fflags="nobuffer", flags="low_delay", + hwaccel="vulkan", hwaccel_output_format="vulkan", + ) # Copy outputs (raw packet remux, no decode) file_out = ffmpeg.output( @@ -93,13 +97,19 @@ def receive_record_relay_and_detect(stream_url, output_path, relay_url, c="copy", f="mpegts", ) - # Scene detection output (decode + filter → JPEG) - select_expr = f"gt(scene,{scene_threshold})" - scene_stream = stream.filter("select", select_expr).filter("showinfo") + # Scene detection on Vulkan GPU — only scene-change frames leave the GPU + scdet_threshold = scene_threshold * 100 # config 0-1 → scdet 0-100 + scene_stream = ( + stream + .filter("scdet_vulkan", threshold=scdet_threshold, sc_pass=1) + .filter("hwdownload") + .filter("format", "yuv420p") + .filter("showinfo") + ) scene_out = ffmpeg.output( - scene_stream, str(frames_dir / "F%04d.jpg"), - vsync="vfr", flush_packets=1, **{"q:v": "2"}, - start_number=start_number, + scene_stream, "pipe:1", + f="image2pipe", vcodec="mjpeg", + vsync="vfr", **{"q:v": "2"}, ) return ffmpeg.merge_outputs(file_out, relay_out, scene_out).global_args(*GLOBAL_ARGS) diff --git a/cht/stream/manager.py b/cht/stream/manager.py index 8d3472c..25e6629 100644 --- a/cht/stream/manager.py +++ b/cht/stream/manager.py @@ -12,6 +12,7 @@ import json import logging import re import time +from queue import Queue, Empty from threading import Thread from cht.config import ( @@ -196,14 +197,13 @@ class StreamManager: start_number = self._next_frame_number() node = ff.receive_record_relay_and_detect( self.stream_url, self.recording_path, self.relay_url, - self.frames_dir, scene_threshold=self.scene_threshold, - start_number=start_number, + scene_threshold=self.scene_threshold, ) - proc = ff.run_async(node, pipe_stderr=True) + proc = ff.run_async(node, pipe_stdout=True, pipe_stderr=True) self._procs["recorder"] = proc log.info("Recorder+scene: pid=%s → %s (threshold=%.2f, start_number=%d)", proc.pid, self.recording_path, self.scene_threshold, start_number) - self._start_scene_stderr_reader(proc, start_number) + self._start_scene_readers(proc, start_number) # -- Scene Detection -- @@ -236,16 +236,20 @@ class StreamManager: index.append(entry) index_path.write_text(json.dumps(index, indent=2)) - def _start_scene_stderr_reader(self, proc, start_number): - """Read stderr continuously, parsing showinfo lines for scene frames. + def _start_scene_readers(self, proc, start_number): + """Read scene frames from stdout (MJPEG pipe) and timestamps from stderr. - Each showinfo line corresponds to a JPEG that ffmpeg writes. We wait - briefly for the file to appear on disk (showinfo fires before the - muxer flushes), then update the index and fire the callback. + Two threads: + - stderr: parses showinfo lines, queues pts_time values + - stdout: reads JPEG frames from pipe, pairs with queued timestamps, + writes files to disk, fires callbacks immediately + + showinfo fires before the JPEG encoder, so timestamps are always + queued before the corresponding JPEG data arrives on stdout. """ - def _read(): - frame_num = start_number - offset = self.current_global_offset + ts_queue = Queue() + + def _read_stderr(): for raw in proc.stderr: line = raw.decode("utf-8", errors="replace").rstrip() if not line: @@ -254,38 +258,59 @@ class StreamManager: log.debug("[recorder] %s", line) continue pts_match = re.search(r"pts_time:\s*([\d.]+)", line) - if not pts_match: - continue - pts_time = float(pts_match.group(1)) - frame_id = f"F{frame_num:04d}" - frame_path = self.frames_dir / f"{frame_id}.jpg" - - # Wait for ffmpeg to flush the JPEG (showinfo fires before mux) - for _ in range(20): # up to ~200ms - if frame_path.exists() and frame_path.stat().st_size > 0: - break - time.sleep(0.01) - - if not frame_path.exists(): - log.warning("Scene frame %s not found on disk", frame_id) - continue - - entry = { - "id": frame_id, - "timestamp": pts_time + offset, - "path": str(frame_path), - "sent_to_agent": False, - } - self._append_frame_index(entry) - log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)", - frame_id, entry["timestamp"], pts_time, offset) - if self._on_new_frames: - self._on_new_frames([entry]) - frame_num += 1 - + if pts_match: + ts_queue.put(float(pts_match.group(1))) log.info("[recorder] stderr closed, exit=%s", proc.poll()) - Thread(target=_read, daemon=True, name="recorder_stderr").start() + def _read_stdout(): + frame_num = start_number + offset = self.current_global_offset + buf = b"" + while True: + chunk = proc.stdout.read(4096) + if not chunk: + break + buf += chunk + # Split JPEG frames by SOI (0xFFD8) and EOI (0xFFD9) markers + while True: + soi = buf.find(b"\xff\xd8") + if soi < 0: + buf = b"" + break + eoi = buf.find(b"\xff\xd9", soi + 2) + if eoi < 0: + buf = buf[soi:] # keep from SOI, need more data + break + jpeg_data = buf[soi:eoi + 2] + buf = buf[eoi + 2:] + + # Get timestamp (showinfo fires before encode, so it's queued) + try: + pts_time = ts_queue.get(timeout=2.0) + except Empty: + log.warning("No timestamp for scene frame %d", frame_num) + pts_time = 0.0 + + frame_id = f"F{frame_num:04d}" + frame_path = self.frames_dir / f"{frame_id}.jpg" + frame_path.write_bytes(jpeg_data) + + entry = { + "id": frame_id, + "timestamp": pts_time + offset, + "path": str(frame_path), + "sent_to_agent": False, + } + self._append_frame_index(entry) + log.info("Scene frame: %s at %.1fs (pts=%.1f + offset=%.1f)", + frame_id, entry["timestamp"], pts_time, offset) + if self._on_new_frames: + self._on_new_frames([entry]) + frame_num += 1 + log.info("[recorder] stdout closed") + + Thread(target=_read_stderr, daemon=True, name="recorder_stderr").start() + Thread(target=_read_stdout, daemon=True, name="recorder_stdout").start() def _probe_safe_duration(self): """Probe current recording duration via ffprobe. Returns seconds or None."""