commit 453601c07291012738765b63cba14ed6aeddb91c Author: buenosairesam Date: Wed Apr 1 13:53:09 2026 -0300 init commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c7b2807 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +def +*.egg-info/ +__pycache__/ +.venv/ +.pytest_cache/ diff --git a/cht/__init__.py b/cht/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/agent/__init__.py b/cht/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/app.py b/cht/app.py new file mode 100644 index 0000000..3707bc7 --- /dev/null +++ b/cht/app.py @@ -0,0 +1,41 @@ +import logging +import sys +import gi + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") + +from gi.repository import Gtk, Adw, Gio + +from cht.config import APP_ID, APP_NAME +from cht.window import ChtWindow + + +class ChtApp(Adw.Application): + def __init__(self): + super().__init__( + application_id=APP_ID, + flags=Gio.ApplicationFlags.DEFAULT_FLAGS, + ) + + def do_activate(self): + win = self.props.active_window + if not win: + win = ChtWindow(application=self) + win.present() + + +def main(): + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", + datefmt="%H:%M:%S", + ) + log = logging.getLogger("cht") + log.info("CHT starting") + app = ChtApp() + return app.run(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/cht/config.py b/cht/config.py new file mode 100644 index 0000000..1df1ab9 --- /dev/null +++ b/cht/config.py @@ -0,0 +1,19 @@ +from pathlib import Path + +APP_ID = "com.cht.StreamAgent" +APP_NAME = "CHT" + +# Default session data location +DATA_DIR = Path.home() / ".local" / "share" / "cht" +SESSIONS_DIR = DATA_DIR / "sessions" + +# Stream defaults +STREAM_HOST = "0.0.0.0" +STREAM_PORT = 4444 + +# Frame extraction +SCENE_THRESHOLD = 0.3 # 0-1, lower = more sensitive +MAX_FRAME_INTERVAL = 30 # seconds, fallback if no scene change + +# Segment recording +SEGMENT_DURATION = 60 # seconds per .ts segment diff --git a/cht/index/__init__.py b/cht/index/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/stream/__init__.py b/cht/stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/stream/ffmpeg.py b/cht/stream/ffmpeg.py new file mode 100644 index 0000000..eda53fe --- /dev/null +++ b/cht/stream/ffmpeg.py @@ -0,0 +1,148 @@ +""" +Thin wrapper around ffmpeg-python for building and running ffmpeg pipelines. + +All ffmpeg command construction goes through this module so manager.py +and other consumers never build raw CLI arg lists. +""" + +import logging +import os +import signal +import subprocess +from pathlib import Path + +import ffmpeg + +log = logging.getLogger(__name__) + + +def receive_and_segment(stream_url, segment_dir, segment_duration=60): + """Receive mpegts stream and save as segmented .ts files. + + Returns an ffmpeg-python output node (not yet running). + """ + stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") + return ffmpeg.output( + stream, + str(segment_dir / "segment_%04d.ts"), + c="copy", + f="segment", + segment_time=segment_duration, + reset_timestamps=1, + ) + + +def receive_and_segment_with_monitor(stream_url, segment_dir, fifo_path, segment_duration=60): + """Receive stream, save segments AND tee to a named pipe for monitoring. + + Returns an ffmpeg-python merged output node. + """ + if not fifo_path.exists(): + os.mkfifo(str(fifo_path)) + + stream = ffmpeg.input(stream_url, fflags="nobuffer", flags="low_delay") + + out_segments = ffmpeg.output( + stream, + str(segment_dir / "segment_%04d.ts"), + c="copy", + f="segment", + segment_time=segment_duration, + reset_timestamps=1, + ) + + out_monitor = ffmpeg.output( + stream, + str(fifo_path), + c="copy", + f="mpegts", + ) + + return ffmpeg.merge_outputs(out_segments, out_monitor) + + +def extract_scene_frames(input_path, output_dir, scene_threshold=0.3, + max_interval=30, start_number=1): + """Extract frames from a file on scene change. + + Uses ffmpeg select filter with scene detection and a max-interval fallback. + Returns (process_result, stderr) for timestamp parsing. + """ + select_expr = ( + f"gt(scene\\,{scene_threshold})" + f"+gte(t-prev_selected_t\\,{max_interval})" + ) + stream = ffmpeg.input(str(input_path)) + stream = stream.filter("select", select_expr).filter("showinfo") + + output = ffmpeg.output( + stream, + str(output_dir / "F%04d.jpg"), + vsync="vfr", + **{"q:v": "2"}, + start_number=start_number, + ) + + return run_sync(output, timeout=120) + + +def extract_audio_pcm(input_path): + """Extract audio as 16kHz mono PCM wav, returning an output node for piping. + + Use run_async with pipe_stdout=True to stream PCM data. + """ + stream = ffmpeg.input(str(input_path)) + return ffmpeg.output( + stream.audio, + "pipe:", + vn=None, + acodec="pcm_s16le", + ar=16000, + ac=1, + f="wav", + ) + + +def run_async(output_node, pipe_stdout=False, pipe_stderr=False): + """Start an ffmpeg pipeline asynchronously. Returns subprocess.Popen.""" + cmd = compile_cmd(output_node) + log.info("run_async: %s", " ".join(str(c) for c in cmd)) + return subprocess.Popen( + cmd, + stdout=subprocess.PIPE if pipe_stdout else subprocess.DEVNULL, + stderr=subprocess.PIPE if pipe_stderr else subprocess.DEVNULL, + ) + + +def run_sync(output_node, timeout=None): + """Run an ffmpeg pipeline synchronously. Returns (stdout, stderr) as strings.""" + cmd = compile_cmd(output_node) + log.info("run_sync: %s", " ".join(str(c) for c in cmd)) + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + return result.stdout, result.stderr + + +def compile_cmd(output_node): + """Compile an ffmpeg-python node to a command list, adding global flags.""" + cmd = output_node.compile() + # Insert global flags after 'ffmpeg' + idx = 1 + for flag in ["-hide_banner", "-loglevel", "warning"]: + cmd.insert(idx, flag) + idx += 1 + return cmd + + +def stop_proc(proc, timeout=5): + """Gracefully stop an ffmpeg subprocess.""" + if proc and proc.poll() is None: + proc.send_signal(signal.SIGINT) + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() diff --git a/cht/stream/manager.py b/cht/stream/manager.py new file mode 100644 index 0000000..294130d --- /dev/null +++ b/cht/stream/manager.py @@ -0,0 +1,194 @@ +""" +StreamManager: orchestrates ffmpeg pipelines for receiving, recording, +frame extraction, and audio extraction from a muxed mpegts/TCP stream. + +All data goes to disk. UI reads from disk. +All ffmpeg commands go through cht.stream.ffmpeg module. +""" + +import json +import logging +import os +import re +import time +from pathlib import Path +from threading import Thread + +from cht.config import ( + STREAM_HOST, + STREAM_PORT, + SEGMENT_DURATION, + SCENE_THRESHOLD, + MAX_FRAME_INTERVAL, + SESSIONS_DIR, +) +from cht.stream import ffmpeg as ff + +log = logging.getLogger(__name__) + + +class StreamManager: + def __init__(self, session_id=None): + if session_id is None: + session_id = time.strftime("%Y%m%d_%H%M%S") + self.session_id = session_id + self.session_dir = SESSIONS_DIR / session_id + self.stream_dir = self.session_dir / "stream" + self.frames_dir = self.session_dir / "frames" + self.transcript_dir = self.session_dir / "transcript" + self.agent_dir = self.session_dir / "agent" + + self._procs = {} + self._threads = {} + self._stop_flags = set() + log.info("StreamManager created: session=%s dir=%s", session_id, self.session_dir) + + def setup_dirs(self): + for d in (self.stream_dir, self.frames_dir, self.transcript_dir, self.agent_dir): + d.mkdir(parents=True, exist_ok=True) + log.info("Session directories created") + + @property + def stream_url(self): + return f"tcp://{STREAM_HOST}:{STREAM_PORT}?listen" + + def start_all(self): + self.setup_dirs() + self.start_recorder() + self.start_frame_extractor() + + def stop_all(self): + log.info("Stopping all processes...") + self._stop_flags.add("stop") + for name, proc in self._procs.items(): + log.info("Stopping %s (pid=%s)", name, proc.pid if proc else "?") + ff.stop_proc(proc) + self._procs.clear() + log.info("All processes stopped") + + def start_recorder(self): + node = ff.receive_and_segment( + self.stream_url, self.stream_dir, SEGMENT_DURATION, + ) + proc = ff.run_async(node, pipe_stderr=True) + self._procs["recorder"] = proc + log.info("Recorder started: pid=%s url=%s", proc.pid, self.stream_url) + self._start_stderr_reader("recorder", proc) + + def start_recorder_with_monitor(self): + self.setup_dirs() + fifo_path = self.session_dir / "monitor.pipe" + + node = ff.receive_and_segment_with_monitor( + self.stream_url, self.stream_dir, fifo_path, SEGMENT_DURATION, + ) + proc = ff.run_async(node, pipe_stderr=True) + self._procs["recorder"] = proc + log.info("Recorder+monitor started: pid=%s url=%s fifo=%s", proc.pid, self.stream_url, fifo_path) + self._start_stderr_reader("recorder", proc) + return fifo_path + + def _start_stderr_reader(self, name, proc): + """Read stderr from a process in a thread and log it.""" + def _read(): + try: + for line in proc.stderr: + text = line.decode("utf-8", errors="replace").rstrip() + if text: + log.info("[%s:stderr] %s", name, text) + except Exception as e: + log.warning("[%s:stderr] read error: %s", name, e) + retcode = proc.poll() + log.info("[%s] process exited: code=%s", name, retcode) + + t = Thread(target=_read, daemon=True, name=f"{name}_stderr") + t.start() + + def start_frame_extractor(self): + log.info("Starting frame watcher...") + self._start_frame_watcher() + + def _start_frame_watcher(self): + def _watch(): + seen = set() + log.info("Frame watcher running, watching %s", self.stream_dir) + while "stop" not in self._stop_flags: + segments = sorted(self.stream_dir.glob("segment_*.ts")) + for seg in segments: + if seg.name not in seen and seg.stat().st_size > 0: + seen.add(seg.name) + log.info("New segment found: %s (%d bytes)", seg.name, seg.stat().st_size) + self._extract_frames_from_file(seg) + time.sleep(2) + log.info("Frame watcher stopped") + + t = Thread(target=_watch, daemon=True, name="frame_watcher") + t.start() + self._threads["frame_watcher"] = t + + def _extract_frames_from_file(self, segment_path): + existing = list(self.frames_dir.glob("F*.jpg")) + start_num = len(existing) + 1 + log.info("Extracting frames from %s (start_num=%d)", segment_path.name, start_num) + + try: + _stdout, stderr = ff.extract_scene_frames( + segment_path, + self.frames_dir, + scene_threshold=SCENE_THRESHOLD, + max_interval=MAX_FRAME_INTERVAL, + start_number=start_num, + ) + if stderr: + for line in stderr.splitlines()[:10]: + log.debug("[frame_extract:stderr] %s", line) + self._parse_frame_timestamps(stderr, start_num) + new_frames = list(self.frames_dir.glob("F*.jpg")) + log.info("Frame extraction done: %d new frames", len(new_frames) - len(existing)) + except Exception as e: + log.error("Frame extraction failed for %s: %s", segment_path.name, e) + + def _parse_frame_timestamps(self, stderr_output, start_num): + index_path = self.frames_dir / "index.json" + if index_path.exists(): + with open(index_path) as f: + index = json.load(f) + else: + index = [] + + frame_num = start_num + for line in stderr_output.splitlines(): + if "showinfo" not in line: + continue + pts_match = re.search(r"pts_time:\s*([\d.]+)", line) + if pts_match: + pts_time = float(pts_match.group(1)) + frame_id = f"F{frame_num:04d}" + frame_path = self.frames_dir / f"{frame_id}.jpg" + if frame_path.exists(): + index.append({ + "id": frame_id, + "timestamp": pts_time, + "path": str(frame_path), + "sent_to_agent": False, + }) + log.info("Indexed frame %s at pts=%.2f", frame_id, pts_time) + frame_num += 1 + + with open(index_path, "w") as f: + json.dump(index, f, indent=2) + + def start_audio_extractor(self): + """Will be implemented in Phase 3.""" + pass + + def get_ffplay_cmd(self): + fifo_path = self.session_dir / "monitor.pipe" + return [ + "ffplay", + "-hwaccel", "cuda", + "-fflags", "nobuffer", + "-flags", "low_delay", + "-framedrop", + "-i", str(fifo_path), + ], fifo_path diff --git a/cht/transcriber/__init__.py b/cht/transcriber/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/ui/__init__.py b/cht/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cht/ui/monitor.py b/cht/ui/monitor.py new file mode 100644 index 0000000..ea0f819 --- /dev/null +++ b/cht/ui/monitor.py @@ -0,0 +1,159 @@ +""" +MonitorWidget: embeds ffplay into the GTK4 window for live stream monitoring. + +On X11, uses the --wid flag of mpv for native embedding, or reparents +ffplay's window via xdotool. GTK4 dropped GtkSocket so we use X11-level tricks. +""" + +import logging +import subprocess +import signal +from threading import Timer + +import gi +gi.require_version("Gtk", "4.0") +from gi.repository import Gtk, GLib, Gdk + +log = logging.getLogger(__name__) + + +class MonitorWidget(Gtk.Box): + """Widget that embeds an ffplay/mpv window for live stream monitoring.""" + + def __init__(self, **kwargs): + super().__init__(orientation=Gtk.Orientation.VERTICAL, **kwargs) + self._proc = None + self._xid = None + + self._drawing_area = Gtk.DrawingArea() + self._drawing_area.set_hexpand(True) + self._drawing_area.set_vexpand(True) + self._drawing_area.set_content_height(250) + self.append(self._drawing_area) + log.info("MonitorWidget initialized") + + def start_ffplay(self, input_path): + log.info("Starting ffplay with input: %s", input_path) + cmd = [ + "ffplay", + "-fflags", "nobuffer", + "-flags", "low_delay", + "-framedrop", + "-noborder", + "-i", str(input_path), + ] + log.info("ffplay cmd: %s", " ".join(cmd)) + + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + log.info("ffplay started: pid=%s", self._proc.pid) + self._start_stderr_log("ffplay") + Timer(1.0, self._try_reparent).start() + + def start_mpv(self, input_path): + log.info("Starting mpv with input: %s", input_path) + + surface = self._drawing_area.get_native().get_surface() + xid = None + if hasattr(surface, "get_xid"): + xid = surface.get_xid() + log.info("Got X11 window ID: %s", xid) + else: + log.warning("No get_xid on surface (type=%s), mpv will open separate window", type(surface).__name__) + + cmd = [ + "mpv", + "--no-terminal", + "--no-osc", + "--no-input-default-bindings", + "--profile=low-latency", + "--demuxer-lavf-o=fflags=+nobuffer", + ] + + if xid: + cmd.append(f"--wid={xid}") + + cmd.append(str(input_path)) + log.info("mpv cmd: %s", " ".join(cmd)) + + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + log.info("mpv started: pid=%s", self._proc.pid) + self._start_stderr_log("mpv") + + def _start_stderr_log(self, name): + """Read stderr in background thread and log it.""" + import threading + + def _read(): + try: + for line in self._proc.stderr: + text = line.decode("utf-8", errors="replace").rstrip() + if text: + log.info("[%s:stderr] %s", name, text) + except Exception as e: + log.warning("[%s:stderr] read error: %s", name, e) + retcode = self._proc.poll() if self._proc else None + log.info("[%s] process exited: code=%s", name, retcode) + + t = threading.Thread(target=_read, daemon=True, name=f"{name}_stderr") + t.start() + + def _try_reparent(self): + if self._proc is None or self._proc.poll() is not None: + log.warning("ffplay not running, cannot reparent") + return + + log.info("Attempting to reparent ffplay window (pid=%s)", self._proc.pid) + try: + result = subprocess.run( + ["xdotool", "search", "--pid", str(self._proc.pid)], + capture_output=True, + text=True, + timeout=5, + ) + windows = result.stdout.strip().split("\n") + log.info("xdotool found windows: %s", windows) + if windows and windows[0]: + ffplay_wid = windows[0] + surface = self._drawing_area.get_native().get_surface() + if hasattr(surface, "get_xid"): + parent_xid = surface.get_xid() + log.info("Reparenting ffplay %s into %s", ffplay_wid, parent_xid) + subprocess.run( + ["xdotool", "windowreparent", ffplay_wid, str(parent_xid)], + timeout=5, + ) + subprocess.run( + ["xdotool", "windowsize", ffplay_wid, "100%", "100%"], + timeout=5, + ) + log.info("Reparenting done") + else: + log.warning("No get_xid on surface, cannot reparent") + else: + log.warning("No windows found for ffplay pid=%s", self._proc.pid) + except FileNotFoundError: + log.error("xdotool not found, cannot reparent ffplay window") + except subprocess.TimeoutExpired: + log.error("xdotool timed out") + + def stop(self): + if self._proc and self._proc.poll() is None: + log.info("Stopping monitor process pid=%s", self._proc.pid) + self._proc.send_signal(signal.SIGINT) + try: + self._proc.wait(timeout=3) + log.info("Monitor process stopped gracefully") + except subprocess.TimeoutExpired: + self._proc.kill() + log.warning("Monitor process killed (did not stop in 3s)") + self._proc = None + else: + log.info("Monitor process already stopped or never started") diff --git a/cht/window.py b/cht/window.py new file mode 100644 index 0000000..9950178 --- /dev/null +++ b/cht/window.py @@ -0,0 +1,292 @@ +import logging + +import gi +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") +from gi.repository import Gtk, Adw, GLib, Pango + +from cht.config import APP_NAME +from cht.ui.monitor import MonitorWidget +from cht.stream.manager import StreamManager + +log = logging.getLogger(__name__) + + +class ChtWindow(Adw.ApplicationWindow): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.set_title(APP_NAME) + self.set_default_size(1400, 900) + self._streaming = False + + # Main horizontal paned: agent output (left) | right panels + self._main_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) + self._main_paned.set_shrink_start_child(False) + self._main_paned.set_shrink_end_child(False) + self._main_paned.set_position(450) + + # Left: Agent output panel + self._agent_output = self._build_agent_output() + self._main_paned.set_start_child(self._agent_output) + + # Right: vertical stack of panels + right_box = self._build_right_panels() + self._main_paned.set_end_child(right_box) + + # Wrap in toolbar view with header + toolbar = Adw.ToolbarView() + header = Adw.HeaderBar() + header.set_title_widget(Gtk.Label(label=APP_NAME)) + + self._connect_btn = Gtk.Button(label="Connect") + self._connect_btn.add_css_class("suggested-action") + self._connect_btn.connect("clicked", self._on_connect_clicked) + header.pack_start(self._connect_btn) + + toolbar.add_top_bar(header) + toolbar.set_content(self._main_paned) + + self.set_content(toolbar) + + # Stream manager + self._stream_mgr = None + + # Connect window close to cleanup + self.connect("close-request", self._on_close) + log.info("Window initialized") + + def _on_connect_clicked(self, button): + if self._streaming: + self._stop_stream() + else: + self._start_stream() + + def _start_stream(self): + log.info("Starting stream...") + self._connect_btn.set_label("Disconnect") + self._connect_btn.remove_css_class("suggested-action") + self._connect_btn.add_css_class("destructive-action") + self._streaming = True + + self._stream_mgr = StreamManager() + log.info("Session: %s", self._stream_mgr.session_id) + log.info("Session dir: %s", self._stream_mgr.session_dir) + + fifo_path = self._stream_mgr.start_recorder_with_monitor() + log.info("FIFO path: %s", fifo_path) + log.info("Stream URL: %s", self._stream_mgr.stream_url) + log.info("Recorder started, waiting for sender connection...") + + self._monitor.start_mpv(fifo_path) + log.info("Monitor (mpv) started") + + self._stream_mgr.start_frame_extractor() + log.info("Frame extractor started") + + def _stop_stream(self): + log.info("Stopping stream...") + self._monitor.stop() + log.info("Monitor stopped") + + if self._stream_mgr: + self._stream_mgr.stop_all() + log.info("Stream manager stopped") + self._stream_mgr = None + + self._connect_btn.set_label("Connect") + self._connect_btn.remove_css_class("destructive-action") + self._connect_btn.add_css_class("suggested-action") + self._streaming = False + log.info("Stream stopped, ready to reconnect") + + def _on_close(self, *args): + log.info("Window closing, cleaning up...") + self._monitor.stop() + if self._stream_mgr: + self._stream_mgr.stop_all() + + def _build_agent_output(self): + """Left panel: agent output log.""" + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + + label = Gtk.Label(label="Agent Output") + label.add_css_class("heading") + label.set_margin_top(8) + label.set_margin_bottom(8) + box.append(label) + + self._agent_output_view = Gtk.TextView() + self._agent_output_view.set_editable(False) + self._agent_output_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) + self._agent_output_view.set_cursor_visible(False) + self._agent_output_view.set_left_margin(8) + self._agent_output_view.set_right_margin(8) + self._agent_output_view.set_top_margin(4) + self._agent_output_view.set_bottom_margin(4) + + scroll = Gtk.ScrolledWindow() + scroll.set_vexpand(True) + scroll.set_child(self._agent_output_view) + box.append(scroll) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + + def _build_right_panels(self): + right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) + + top_paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) + top_paned.set_shrink_start_child(False) + top_paned.set_shrink_end_child(False) + + self._monitor = MonitorWidget() + self._monitor.set_hexpand(True) + stream_frame = Gtk.Frame() + stream_frame.set_child(self._monitor) + top_paned.set_start_child(stream_frame) + + self._waveform_area = self._build_panel("Waveform", height=250, width=200) + top_paned.set_end_child(self._waveform_area) + + top_paned.set_position(650) + right_box.append(top_paned) + + self._frames_panel = self._build_frames_panel() + right_box.append(self._frames_panel) + + self._transcript_panel = self._build_transcript_panel() + right_box.append(self._transcript_panel) + + self._agent_input = self._build_agent_input() + right_box.append(self._agent_input) + + return right_box + + def _build_panel(self, title, height=200, width=-1): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + label = Gtk.Label(label=title) + label.add_css_class("heading") + label.set_margin_top(4) + label.set_margin_bottom(4) + box.append(label) + + area = Gtk.DrawingArea() + area.set_content_height(height) + if width > 0: + area.set_content_width(width) + area.set_vexpand(False) + area.set_hexpand(True) + box.append(area) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + + def _build_frames_panel(self): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + label = Gtk.Label(label="Frames Extracted") + label.add_css_class("heading") + label.set_margin_top(4) + label.set_margin_bottom(4) + box.append(label) + + self._frames_flow = Gtk.FlowBox() + self._frames_flow.set_orientation(Gtk.Orientation.HORIZONTAL) + self._frames_flow.set_max_children_per_line(20) + self._frames_flow.set_min_children_per_line(1) + self._frames_flow.set_selection_mode(Gtk.SelectionMode.MULTIPLE) + self._frames_flow.set_homogeneous(True) + + scroll = Gtk.ScrolledWindow() + scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) + scroll.set_min_content_height(120) + scroll.set_child(self._frames_flow) + box.append(scroll) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + + def _build_transcript_panel(self): + box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + label = Gtk.Label(label="Transcript") + label.add_css_class("heading") + label.set_margin_top(4) + label.set_margin_bottom(4) + box.append(label) + + self._transcript_view = Gtk.TextView() + self._transcript_view.set_editable(False) + self._transcript_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) + self._transcript_view.set_cursor_visible(False) + self._transcript_view.set_left_margin(8) + self._transcript_view.set_right_margin(8) + + scroll = Gtk.ScrolledWindow() + scroll.set_vexpand(True) + scroll.set_min_content_height(150) + scroll.set_child(self._transcript_view) + box.append(scroll) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + + def _build_agent_input(self): + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + box.set_margin_start(4) + box.set_margin_end(4) + box.set_margin_top(4) + box.set_margin_bottom(4) + + self._input_entry = Gtk.Entry() + self._input_entry.set_hexpand(True) + self._input_entry.set_placeholder_text("Message agent... (use @ to reference frames/transcripts)") + self._input_entry.connect("activate", self._on_input_activate) + box.append(self._input_entry) + + send_btn = Gtk.Button(label="Send") + send_btn.add_css_class("suggested-action") + send_btn.connect("clicked", self._on_send_clicked) + box.append(send_btn) + + frame = Gtk.Frame() + frame.set_child(box) + return frame + + def _on_input_activate(self, entry): + self._send_message() + + def _on_send_clicked(self, button): + self._send_message() + + def _send_message(self): + text = self._input_entry.get_text().strip() + if not text: + return + buf = self._agent_output_view.get_buffer() + end_iter = buf.get_end_iter() + buf.insert(end_iter, f"\n> {text}\n") + self._input_entry.set_text("") + + def append_agent_output(self, text): + buf = self._agent_output_view.get_buffer() + end_iter = buf.get_end_iter() + buf.insert(end_iter, text + "\n") + + def append_transcript(self, entry_id, text): + buf = self._transcript_view.get_buffer() + end_iter = buf.get_end_iter() + buf.insert(end_iter, f"[{entry_id}] {text}\n") + + def add_frame_thumbnail(self, frame_id, pixbuf): + img = Gtk.Image.new_from_pixbuf(pixbuf) + overlay = Gtk.Overlay() + overlay.set_child(img) + label = Gtk.Label(label=frame_id) + label.set_halign(Gtk.Align.START) + label.set_valign(Gtk.Align.END) + label.add_css_class("caption") + overlay.add_overlay(label) + self._frames_flow.append(overlay) diff --git a/ctrl/app.sh b/ctrl/app.sh new file mode 100755 index 0000000..3cf16a3 --- /dev/null +++ b/ctrl/app.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# Start the CHT desktop app on this machine +# Usage: ./app.sh +set -euo pipefail + +PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +cd "$PROJECT_DIR" + +# Setup venv and install deps if needed +if [ ! -d .venv ]; then + uv venv --system-site-packages + uv pip install -e ".[dev]" +elif [ pyproject.toml -nt .venv/.installed ]; then + uv pip install -e ".[dev]" +fi +touch .venv/.installed + +exec .venv/bin/python -m cht.app "$@" diff --git a/ctrl/sender.sh b/ctrl/sender.sh new file mode 100755 index 0000000..639d528 --- /dev/null +++ b/ctrl/sender.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# Start the sender on this machine +# Usage: ./sender.sh RECEIVER_IP [PORT] +set -euo pipefail + +PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" + +exec sudo "$PROJECT_DIR/sender/stream_av.sh" "$@" diff --git a/ctrl/sync.sh b/ctrl/sync.sh new file mode 100755 index 0000000..7a458d7 --- /dev/null +++ b/ctrl/sync.sh @@ -0,0 +1,14 @@ +#!/bin/bash +# Sync project to receiver machine via rsync +# Usage: ./sync.sh [user@host] [remote_path] +set -euo pipefail + +REMOTE="${1:-mariano@mcrndeb}" +REMOTE_PATH="${2:-~/wdir/cht/}" +PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" + +rsync -avz --delete \ + --exclude='.git/' \ + --filter=':- .gitignore' \ + "$PROJECT_DIR/" \ + "${REMOTE}:${REMOTE_PATH}" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..08849d0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "cht" +version = "0.1.0" +description = "Stream viewer with Claude agent integration" +requires-python = ">=3.13" +dependencies = [ + "ffmpeg-python", +] + +[tool.setuptools.packages.find] +include = ["cht*"] + +[project.optional-dependencies] +dev = ["pytest"] + +[project.scripts] +cht = "cht.app:main" diff --git a/sender/stream_av.sh b/sender/stream_av.sh new file mode 100755 index 0000000..b0d7551 --- /dev/null +++ b/sender/stream_av.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# CHT Sender: muxed video + audio over TCP/mpegts +# Source: Wayland (kmsgrab) + desktop audio + webcam mic +# Usage: ./stream_av.sh [RECEIVER_IP] [PORT] +# +# Requires: sudo for kmsgrab, PulseAudio for audio capture +# Audio is non-blocking (monitor source = passive tap) + +set -uo pipefail + +RECEIVER_IP="${1:-mcrndeb}" +PORT="${2:-4444}" + +# Let root access the user's PulseAudio session +REAL_UID="${SUDO_UID:-$(id -u)}" +export PULSE_SERVER="unix:/run/user/${REAL_UID}/pulse/native" + +# Find the default sink's monitor source (desktop audio - what you hear) +MONITOR=$(PULSE_SERVER="$PULSE_SERVER" pactl info 2>/dev/null | grep "Default Sink" | awk '{print $3}').monitor +# Webcam mic - find by partial match (serial number varies) +WEBCAM_MIC=$(PULSE_SERVER="$PULSE_SERVER" pactl list short sources 2>/dev/null | grep -i "C922" | awk '{print $2}' || true) + +echo "Monitor source: $MONITOR" +echo "Webcam mic: ${WEBCAM_MIC:-not found}" +echo "Streaming to: ${RECEIVER_IP}:${PORT}" + +if [ -n "$WEBCAM_MIC" ]; then + echo "Webcam mic found, mixing desktop + mic" + # Two pulse inputs: desktop monitor + webcam mic, mixed into one audio stream + exec ffmpeg \ + -init_hw_device drm=drm:/dev/dri/card0 \ + -init_hw_device vaapi=va@drm \ + -device /dev/dri/card0 -f kmsgrab -i - \ + -f pulse -i "$MONITOR" \ + -f pulse -i "$WEBCAM_MIC" \ + -filter_complex "[1:a][2:a]amix=inputs=2:duration=longest[aout]" \ + -map 0:v -map "[aout]" \ + -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12' \ + -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ + -c:a aac -b:a 128k \ + -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \ + -f mpegts "tcp://${RECEIVER_IP}:${PORT}" \ + -hide_banner +else + echo "No webcam mic, desktop audio only" + exec ffmpeg \ + -init_hw_device drm=drm:/dev/dri/card0 \ + -init_hw_device vaapi=va@drm \ + -device /dev/dri/card0 -f kmsgrab -i - \ + -f pulse -i "$MONITOR" \ + -vf 'hwmap=derive_device=vaapi,scale_vaapi=w=1920:h=1080:format=nv12' \ + -c:v h264_vaapi -qp 20 -g 30 -keyint_min 30 -bf 0 \ + -c:a aac -b:a 128k \ + -flush_packets 1 -fflags nobuffer -muxdelay 0 -muxpreload 0 \ + -f mpegts "tcp://${RECEIVER_IP}:${PORT}" \ + -hide_banner +fi diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..1ff068c --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,49 @@ +"""Tests for cht.config — verify defaults are sane.""" + +from pathlib import Path + +from cht.config import ( + APP_ID, + APP_NAME, + DATA_DIR, + SESSIONS_DIR, + STREAM_HOST, + STREAM_PORT, + SCENE_THRESHOLD, + MAX_FRAME_INTERVAL, + SEGMENT_DURATION, +) + + +def test_app_id_format(): + assert "." in APP_ID + assert APP_ID.startswith("com.") + + +def test_data_dir_under_home(): + assert str(DATA_DIR).startswith(str(Path.home())) + + +def test_sessions_dir_under_data(): + assert str(SESSIONS_DIR).startswith(str(DATA_DIR)) + + +def test_stream_host_binds_all(): + assert STREAM_HOST == "0.0.0.0" + + +def test_stream_port_is_int(): + assert isinstance(STREAM_PORT, int) + assert 1024 <= STREAM_PORT <= 65535 + + +def test_scene_threshold_range(): + assert 0 < SCENE_THRESHOLD < 1 + + +def test_max_frame_interval_positive(): + assert MAX_FRAME_INTERVAL > 0 + + +def test_segment_duration_positive(): + assert SEGMENT_DURATION > 0 diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py new file mode 100644 index 0000000..a940a92 --- /dev/null +++ b/tests/test_ffmpeg.py @@ -0,0 +1,230 @@ +"""Tests for cht.stream.ffmpeg — command compilation and pipeline construction.""" + +import os +import signal +import subprocess +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +from cht.stream import ffmpeg as ff + + +class TestReceiveAndSegment: + def test_compiles_to_valid_cmd(self, tmp_path): + node = ff.receive_and_segment( + "tcp://0.0.0.0:4444?listen", tmp_path, segment_duration=30 + ) + cmd = ff.compile_cmd(node) + + assert cmd[0] == "ffmpeg" + assert "-hide_banner" in cmd + assert "-loglevel" in cmd + # Input options + assert "tcp://0.0.0.0:4444?listen" in cmd + # Segment output options + assert "segment" in cmd + assert "30" in cmd or 30 in cmd + assert str(tmp_path / "segment_%04d.ts") in cmd + + def test_input_has_low_latency_flags(self, tmp_path): + node = ff.receive_and_segment( + "tcp://0.0.0.0:4444?listen", tmp_path + ) + cmd = ff.compile_cmd(node) + cmd_str = " ".join(str(c) for c in cmd) + assert "nobuffer" in cmd_str + assert "low_delay" in cmd_str + + def test_copy_codec(self, tmp_path): + node = ff.receive_and_segment( + "tcp://0.0.0.0:4444?listen", tmp_path + ) + cmd = ff.compile_cmd(node) + assert "copy" in cmd + + +class TestReceiveAndSegmentWithMonitor: + def test_creates_fifo(self, tmp_path): + fifo = tmp_path / "monitor.pipe" + ff.receive_and_segment_with_monitor( + "tcp://0.0.0.0:4444?listen", tmp_path, fifo + ) + assert fifo.exists() + assert os.path.isfile(fifo) or os.stat(fifo).st_mode & 0o010000 # is fifo + + def test_does_not_recreate_existing_fifo(self, tmp_path): + fifo = tmp_path / "monitor.pipe" + os.mkfifo(str(fifo)) + inode_before = fifo.stat().st_ino + + ff.receive_and_segment_with_monitor( + "tcp://0.0.0.0:4444?listen", tmp_path, fifo + ) + assert fifo.stat().st_ino == inode_before + + def test_has_two_outputs(self, tmp_path): + fifo = tmp_path / "monitor.pipe" + node = ff.receive_and_segment_with_monitor( + "tcp://0.0.0.0:4444?listen", tmp_path, fifo + ) + cmd = ff.compile_cmd(node) + cmd_str = " ".join(str(c) for c in cmd) + # Should have segment output and fifo output + assert "segment_%04d.ts" in cmd_str + assert "monitor.pipe" in cmd_str + + def test_both_outputs_use_copy(self, tmp_path): + fifo = tmp_path / "monitor.pipe" + node = ff.receive_and_segment_with_monitor( + "tcp://0.0.0.0:4444?listen", tmp_path, fifo + ) + cmd = ff.compile_cmd(node) + # Should have copy codec for both outputs + assert cmd.count("copy") >= 2 + + +class TestExtractSceneFrames: + def test_compiles_select_filter(self, tmp_path): + # We can't run ffmpeg without a real file, but we can test compilation + import ffmpeg + stream = ffmpeg.input(str(tmp_path / "test.ts")) + stream = stream.filter("select", "gt(scene\\,0.3)+gte(t-prev_selected_t\\,30)") + stream = stream.filter("showinfo") + output = ffmpeg.output( + stream, + str(tmp_path / "F%04d.jpg"), + vsync="vfr", + **{"q:v": "2"}, + start_number=1, + ) + cmd = ff.compile_cmd(output) + cmd_str = " ".join(str(c) for c in cmd) + assert "select" in cmd_str + assert "scene" in cmd_str + assert "showinfo" in cmd_str + assert "vfr" in cmd_str + + @patch("cht.stream.ffmpeg.subprocess.run") + def test_calls_subprocess_with_timeout(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(stdout="", stderr="") + ff.extract_scene_frames( + tmp_path / "test.ts", tmp_path, + scene_threshold=0.4, max_interval=20, start_number=5, + ) + mock_run.assert_called_once() + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs["timeout"] == 120 + assert call_kwargs.kwargs["capture_output"] is True + + @patch("cht.stream.ffmpeg.subprocess.run") + def test_returns_stdout_stderr(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(stdout="out", stderr="err") + stdout, stderr = ff.extract_scene_frames( + tmp_path / "test.ts", tmp_path, + ) + assert stdout == "out" + assert stderr == "err" + + @patch("cht.stream.ffmpeg.subprocess.run") + def test_start_number_in_cmd(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(stdout="", stderr="") + ff.extract_scene_frames( + tmp_path / "test.ts", tmp_path, start_number=42 + ) + cmd = mock_run.call_args.args[0] + assert "42" in [str(c) for c in cmd] + + +class TestExtractAudioPcm: + def test_compiles_audio_extraction(self, tmp_path): + node = ff.extract_audio_pcm(tmp_path / "test.ts") + cmd = ff.compile_cmd(node) + cmd_str = " ".join(str(c) for c in cmd) + assert "pcm_s16le" in cmd_str + assert "16000" in cmd_str + assert "pipe:" in cmd_str + assert "wav" in cmd_str + + +class TestCompileCmd: + def test_inserts_global_flags_after_ffmpeg(self, tmp_path): + import ffmpeg + node = ffmpeg.input(str(tmp_path / "test.ts")).output(str(tmp_path / "out.ts")) + cmd = ff.compile_cmd(node) + assert cmd[0] == "ffmpeg" + assert cmd[1] == "-hide_banner" + assert cmd[2] == "-loglevel" + assert cmd[3] == "warning" + + +class TestRunAsync: + @patch("cht.stream.ffmpeg.subprocess.Popen") + def test_default_no_pipes(self, mock_popen, tmp_path): + import ffmpeg + node = ffmpeg.input("test").output("out") + ff.run_async(node) + call_kwargs = mock_popen.call_args + assert call_kwargs.kwargs["stdout"] == subprocess.DEVNULL + assert call_kwargs.kwargs["stderr"] == subprocess.DEVNULL + + @patch("cht.stream.ffmpeg.subprocess.Popen") + def test_pipe_stdout(self, mock_popen, tmp_path): + import ffmpeg + node = ffmpeg.input("test").output("out") + ff.run_async(node, pipe_stdout=True) + call_kwargs = mock_popen.call_args + assert call_kwargs.kwargs["stdout"] == subprocess.PIPE + + @patch("cht.stream.ffmpeg.subprocess.Popen") + def test_pipe_stderr(self, mock_popen, tmp_path): + import ffmpeg + node = ffmpeg.input("test").output("out") + ff.run_async(node, pipe_stderr=True) + call_kwargs = mock_popen.call_args + assert call_kwargs.kwargs["stderr"] == subprocess.PIPE + + +class TestRunSync: + @patch("cht.stream.ffmpeg.subprocess.run") + def test_returns_tuple(self, mock_run): + mock_run.return_value = MagicMock(stdout="hello", stderr="world") + import ffmpeg + node = ffmpeg.input("test").output("out") + stdout, stderr = ff.run_sync(node) + assert stdout == "hello" + assert stderr == "world" + + @patch("cht.stream.ffmpeg.subprocess.run") + def test_passes_timeout(self, mock_run): + mock_run.return_value = MagicMock(stdout="", stderr="") + import ffmpeg + node = ffmpeg.input("test").output("out") + ff.run_sync(node, timeout=30) + assert mock_run.call_args.kwargs["timeout"] == 30 + + +class TestStopProc: + def test_sends_sigint_then_waits(self): + proc = MagicMock() + proc.poll.return_value = None + ff.stop_proc(proc, timeout=3) + proc.send_signal.assert_called_once_with(signal.SIGINT) + proc.wait.assert_called_once_with(timeout=3) + + def test_kills_on_timeout(self): + proc = MagicMock() + proc.poll.return_value = None + proc.wait.side_effect = subprocess.TimeoutExpired("ffmpeg", 3) + ff.stop_proc(proc, timeout=3) + proc.kill.assert_called_once() + + def test_noop_if_already_exited(self): + proc = MagicMock() + proc.poll.return_value = 0 + ff.stop_proc(proc) + proc.send_signal.assert_not_called() + + def test_noop_if_none(self): + ff.stop_proc(None) # should not raise diff --git a/tests/test_manager.py b/tests/test_manager.py new file mode 100644 index 0000000..2392bff --- /dev/null +++ b/tests/test_manager.py @@ -0,0 +1,270 @@ +"""Tests for cht.stream.manager — StreamManager orchestration.""" + +import json +import os +import time +from pathlib import Path +from unittest.mock import patch, MagicMock, call + +import pytest + +from cht.stream.manager import StreamManager + + +@pytest.fixture +def manager(tmp_path): + """StreamManager with session dir in tmp_path.""" + with patch("cht.stream.manager.SESSIONS_DIR", tmp_path): + mgr = StreamManager(session_id="test_session") + yield mgr + mgr.stop_all() + + +class TestInit: + def test_session_id_default(self, tmp_path): + with patch("cht.stream.manager.SESSIONS_DIR", tmp_path): + mgr = StreamManager() + assert mgr.session_id # should be a timestamp string + + def test_session_id_custom(self, manager): + assert manager.session_id == "test_session" + + def test_dirs_not_created_on_init(self, manager): + assert not manager.stream_dir.exists() + + +class TestSetupDirs: + def test_creates_all_subdirs(self, manager): + manager.setup_dirs() + assert manager.stream_dir.is_dir() + assert manager.frames_dir.is_dir() + assert manager.transcript_dir.is_dir() + assert manager.agent_dir.is_dir() + + def test_idempotent(self, manager): + manager.setup_dirs() + manager.setup_dirs() + assert manager.stream_dir.is_dir() + + +class TestStreamUrl: + def test_default_url(self, manager): + assert "0.0.0.0" in manager.stream_url + assert "4444" in manager.stream_url + assert "listen" in manager.stream_url + + +class TestStartRecorder: + @patch("cht.stream.manager.ff.run_async") + @patch("cht.stream.manager.ff.receive_and_segment") + def test_calls_ffmpeg_module(self, mock_segment, mock_async, manager): + manager.setup_dirs() + mock_node = MagicMock() + mock_segment.return_value = mock_node + + manager.start_recorder() + + mock_segment.assert_called_once_with( + manager.stream_url, manager.stream_dir, 60, + ) + mock_async.assert_called_once_with(mock_node) + assert "recorder" in manager._procs + + +class TestStartRecorderWithMonitor: + @patch("cht.stream.manager.ff.run_async") + @patch("cht.stream.manager.ff.receive_and_segment_with_monitor") + def test_creates_fifo_and_starts(self, mock_monitor, mock_async, manager): + mock_node = MagicMock() + mock_monitor.return_value = mock_node + + fifo = manager.start_recorder_with_monitor() + + assert fifo == manager.session_dir / "monitor.pipe" + mock_monitor.assert_called_once() + mock_async.assert_called_once_with(mock_node) + + @patch("cht.stream.manager.ff.run_async") + @patch("cht.stream.manager.ff.receive_and_segment_with_monitor") + def test_setup_dirs_called(self, mock_monitor, mock_async, manager): + mock_monitor.return_value = MagicMock() + manager.start_recorder_with_monitor() + assert manager.stream_dir.is_dir() + + +class TestStopAll: + @patch("cht.stream.manager.ff.stop_proc") + def test_stops_all_procs(self, mock_stop, manager): + proc1, proc2 = MagicMock(), MagicMock() + manager._procs = {"a": proc1, "b": proc2} + + manager.stop_all() + + mock_stop.assert_any_call(proc1) + mock_stop.assert_any_call(proc2) + assert len(manager._procs) == 0 + + def test_sets_stop_flag(self, manager): + manager.stop_all() + assert "stop" in manager._stop_flags + + +class TestParseFrameTimestamps: + def test_parses_showinfo_output(self, manager): + manager.setup_dirs() + # Create fake frame files + for i in range(1, 4): + (manager.frames_dir / f"F{i:04d}.jpg").touch() + + stderr = ( + "[Parsed_showinfo_1 @ 0x1234] n:0 pts:1000 pts_time:10.5 other stuff\n" + "some other line\n" + "[Parsed_showinfo_1 @ 0x1234] n:1 pts:2000 pts_time:20.0 other stuff\n" + "[Parsed_showinfo_1 @ 0x1234] n:2 pts:3000 pts_time:35.7 other stuff\n" + ) + + manager._parse_frame_timestamps(stderr, start_num=1) + + index_path = manager.frames_dir / "index.json" + assert index_path.exists() + + with open(index_path) as f: + index = json.load(f) + + assert len(index) == 3 + assert index[0]["id"] == "F0001" + assert index[0]["timestamp"] == 10.5 + assert index[0]["sent_to_agent"] is False + assert index[1]["id"] == "F0002" + assert index[1]["timestamp"] == 20.0 + assert index[2]["id"] == "F0003" + assert index[2]["timestamp"] == 35.7 + + def test_appends_to_existing_index(self, manager): + manager.setup_dirs() + index_path = manager.frames_dir / "index.json" + + # Pre-existing index + existing = [{"id": "F0001", "timestamp": 5.0, "path": "/old", "sent_to_agent": True}] + with open(index_path, "w") as f: + json.dump(existing, f) + + # New frame + (manager.frames_dir / "F0002.jpg").touch() + stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:15.0 stuff\n" + manager._parse_frame_timestamps(stderr, start_num=2) + + with open(index_path) as f: + index = json.load(f) + + assert len(index) == 2 + assert index[0]["id"] == "F0001" # preserved + assert index[1]["id"] == "F0002" # new + + def test_skips_missing_frame_files(self, manager): + manager.setup_dirs() + # Don't create the frame file + stderr = "[Parsed_showinfo_1 @ 0x1] n:0 pts:100 pts_time:10.0 stuff\n" + manager._parse_frame_timestamps(stderr, start_num=1) + + index_path = manager.frames_dir / "index.json" + with open(index_path) as f: + index = json.load(f) + assert len(index) == 0 + + def test_ignores_non_showinfo_lines(self, manager): + manager.setup_dirs() + (manager.frames_dir / "F0001.jpg").touch() + + stderr = ( + "frame= 100 fps=30 q=28.0 size= 1024kB\n" + "video:500kB audio:200kB subtitle:0kB other streams:0kB\n" + ) + manager._parse_frame_timestamps(stderr, start_num=1) + + index_path = manager.frames_dir / "index.json" + with open(index_path) as f: + index = json.load(f) + assert len(index) == 0 + + +class TestExtractFramesFromFile: + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_calls_ffmpeg_with_correct_args(self, mock_extract, manager): + manager.setup_dirs() + seg = manager.stream_dir / "segment_0001.ts" + seg.touch() + + mock_extract.return_value = ("", "") + manager._extract_frames_from_file(seg) + + mock_extract.assert_called_once_with( + seg, + manager.frames_dir, + scene_threshold=0.3, + max_interval=30, + start_number=1, + ) + + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_continues_numbering(self, mock_extract, manager): + manager.setup_dirs() + # Pre-existing frames + (manager.frames_dir / "F0001.jpg").touch() + (manager.frames_dir / "F0002.jpg").touch() + + seg = manager.stream_dir / "segment_0002.ts" + seg.touch() + + mock_extract.return_value = ("", "") + manager._extract_frames_from_file(seg) + + assert mock_extract.call_args.kwargs["start_number"] == 3 + + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_handles_ffmpeg_failure(self, mock_extract, manager): + manager.setup_dirs() + seg = manager.stream_dir / "segment_0001.ts" + seg.touch() + + mock_extract.side_effect = RuntimeError("ffmpeg died") + # Should not raise + manager._extract_frames_from_file(seg) + + +class TestFrameWatcher: + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_detects_new_segments(self, mock_extract, manager): + manager.setup_dirs() + mock_extract.return_value = ("", "") + + manager.start_frame_extractor() + + # Create a segment file + seg = manager.stream_dir / "segment_0001.ts" + seg.write_bytes(b"\x00" * 100) + + # Wait for watcher to pick it up + time.sleep(3) + manager.stop_all() + + mock_extract.assert_called() + + @patch("cht.stream.manager.ff.extract_scene_frames") + def test_skips_already_seen(self, mock_extract, manager): + manager.setup_dirs() + mock_extract.return_value = ("", "") + + # Pre-create segment + seg = manager.stream_dir / "segment_0001.ts" + seg.write_bytes(b"\x00" * 100) + + manager.start_frame_extractor() + time.sleep(3) + call_count = mock_extract.call_count + + # Wait another cycle — should not re-process + time.sleep(3) + manager.stop_all() + + assert mock_extract.call_count == call_count