"""Tests for cht.stream.processor — SessionProcessor.

P0: regression tests for known bugs (flush dedup, index contract, audio
    callback crash, wall-clock offset)
P2: scene detection pipeline unit tests
"""

import json
import time
from datetime import datetime, timedelta
from pathlib import Path
from threading import Event
from unittest.mock import MagicMock, patch

import pytest

from cht.stream.processor import SessionProcessor

# Minimal JPEG-shaped payload (SOI + APP0 markers, zero padding, EOI marker)
# shared by every frame test below.
_FAKE_JPEG = b"\xff\xd8\xff\xe0" + b"\x00" * 100 + b"\xff\xd9"


def _read_index(processor):
    """Return the parsed ``index.json`` from the processor's frames dir."""
    return json.loads((processor.frames_dir / "index.json").read_text())


def _flush_dedup_emitter(processor, threshold=0.1):
    """Return ``emit(pts_time)`` mirroring the recorder's flush-frame dedup.

    A frame whose ``pts_time`` is less than ``threshold`` seconds after the last
    accepted frame is dropped; every accepted frame is forwarded to
    ``processor.on_raw_frame`` with ``global_ts=pts_time``.

    NOTE(review): this re-implements the recorder's ``_read_stdout`` rule inside
    the test, so the tests using it do not exercise the recorder itself.
    """
    last_pts = -1.0

    def emit(pts_time):
        nonlocal last_pts
        if pts_time - last_pts < threshold:
            return  # flush frame, skip
        last_pts = pts_time
        processor.on_raw_frame(_FAKE_JPEG, global_ts=pts_time)

    return emit


@pytest.fixture
def processor(tmp_path):
    """Attached SessionProcessor in a timestamp-named session dir; stopped on teardown."""
    session_dir = tmp_path / "20260410_120000"
    session_dir.mkdir()
    proc = SessionProcessor(session_dir)
    proc.frames_dir.mkdir(parents=True, exist_ok=True)
    proc.audio_dir.mkdir(parents=True, exist_ok=True)
    proc.attach(
        get_recording_path=lambda: None,
        get_current_global_offset=lambda: 0.0,
    )
    yield proc
    proc.stop()


@pytest.fixture
def make_processor():
    """Factory building attached SessionProcessors for an arbitrary session dir.

    Every processor created through the factory is stopped on teardown, the
    same cleanup the ``processor`` fixture performs.
    """
    created = []

    def _make(session_dir):
        proc = SessionProcessor(session_dir)
        proc.attach(
            get_recording_path=lambda: None,
            get_current_global_offset=lambda: 0.0,
        )
        created.append(proc)
        return proc

    yield _make
    for proc in created:
        proc.stop()


# -- P2: on_raw_frame / index contract --


class TestOnRawFrame:
    def test_writes_jpeg_to_frames_dir(self, processor):
        processor.on_raw_frame(_FAKE_JPEG, global_ts=5.0)
        jpgs = list(processor.frames_dir.glob("*.jpg"))
        assert len(jpgs) == 1

    def test_index_entry_has_required_fields(self, processor):
        """P2: index.json must match {id, timestamp, path, sent_to_agent} contract."""
        processor.on_raw_frame(_FAKE_JPEG, global_ts=12.5)
        index = _read_index(processor)
        assert len(index) == 1
        entry = index[0]
        missing = {"id", "timestamp", "path", "sent_to_agent"} - entry.keys()
        assert not missing, f"index entry missing fields: {missing}"
        assert entry["sent_to_agent"] is False
        assert entry["timestamp"] == 12.5

    def test_id_format_is_F_zero_padded(self, processor):
        processor.on_raw_frame(_FAKE_JPEG, global_ts=1.0)
        assert _read_index(processor)[0]["id"] == "F0001"

    def test_sequential_ids(self, processor):
        for i in range(3):
            processor.on_raw_frame(_FAKE_JPEG, global_ts=float(i))
        assert [e["id"] for e in _read_index(processor)] == ["F0001", "F0002", "F0003"]

    def test_fires_on_new_frames_callback(self, processor):
        cb = MagicMock()
        processor.set_on_new_frames(cb)
        processor.on_raw_frame(_FAKE_JPEG, global_ts=3.0)
        cb.assert_called_once()
        frames = cb.call_args[0][0]
        assert len(frames) == 1
        assert frames[0]["timestamp"] == 3.0

    def test_path_in_index_is_absolute(self, processor):
        processor.on_raw_frame(_FAKE_JPEG, global_ts=1.0)
        assert Path(_read_index(processor)[0]["path"]).is_absolute()


# -- P0: audio callback crash protection --


class TestAudioCallbackCrash:
    def test_crashing_callback_does_not_kill_audio_thread(self, processor, tmp_path):
        """P0 regression: uncaught exception in on_new_audio must not kill the extractor thread."""
        calls = []

        def bad_callback(*args, **kwargs):
            calls.append((args, kwargs))
            if len(calls) == 1:
                raise RuntimeError("simulated callback crash")

        processor._on_new_audio = bad_callback

        # NOTE(review): this test invokes the callback directly, the way
        # _audio_loop would after extracting a chunk; it does NOT drive
        # _audio_loop itself. Replace with a test against the real loop to
        # actually pin the regression.

        # Fake WAV chunks large enough to pass the loop's size check.
        first_wav = tmp_path / "chunk_0000.wav"
        first_wav.write_bytes(b"\x00" * 200)
        second_wav = tmp_path / "chunk_0001.wav"
        second_wav.write_bytes(b"\x00" * 200)

        # First delivery crashes inside the callback.
        with pytest.raises(RuntimeError, match="simulated callback crash"):
            processor._on_new_audio(
                first_wav, 0.0, 3.0, segment_path=first_wav, local_start=0.0
            )

        # The next chunk must still be delivered; any exception here fails the test.
        processor._on_new_audio(
            second_wav, 3.0, 3.0, segment_path=second_wav, local_start=3.0
        )

        assert len(calls) == 2


# -- P0: wall-clock offset --


class TestWallClockOffset:
    def test_offset_from_session_dir_name(self, tmp_path, make_processor):
        """P0: wall-clock offset from session dir name must be close to actual elapsed time."""
        # Name the session dir after "now".
        session_dir = tmp_path / datetime.now().strftime("%Y%m%d_%H%M%S")
        session_dir.mkdir()
        proc = make_processor(session_dir)

        offset = proc._wall_clock_offset()

        # The dir name has 1-second resolution; allow up to 2s of slack.
        assert 0.0 <= offset < 2.0

    def test_offset_increases_with_time(self, tmp_path, make_processor):
        """P0: offset must grow, not stay zero."""
        # Simulate a session started 10 seconds ago.
        past = datetime.now() - timedelta(seconds=10)
        session_dir = tmp_path / past.strftime("%Y%m%d_%H%M%S")
        session_dir.mkdir()
        proc = make_processor(session_dir)

        offset = proc._wall_clock_offset()

        assert offset >= 9.0  # at least 9s (allow 1s tolerance)

    def test_offset_falls_back_gracefully_on_bad_name(self, tmp_path, make_processor):
        """P0 fragility: bad session dir name must not crash."""
        session_dir = tmp_path / "not_a_timestamp"
        session_dir.mkdir()
        proc = make_processor(session_dir)

        # Should not raise
        offset = proc._wall_clock_offset()

        assert offset >= 0.0


# -- P0: flush frame deduplication --


class TestFlushFrameDeduplication:
    def test_frames_within_100ms_are_skipped(self, processor):
        """P0 regression: flush frames within flush_window of scene frame must be dropped."""
        received = []
        processor.set_on_new_frames(received.extend)
        emit_frame = _flush_dedup_emitter(processor, threshold=0.1)

        emit_frame(5.0)  # scene frame — accepted
        emit_frame(5.03)  # flush frame 1 — < 100ms, skipped
        emit_frame(5.06)  # flush frame 2 — < 100ms, skipped
        emit_frame(8.0)  # next scene frame — accepted

        assert len(received) == 2
        assert received[0]["timestamp"] == 5.0
        assert received[1]["timestamp"] == 8.0

    def test_frames_beyond_100ms_are_accepted(self, processor):
        """Frames separated by > 100ms are distinct scenes, not flush frames."""
        received = []
        processor.set_on_new_frames(received.extend)
        emit_frame = _flush_dedup_emitter(processor, threshold=0.1)

        emit_frame(5.0)
        emit_frame(5.15)  # > 100ms — separate scene, accepted
        emit_frame(5.30)

        assert len(received) == 3