asdfasdfawds

This commit is contained in:
2026-04-03 10:21:51 -03:00
parent 3f76670169
commit fbf9984a5d
7 changed files with 92 additions and 35 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
def def
data/ data/
bin/
*.egg-info/ *.egg-info/
__pycache__/ __pycache__/
.venv/ .venv/

View File

@@ -13,6 +13,7 @@ SESSIONS_DIR = Path(os.environ.get("CHT_SESSIONS_DIR", DATA_DIR / "sessions"))
STREAM_HOST = "0.0.0.0" STREAM_HOST = "0.0.0.0"
STREAM_PORT = 4444 STREAM_PORT = 4444
RELAY_PORT = 4445 # UDP loopback relay for live display RELAY_PORT = 4445 # UDP loopback relay for live display
SCENE_RELAY_PORT = 4446 # UDP loopback relay for scene detector
# Frame extraction — scene-only, no interval fallback # Frame extraction — scene-only, no interval fallback
SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes SCENE_THRESHOLD = 0.10 # 0-1, lower = more sensitive; 0.1 catches slide/window changes

View File

@@ -36,7 +36,7 @@ def generate_proxy(segment_path: Path, output_path: Path,
""" """
output_path.parent.mkdir(parents=True, exist_ok=True) output_path.parent.mkdir(parents=True, exist_ok=True)
stream = ffmpeg_lib.input(str(segment_path)) stream = ffmpeg_lib.input(str(segment_path), hwaccel="cuda")
output = ( output = (
ffmpeg_lib.output( ffmpeg_lib.output(
stream, str(output_path), stream, str(output_path),

View File

@@ -108,7 +108,8 @@ def load_frame_index(frames_dir: Path) -> list[dict]:
return [] return []
try: try:
index = json.loads(index_path.read_text()) index = json.loads(index_path.read_text())
except (json.JSONDecodeError, IOError): except (json.JSONDecodeError, IOError) as e:
log.debug("Failed to read frame index %s: %s", index_path, e)
return [] return []
result = [] result = []
for entry in index: for entry in index:

View File

@@ -39,15 +39,15 @@ def receive_and_record(stream_url, output_path):
) )
def receive_record_and_relay(stream_url, output_path, relay_url): def receive_record_and_relay(stream_url, output_path, relay_url, scene_relay_url=None):
"""Receive TCP stream, write to fragmented MP4, and relay to UDP loopback. """Receive TCP stream, write to fragmented MP4, and relay to UDP loopback.
Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption: Fragmented MP4 (frag_keyframe+empty_moov) avoids MKV tail corruption:
each keyframe boundary closes a self-contained fragment, so the file is each keyframe boundary closes a self-contained fragment, so the file is
always valid up to the last complete fragment (~1 keyframe interval ≈ 2s). always valid up to the last complete fragment (~1 keyframe interval ≈ 2s).
This allows the scene detector to use a 2s safety margin instead of 6s.
Uses ffmpeg tee via merge_outputs: one process, identical timestamps. Uses ffmpeg tee via merge_outputs: one process, identical timestamps.
Optionally sends a second relay for the scene detector.
""" """
stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay")
file_out = ffmpeg.output( file_out = ffmpeg.output(
@@ -61,7 +61,50 @@ def receive_record_and_relay(stream_url, output_path, relay_url):
stream, relay_url, stream, relay_url,
c="copy", f="mpegts", c="copy", f="mpegts",
) )
return ffmpeg.merge_outputs(file_out, relay_out).global_args(*QUIET_ARGS) outputs = [file_out, relay_out]
if scene_relay_url:
scene_out = ffmpeg.output(
stream, scene_relay_url,
c="copy", f="mpegts",
)
outputs.append(scene_out)
return ffmpeg.merge_outputs(*outputs).global_args(*QUIET_ARGS)
def start_live_scene_detector(stream_url, output_dir, scene_threshold=0.10,
start_number=1):
"""Start a persistent ffmpeg process that detects scenes from a live stream.
Reads from the UDP relay in real-time — no file seeking, no restart overhead.
Writes frame JPEGs and emits showinfo on stderr as scenes are detected.
Returns the async process (stderr must be read continuously).
"""
select_expr = f"gt(scene,{scene_threshold})"
stream = ffmpeg.input(
stream_url,
fflags="nobuffer+flush_packets",
flags="low_delay",
probesize="32000",
analyzeduration="0",
hwaccel="cuda",
)
stream = stream.filter("select", select_expr).filter("showinfo")
output = (
ffmpeg.output(
stream,
str(output_dir / "F%04d.jpg"),
vsync="vfr",
flush_packets=1,
**{"q:v": "2"},
start_number=start_number,
)
.global_args(*GLOBAL_ARGS)
)
log.info("start_live_scene_detector: %s", " ".join(output.compile()))
return run_async(output, pipe_stderr=True)
def extract_scene_frames(input_path, output_dir, scene_threshold=0.10, def extract_scene_frames(input_path, output_dir, scene_threshold=0.10,
@@ -72,23 +115,28 @@ def extract_scene_frames(input_path, output_dir, scene_threshold=0.10,
meaningfully vs the previous frame. No periodic fallback so static content meaningfully vs the previous frame. No periodic fallback so static content
produces no spurious frames. produces no spurious frames.
Uses -ss input seeking for O(1) startup regardless of file size. start_time/duration: applied via the select filter expression (NOT as -ss/-t
pts_time in showinfo output is relative to the seek point. input options, which break scene detection on fragmented MP4).
Returns (stdout, stderr) as decoded strings for timestamp parsing. Returns (stdout, stderr) as decoded strings for timestamp parsing.
""" """
scene_expr = f"gt(scene,{scene_threshold})" scene_expr = f"gt(scene,{scene_threshold})"
# With -ss input seeking, t starts at 0 from the seek point. time_conditions = []
# Only need end boundary (duration), start is handled by -ss.
if duration is not None:
scene_expr = f"({scene_expr})*lte(t,{duration})"
input_kwargs = {}
if start_time > 0: if start_time > 0:
input_kwargs["ss"] = start_time time_conditions.append(f"gte(t,{start_time})")
if duration is not None:
time_conditions.append(f"lte(t,{start_time + duration})")
stream = ffmpeg.input(str(input_path), **input_kwargs) if time_conditions:
stream = stream.filter("select", scene_expr).filter("showinfo") time_filter = "*".join(time_conditions)
select_expr = f"({scene_expr})*{time_filter}"
else:
select_expr = scene_expr
# CUDA hardware decode — GPU does h264 parsing, frames auto-transfer
# to CPU for the scene filter. Falls back to software if unavailable.
stream = ffmpeg.input(str(input_path), hwaccel="cuda")
stream = stream.filter("select", select_expr).filter("showinfo")
output = ( output = (
ffmpeg.output( ffmpeg.output(
@@ -150,7 +198,7 @@ def extract_audio_chunk(input_path, output_path, start_time=0.0, duration=None):
def extract_frame_at(input_path, output_path, timestamp): def extract_frame_at(input_path, output_path, timestamp):
"""Extract a single frame at the given timestamp.""" """Extract a single frame at the given timestamp."""
output = ( output = (
ffmpeg.input(str(input_path), ss=timestamp) ffmpeg.input(str(input_path), ss=timestamp, hwaccel="cuda")
.output(str(output_path), vframes=1, **{"q:v": "2"}) .output(str(output_path), vframes=1, **{"q:v": "2"})
.overwrite_output() .overwrite_output()
.global_args(*QUIET_ARGS) .global_args(*QUIET_ARGS)

View File

@@ -18,6 +18,7 @@ from cht.config import (
STREAM_HOST, STREAM_HOST,
STREAM_PORT, STREAM_PORT,
RELAY_PORT, RELAY_PORT,
SCENE_RELAY_PORT,
SCENE_THRESHOLD, SCENE_THRESHOLD,
SESSIONS_DIR, SESSIONS_DIR,
AUDIO_EXTRACT_INTERVAL, AUDIO_EXTRACT_INTERVAL,
@@ -155,6 +156,10 @@ class StreamManager:
def relay_url(self): def relay_url(self):
return f"udp://127.0.0.1:{RELAY_PORT}" return f"udp://127.0.0.1:{RELAY_PORT}"
@property
def scene_relay_url(self):
return f"udp://127.0.0.1:{SCENE_RELAY_PORT}"
@property @property
def recording_path(self): def recording_path(self):
"""Current recording segment path.""" """Current recording segment path."""
@@ -193,7 +198,9 @@ class StreamManager:
return proc is not None and proc.poll() is None return proc is not None and proc.poll() is None
def _launch_recorder(self): def _launch_recorder(self):
node = ff.receive_record_and_relay(self.stream_url, self.recording_path, self.relay_url) node = ff.receive_record_and_relay(
self.stream_url, self.recording_path, self.relay_url,
)
proc = ff.run_async(node, pipe_stderr=True) proc = ff.run_async(node, pipe_stderr=True)
self._procs["recorder"] = proc self._procs["recorder"] = proc
log.info("Recorder: pid=%s%s", proc.pid, self.recording_path) log.info("Recorder: pid=%s%s", proc.pid, self.recording_path)
@@ -211,38 +218,38 @@ class StreamManager:
def _detect(): def _detect():
processed_time = 0.0 processed_time = 0.0
idle_cycles = 0
current_segment = None current_segment = None
last_threshold = self.scene_threshold
while "stop" not in self._stop_flags: while "stop" not in self._stop_flags:
# Adaptive sleep: faster at lower thresholds (more sensitive) time.sleep(1.0)
# threshold 0.01→1s base, 0.10→1s, 0.50→2s
base = max(1.0, min(2.0, self.scene_threshold * 10)) # Threshold changed — reset to re-process recent content
sleep_secs = base if idle_cycles == 0 else min(base * 2, base * (2 ** idle_cycles)) if self.scene_threshold != last_threshold:
time.sleep(sleep_secs) log.info("Threshold changed %.2f%.2f, resetting",
last_threshold, self.scene_threshold)
last_threshold = self.scene_threshold
# Back up a bit to re-scan with new sensitivity
processed_time = max(0.0, processed_time - 10)
seg = self.recording_path seg = self.recording_path
if not seg.exists(): if not seg.exists():
continue continue
# New segment started — reset per-segment progress
if seg != current_segment: if seg != current_segment:
current_segment = seg current_segment = seg
processed_time = 0.0 processed_time = 0.0
idle_cycles = 0
log.info("Scene detector: switched to %s", seg.name) log.info("Scene detector: switched to %s", seg.name)
size = seg.stat().st_size size = seg.stat().st_size
if size < 100_000: if size < 100_000:
continue continue
# Probe current segment duration directly (not total across segments)
safe_duration = self._estimate_safe_duration() safe_duration = self._estimate_safe_duration()
if safe_duration is None or safe_duration <= 0: if safe_duration is None or safe_duration <= 0:
continue continue
# 2s safety margin for incomplete tail fragments process_to = safe_duration - 1
process_to = safe_duration - 2
if process_to <= processed_time + 0.5: if process_to <= processed_time + 0.5:
continue continue
@@ -253,13 +260,10 @@ class StreamManager:
) )
if new_frames: if new_frames:
idle_cycles = 0
log.info("Found %d new scene frames (total: %d)", log.info("Found %d new scene frames (total: %d)",
len(new_frames), self._next_frame_number() - 1) len(new_frames), self._next_frame_number() - 1)
if self._on_new_frames: if self._on_new_frames:
self._on_new_frames(new_frames) self._on_new_frames(new_frames)
else:
idle_cycles += 1
processed_time = process_to processed_time = process_to
@@ -338,8 +342,7 @@ class StreamManager:
continue continue
pts_match = re.search(r"pts_time:\s*([\d.]+)", line) pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
if pts_match: if pts_match:
# pts_time is relative to -ss seek point, add start_time for local offset pts_time = float(pts_match.group(1))
pts_time = float(pts_match.group(1)) + start_time
frame_id = f"F{frame_num:04d}" frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg" frame_path = self.frames_dir / f"{frame_id}.jpg"
if frame_path.exists(): if frame_path.exists():

View File

@@ -682,7 +682,10 @@ class ChtWindow(Adw.ApplicationWindow):
def _poll_frames(self): def _poll_frames(self):
if not self._lifecycle.stream_mgr: if not self._lifecycle.stream_mgr:
return False return False
for entry in load_frame_index(self._lifecycle.stream_mgr.frames_dir): entries = load_frame_index(self._lifecycle.stream_mgr.frames_dir)
if entries and not self._known_frames:
log.info("Poll: found %d frames, known %d", len(entries), len(self._known_frames))
for entry in entries:
fid = entry["id"] fid = entry["id"]
if fid in self._known_frames: if fid in self._known_frames:
continue continue