scrub optimization

This commit is contained in:
2026-04-03 06:40:08 -03:00
parent 9dfa252727
commit 84dc1405dc
13 changed files with 813 additions and 68 deletions

View File

@@ -6,6 +6,7 @@ from threading import Thread
from gi.repository import GLib
from cht.config import TRANSCRIBE_MIN_CHUNK_S
from cht.session import rebuild_manifest
from cht.stream.manager import StreamManager
from cht.stream.tracker import RecordingTracker
@@ -84,11 +85,19 @@ class StreamLifecycle:
self._tracker = None
readonly = self._stream_mgr.readonly if self._stream_mgr else True
session_dir = self._stream_mgr.session_dir if self._stream_mgr else None
if self._stream_mgr:
if not readonly:
self._stream_mgr.stop_all()
self._stream_mgr = None
# Rebuild manifest now that all segments are finalized
if session_dir and not readonly:
try:
rebuild_manifest(session_dir)
except Exception as e:
log.error("Failed to rebuild manifest on stop: %s", e)
self._streaming = False
self._gone_live = False
self._pending_transcript_audio.clear()
@@ -129,40 +138,51 @@ class StreamLifecycle:
GLib.idle_add(self._on_scene_marker, f["timestamp"])
self._on_new_frames(frames)
def _handle_new_audio(self, wav_path, start_time, duration):
def _handle_new_audio(self, wav_path, start_time, duration,
segment_path=None, local_start=None):
if not self._stream_mgr:
return
# start_time is global; waveform uses global time
self._waveform_engine.append_chunk(wav_path, start_time)
peaks = self._waveform_engine.peaks
bucket_dur = self._waveform_engine.bucket_duration
GLib.idle_add(self._on_waveform_update, peaks.copy(), bucket_dur)
self._pending_transcript_audio.append((wav_path, start_time, duration))
self._pending_transcript_audio.append({
"wav": wav_path, "global_start": start_time, "duration": duration,
"segment_path": segment_path or self._stream_mgr.recording_path,
"local_start": local_start if local_start is not None else start_time,
})
self._pending_transcript_duration += duration
if self._pending_transcript_duration < TRANSCRIBE_MIN_CHUNK_S:
return
first_start = self._pending_transcript_audio[0][1]
first = self._pending_transcript_audio[0]
first_global = first["global_start"]
first_local = first["local_start"]
seg_path = first["segment_path"]
total_dur = self._pending_transcript_duration
self._pending_transcript_audio.clear()
self._pending_transcript_duration = 0.0
mgr = self._stream_mgr
chunk_wav = mgr.audio_dir / f"transcript_{int(first_start):06d}.wav"
chunk_wav = mgr.audio_dir / f"transcript_{int(first_global):06d}.wav"
def _transcribe():
from cht.stream import ffmpeg as ff
try:
# Extract audio using local time within the segment file
ff.extract_audio_chunk(
mgr.recording_path, chunk_wav,
start_time=first_start, duration=total_dur,
seg_path, chunk_wav,
start_time=first_local, duration=total_dur,
)
except Exception as e:
log.error("Transcript audio extraction failed: %s", e)
return
if not chunk_wav.exists():
return
new_segs = self._transcriber.transcribe_chunk(chunk_wav, time_offset=first_start)
# Transcribe with global time offset so segment timestamps are global
new_segs = self._transcriber.transcribe_chunk(chunk_wav, time_offset=first_global)
self._transcriber.save_index(mgr.transcript_dir / "index.json")
if new_segs:
GLib.idle_add(self._on_transcript_ready, new_segs)

View File

@@ -65,6 +65,7 @@ class StreamManager:
self._threads = {}
self._stop_flags = set()
self._segment = 0
self._segment_offsets = {0: 0.0} # segment_index → global_offset
self.scene_threshold = SCENE_THRESHOLD
self.readonly = False # True when loaded from existing session
log.info("Session: %s", session_id)
@@ -72,6 +73,7 @@ class StreamManager:
@classmethod
def from_existing(cls, session_id):
"""Load an existing session without starting any ffmpeg processes."""
from cht.session import rebuild_manifest
mgr = cls(session_id=session_id)
if not mgr.session_dir.exists():
raise FileNotFoundError(f"Session not found: {session_id}")
@@ -80,10 +82,35 @@ class StreamManager:
segments = mgr.recording_segments
if segments:
mgr._segment = len(segments) - 1
mgr._rebuild_offsets()
rebuild_manifest(mgr.session_dir)
log.info("Loaded existing session: %s (%d segments, %d frames)",
session_id, len(segments), mgr.frame_count)
return mgr
@property
def current_global_offset(self) -> float:
    """Global time offset (seconds) of the segment currently being recorded.

    Falls back to 0.0 when the current segment index has no recorded
    offset yet (e.g. a fresh session before any segment completed).
    """
    offsets = self._segment_offsets
    segment = self._segment
    if segment in offsets:
        return offsets[segment]
    return 0.0
def _rebuild_offsets(self):
    """Recompute the global time offset of every recording segment on disk.

    Segment i's offset is the summed duration of segments 0..i-1, so
    segment 0 always starts at 0.0.  Durations are probed from the
    segment files themselves via ``probe_duration``.
    """
    from cht.session import probe_duration
    self._segment_offsets = {}
    running_total = 0.0
    for index, segment_path in enumerate(self.recording_segments):
        self._segment_offsets[index] = running_total
        running_total += probe_duration(segment_path)
def _advance_segment_offset(self, completed_segment_path):
    """Record the global offset for the next segment once the current one ends.

    The next segment's offset is the current segment's offset plus the
    probed duration of the just-completed file, so timestamps stay
    continuous across recorder restarts.
    """
    from cht.session import probe_duration
    segment_duration = probe_duration(completed_segment_path)
    current_offset = self._segment_offsets.get(self._segment, 0.0)
    next_offset = current_offset + segment_duration
    self._segment_offsets[self._segment + 1] = next_offset
    log.info("Segment %d completed (%.1fs), next offset: %.1fs",
             self._segment, segment_duration, next_offset)
@property
def frame_count(self):
index_path = self.frames_dir / "index.json"
@@ -144,6 +171,7 @@ class StreamManager:
# Start after existing segments (for resumed sessions)
existing = self.recording_segments
self._segment = len(existing)
self._rebuild_offsets()
self._launch_recorder()
def restart_recorder(self):
@@ -151,8 +179,11 @@ class StreamManager:
old = self._procs.pop("recorder", None)
if old:
ff.stop_proc(old)
completed_path = self.recording_path
self._advance_segment_offset(completed_path)
self._segment += 1
log.info("Restarting recorder → segment %d", self._segment)
log.info("Restarting recorder → segment %d (offset %.1fs)",
self._segment, self.current_global_offset)
self._launch_recorder()
def recorder_alive(self):
@@ -297,6 +328,7 @@ class StreamManager:
index_path = self.frames_dir / "index.json"
index = json.loads(index_path.read_text()) if index_path.exists() else []
offset = self.current_global_offset
frame_num = start_number
for line in stderr.splitlines():
if "showinfo" not in line:
@@ -309,7 +341,7 @@ class StreamManager:
if frame_path.exists():
entry = {
"id": frame_id,
"timestamp": pts_time,
"timestamp": pts_time + offset,
"path": str(frame_path),
"sent_to_agent": False,
}
@@ -332,7 +364,8 @@ class StreamManager:
log.warning("capture_now: recording too short")
return
timestamp = safe_duration - 1
local_timestamp = safe_duration - 1
timestamp = local_timestamp + self.current_global_offset
index_path = self.frames_dir / "index.json"
index = json.loads(index_path.read_text()) if index_path.exists() else []
frame_num = len(index) + 1
@@ -340,7 +373,7 @@ class StreamManager:
frame_path = self.frames_dir / f"{frame_id}.jpg"
try:
ff.extract_frame_at(self.recording_path, frame_path, timestamp)
ff.extract_frame_at(self.recording_path, frame_path, local_timestamp)
except Exception as e:
log.error("capture_now failed: %s", e)
return
@@ -421,10 +454,14 @@ class StreamManager:
continue
if wav_path.exists() and wav_path.stat().st_size > 100:
log.info("Audio chunk: %s (%.1fs → %.1fs)",
wav_path.name, processed_time, process_to)
global_start = processed_time + self.current_global_offset
log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)",
wav_path.name, processed_time, process_to, global_start)
if self._on_new_audio:
self._on_new_audio(wav_path, processed_time, chunk_duration)
self._on_new_audio(
wav_path, global_start, chunk_duration,
segment_path=seg, local_start=processed_time,
)
chunk_num += 1
processed_time = process_to