#!/usr/bin/env python3 """Post-hoc session benchmark — extract timing metrics from session logs. Usage: python ctrl/bench.py --session data/sessions/20260410_160441 python ctrl/bench.py --latest # pick most recent session python ctrl/bench.py --latest --json # machine-readable output Parses telemetry.jsonl, session.log, frames/index.json, and (if present) media/logs/server.log. No live session required — works on finished sessions. Metrics: M1 Full startup Connect → first transcript M1a → first duration update M1b → first scene frame M1c → first audio chunk M1d → first transcript M5 Audio lag How far audio extraction trails real-time M6 Transcript lag Time from audio ready to transcript done M7 Frame throughput Scene frames per minute M9 Recorder health Unexpected restarts / segment rotations """ import argparse import json import logging import re import sys from datetime import datetime from pathlib import Path log = logging.getLogger("bench") PROJECT_DIR = Path(__file__).resolve().parent.parent DATA_DIR = PROJECT_DIR / "data" SESSIONS_DIR = DATA_DIR / "sessions" def parse_log_time(line: str) -> float | None: """Parse HH:MM:SS from session.log line → seconds since midnight.""" m = re.match(r"(\d{2}):(\d{2}):(\d{2})", line) if m: return int(m[1]) * 3600 + int(m[2]) * 60 + int(m[3]) return None def load_telemetry(session_dir: Path) -> list[dict]: path = session_dir / "telemetry.jsonl" if not path.exists(): return [] entries = [] for line in path.read_text().splitlines(): if line.strip(): try: entries.append(json.loads(line)) except json.JSONDecodeError: pass return entries def load_session_log(session_dir: Path) -> list[str]: path = session_dir / "session.log" return path.read_text().splitlines() if path.exists() else [] def load_frames_index(session_dir: Path) -> list[dict]: path = session_dir / "frames" / "index.json" if not path.exists(): return [] try: return json.loads(path.read_text()) except (json.JSONDecodeError, ValueError): return [] def find_first_line(lines: list[str], pattern: str) -> tuple[float | None, str | None]: """Find first line matching pattern. Returns (time_seconds, full_line).""" for line in lines: if pattern in line: return parse_log_time(line), line return None, None def extract_metrics(session_dir: Path) -> dict: tel = load_telemetry(session_dir) log_lines = load_session_log(session_dir) frames = load_frames_index(session_dir) metrics = {} # Session start time (from telemetry t=0 wall clock, or first log line) session_start_t = None for entry in tel: if entry.get("name") == "session_start": session_start_t = parse_log_time(log_lines[0]) if log_lines else None break # Session end session_duration = None for entry in tel: if entry.get("name") == "session_end": session_duration = entry.get("t") metrics["session_duration_s"] = session_duration # M1a: start → first duration update t_start = session_start_t t_duration, _ = find_first_line(log_lines, "Duration:") if t_start is not None and t_duration is not None: metrics["M1a_first_duration_s"] = t_duration - t_start # M1b: start → first scene frame t_frame, _ = find_first_line(log_lines, "Scene frame:") if t_start is not None and t_frame is not None: metrics["M1b_first_scene_frame_s"] = t_frame - t_start # M1c: start → first audio chunk t_audio, _ = find_first_line(log_lines, "Audio chunk:") if t_start is not None and t_audio is not None: metrics["M1c_first_audio_chunk_s"] = t_audio - t_start # M1d: start → first transcript (whisper processing) t_transcript, _ = find_first_line(log_lines, "faster_whisper: Processing audio") if t_start is not None and t_transcript is not None: metrics["M1d_first_transcript_s"] = t_transcript - t_start # M1: full startup = start → first transcript if "M1d_first_transcript_s" in metrics: metrics["M1_full_startup_s"] = metrics["M1d_first_transcript_s"] # Going LIVE time t_live, _ = find_first_line(log_lines, "Going LIVE") if t_start is not None and t_live is not None: metrics["going_live_s"] = t_live - t_start # M5: Audio extraction lag # Parse "Audio chunk: chunk_N (Xs → Ys, global Zs)" lines audio_lags = [] for line in log_lines: m = re.search(r"Audio chunk: \S+ \(([\d.]+)s → ([\d.]+)s, global ([\d.]+)s\)", line) if m: end_time = float(m[2]) log_t = parse_log_time(line) if log_t is not None and t_start is not None: wall_elapsed = log_t - t_start lag = wall_elapsed - end_time if lag >= 0: audio_lags.append(lag) if audio_lags: metrics["M5_audio_lag_avg_s"] = round(sum(audio_lags) / len(audio_lags), 1) metrics["M5_audio_lag_max_s"] = round(max(audio_lags), 1) metrics["M5_audio_lag_min_s"] = round(min(audio_lags), 1) metrics["M5_audio_chunk_count"] = len(audio_lags) # M6: Transcription lag # Parse faster_whisper "Processing audio with duration MM:SS.mmm" or "HH:MM:SS.mmm" transcript_durations = [] for line in log_lines: # MM:SS.mmm format (e.g., 00:06.145) m = re.search(r"faster_whisper: Processing audio with duration (\d+):([\d.]+)$", line) if m: dur = int(m[1]) * 60 + float(m[2]) transcript_durations.append(dur) continue # HH:MM:SS.mmm format m = re.search(r"faster_whisper: Processing audio with duration (\d+):(\d+):([\d.]+)", line) if m: dur = int(m[1]) * 3600 + int(m[2]) * 60 + float(m[3]) transcript_durations.append(dur) if transcript_durations: metrics["M6_whisper_processing_avg_s"] = round(sum(transcript_durations) / len(transcript_durations), 1) metrics["M6_transcript_count"] = len(transcript_durations) # M7: Frame throughput if frames and session_duration and session_duration > 0: minutes = session_duration / 60 metrics["M7_frame_throughput_per_min"] = round(len(frames) / minutes, 1) metrics["M7_total_frames"] = len(frames) # M9: Recorder health restarts = sum(1 for l in log_lines if "Recorder died" in l) segments = sum(1 for l in log_lines if "Restarting recorder" in l) metrics["M9_recorder_restarts"] = restarts metrics["M9_segment_rotations"] = segments # Scene detection mode if any("Scene detector: connecting" in l for l in log_lines): metrics["scene_mode"] = "rust_relay" elif any("Recorder+scene: pid=" in l for l in log_lines): metrics["scene_mode"] = "python_single_process" else: metrics["scene_mode"] = "unknown" # Transport mode — check for Rust-specific markers if any("Rust session dir" in l or "Attached to Rust session" in l for l in log_lines): metrics["transport"] = "rust" elif any("Recorder+scene: pid=" in l for l in log_lines): metrics["transport"] = "python" else: # Check file signatures: Rust writes audio.aac separately, Python muxes into fMP4 aac = session_dir / "stream" / "audio.aac" if aac.exists(): metrics["transport"] = "rust" elif any("run_async:" in l for l in log_lines): metrics["transport"] = "python" else: metrics["transport"] = "unknown" # Scene mode from log markers if metrics.get("scene_mode") == "unknown": if any("Recorder+scene: pid=" in l for l in log_lines): metrics["scene_mode"] = "python_single_process" elif any("run_async:" in l for l in log_lines): metrics["scene_mode"] = "python_single_process" return metrics def print_report(session_dir: Path, metrics: dict): log.info("=" * 60) log.info(" CHT Benchmark Report") log.info(" Session: %s", session_dir.name) log.info(" Transport: %s", metrics.get("transport", "?")) log.info(" Scene mode: %s", metrics.get("scene_mode", "?")) log.info(" Duration: %ss", metrics.get("session_duration_s", "?")) log.info("=" * 60) rows = [ ("M1", "Full startup", "M1_full_startup_s", "s"), ("M1a", " → first duration", "M1a_first_duration_s", "s"), ("M1b", " → first scene frame", "M1b_first_scene_frame_s", "s"), ("M1c", " → first audio chunk", "M1c_first_audio_chunk_s", "s"), ("M1d", " → first transcript", "M1d_first_transcript_s", "s"), ("", " → going live", "going_live_s", "s"), ("M5", "Audio lag (avg)", "M5_audio_lag_avg_s", "s"), ("M5", "Audio lag (max)", "M5_audio_lag_max_s", "s"), ("M5", "Audio chunks", "M5_audio_chunk_count", ""), ("M6", "Whisper processing (avg)", "M6_whisper_processing_avg_s", "s"), ("M6", "Transcripts produced", "M6_transcript_count", ""), ("M7", "Frame throughput", "M7_frame_throughput_per_min", "/min"), ("M7", "Total frames", "M7_total_frames", ""), ("M9", "Recorder restarts", "M9_recorder_restarts", ""), ("M9", "Segment rotations", "M9_segment_rotations", ""), ] for code, label, key, unit in rows: val = metrics.get(key) if val is not None: log.info(" %4s %28s %s%s", code, label, val, unit) else: log.info(" %4s %28s -", code, label) def compare_ground_truth(session_dir: Path, gt: dict) -> dict: """Compare detected scene frames against ground truth scene changes.""" frames = load_frames_index(session_dir) gt_scenes = gt.get("scenes", []) if not frames or not gt_scenes: return {"error": "no frames or no ground truth scenes"} detected_ts = sorted(f["timestamp"] for f in frames) expected_ts = sorted(s["timestamp_s"] for s in gt_scenes) # For each expected scene change, find the closest detected frame matches = [] for exp_ts in expected_ts: best = None best_delta = float("inf") for det_ts in detected_ts: delta = det_ts - exp_ts if abs(delta) < abs(best_delta): best_delta = delta best = det_ts matches.append({ "expected_s": exp_ts, "detected_s": best, "delta_s": round(best_delta, 3) if best is not None else None, }) deltas = [m["delta_s"] for m in matches if m["delta_s"] is not None] return { "expected_scenes": len(expected_ts), "detected_frames": len(detected_ts), "matches": matches, "avg_delta_s": round(sum(deltas) / len(deltas), 3) if deltas else None, "max_delta_s": round(max(abs(d) for d in deltas), 3) if deltas else None, "missed": sum(1 for m in matches if m["delta_s"] is None or abs(m["delta_s"]) > 10), } def print_ground_truth_report(gt: dict): log.info("") log.info(" Scene detection vs ground truth:") log.info(" Expected scenes: %d", gt.get("expected_scenes", 0)) log.info(" Detected frames: %d", gt.get("detected_frames", 0)) if gt.get("avg_delta_s") is not None: log.info(" Avg detection delta: %ss", gt["avg_delta_s"]) log.info(" Max detection delta: %ss", gt["max_delta_s"]) if gt.get("missed", 0) > 0: log.warning(" Missed scenes: %d", gt["missed"]) for m in gt.get("matches", []): status = "OK" if m["delta_s"] is not None and abs(m["delta_s"]) < 5 else "MISS" det = f"{m['detected_s']:.1f}s" if m["detected_s"] is not None else "---" delta = f"+{m['delta_s']:.1f}s" if m["delta_s"] is not None else "" log.info(" %4s expected=%5.1fs detected=%s %s", status, m["expected_s"], det, delta) def find_latest_session() -> Path | None: if not SESSIONS_DIR.exists(): return None dirs = sorted(SESSIONS_DIR.iterdir(), reverse=True) for d in dirs: if d.is_dir() and (d / "telemetry.jsonl").exists(): return d return None def main(): parser = argparse.ArgumentParser(description="CHT session benchmark") parser.add_argument("--session", type=Path, help="Path to session directory") parser.add_argument("--latest", action="store_true", help="Use most recent session") parser.add_argument("--json", action="store_true", help="Output JSON instead of table") parser.add_argument("--ground-truth", type=Path, help="Ground truth JSON for scene comparison") args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", datefmt="%H:%M:%S", ) if args.latest: session_dir = find_latest_session() if not session_dir: log.error("No sessions found") sys.exit(1) elif args.session: session_dir = args.session else: parser.print_help() sys.exit(1) if not session_dir.exists(): log.error("Session not found: %s", session_dir) sys.exit(1) metrics = extract_metrics(session_dir) metrics["session_id"] = session_dir.name # Ground truth comparison if args.ground_truth and args.ground_truth.exists(): gt = json.loads(args.ground_truth.read_text()) comparison = compare_ground_truth(session_dir, gt) metrics["ground_truth"] = comparison if args.json: sys.stdout.write(json.dumps(metrics, indent=2) + "\n") else: print_report(session_dir, metrics) if "ground_truth" in metrics: print_ground_truth_report(metrics["ground_truth"]) # Save report bench_dir = DATA_DIR / "bench" bench_dir.mkdir(parents=True, exist_ok=True) report_path = bench_dir / f"{session_dir.name}.json" report_path.write_text(json.dumps(metrics, indent=2)) if __name__ == "__main__": main()