""" RecordingTracker: monitors the growing recording file and reports duration. Probes with ffprobe every cycle. No bitrate estimation — initial burst frames make calibration unreliable. Falls back to file-size heuristic only when ffprobe returns nothing (e.g. file too new). Supports multi-segment recording: sums completed segment durations and adds the current segment's growing duration. """ import logging import time from pathlib import Path from threading import Thread import ffmpeg as ffmpeg_lib log = logging.getLogger(__name__) class RecordingTracker: """Tracks growing recording segments and reports total duration.""" def __init__(self, get_segments, on_duration_update=None): """ Args: get_segments: callable returning list of Path objects (all segments, ordered) on_duration_update: callback(duration_float) """ self._get_segments = get_segments self._on_duration = on_duration_update self._duration = 0.0 self._segment_cache = {} # path → finalized duration (only for completed segments) self._stop = False self._thread = None @property def duration(self): return self._duration def start(self): self._stop = False self._thread = Thread(target=self._poll_loop, daemon=True, name="rec_tracker") self._thread.start() log.info("RecordingTracker started") def stop(self): self._stop = True log.info("RecordingTracker stopped") def _poll_loop(self): while not self._stop: time.sleep(2) segments = self._get_segments() if not segments: continue total = 0.0 for i, seg in enumerate(segments): is_last = (i == len(segments) - 1) if not is_last and seg in self._segment_cache: # Completed segment — use cached duration total += self._segment_cache[seg] continue if not seg.exists(): continue size = seg.stat().st_size if size < 10_000: continue dur = self._probe_duration(seg) if dur: if not is_last: # Segment is done growing — cache it self._segment_cache[seg] = dur total += dur if total > self._duration: self._duration = total log.info("Duration: %.1fs (%d segments)", total, len(segments)) if self._on_duration: self._on_duration(self._duration) def _probe_duration(self, path): """Probe recording duration via ffprobe.""" try: info = ffmpeg_lib.probe(str(path)) dur = float(info.get("format", {}).get("duration", 0)) if dur > 0: return dur for stream in info.get("streams", []): sdur = float(stream.get("duration", 0)) if sdur > 0: return sdur except Exception as e: log.debug("ffprobe failed for %s: %s", path, e) try: return path.stat().st_size / 65_000 except Exception: return None