312 lines
12 KiB
Python
312 lines
12 KiB
Python
"""Stream lifecycle — manages recording, scene detection, audio extraction, and transcription buffering."""
|
|
|
|
import logging
|
|
import time
|
|
from threading import Thread
|
|
|
|
from gi.repository import GLib
|
|
|
|
from cht.config import TRANSCRIBE_MIN_CHUNK_S
|
|
from cht.session import rebuild_manifest
|
|
from cht.stream.manager import StreamManager
|
|
from cht.stream.tracker import RecordingTracker
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamLifecycle:
    """Owns the streaming process state and coordinates background tasks.

    The window provides UI-facing callbacks; this class handles the
    process-management side (recorder, tracker, scene detection, audio
    extraction, transcription buffering).

    ``on_new_frames`` is called directly from whichever thread delivers
    scene frames. The other UI callbacks run on the GLib main loop, either
    via ``GLib.idle_add`` or from a GLib timer.
    NOTE(review): the idle_add marshalling presumably exists because the
    tracker / detector / extractor callbacks arrive on worker threads.
    Confirm this against StreamManager.
    """

    def __init__(self, *, timeline, waveform_engine, transcriber,
                 on_new_frames, on_waveform_update, on_transcript_ready,
                 on_scene_marker, on_recorder_restarted,
                 on_manifest_updated=None):
        self._timeline = timeline
        self._waveform_engine = waveform_engine
        self._transcriber = transcriber

        # Callbacks
        self._on_new_frames = on_new_frames
        self._on_waveform_update = on_waveform_update
        self._on_transcript_ready = on_transcript_ready
        self._on_scene_marker = on_scene_marker
        self._on_recorder_restarted = on_recorder_restarted
        self._on_manifest_updated = on_manifest_updated

        # State
        self._streaming = False
        self._gone_live = False
        self._start_monotonic = 0.0  # set by start(), read by _go_live_once()
        self._rust_transport = False
        self._stream_mgr: StreamManager | None = None
        self._tracker: RecordingTracker | None = None
        # Audio chunks awaiting transcription. Each entry is a dict with keys
        # "wav", "global_start", "duration", "segment_path", "local_start".
        self._pending_transcript_audio: list[dict] = []
        self._pending_transcript_duration = 0.0
        # GLib source ids of the periodic timers installed by start(). They
        # are kept so stop()/start() can remove them; otherwise a quick
        # stop/start cycle leaves a second set of timers running.
        self._tick_source_id: int | None = None
        self._recorder_check_source_id: int | None = None

    @property
    def is_streaming(self) -> bool:
        """True between a successful start() and the next stop()."""
        return self._streaming

    @property
    def stream_mgr(self) -> StreamManager | None:
        """The current StreamManager (live or read-only), if any."""
        return self._stream_mgr

    @property
    def tracker(self) -> RecordingTracker | None:
        """The RecordingTracker created by start(), if streaming."""
        return self._tracker

    def start(self, session_id=None, rust_transport=False) -> StreamManager | None:
        """Start recording and all background processes.

        Returns the StreamManager. Returns ``None`` if ``rust_transport`` is
        set and cht-server never advertised a session; ``is_streaming`` is
        reset to False in that case.

        rust_transport=True: skip StreamRecorder (Rust cht-server handles TCP +
        fMP4 + UDP relay). Session dir is discovered from data/active-session
        written by cht-server on first client connection. This blocks the
        calling thread while waiting (see ``_wait_for_rust_session``).
        """
        # Drop timers left behind by a previous start() that was never
        # stopped, so ticks and recorder checks don't run twice.
        self._remove_timers()

        self._streaming = True
        self._gone_live = False
        self._start_monotonic = time.monotonic()
        self._rust_transport = rust_transport

        if rust_transport:
            # Wait for cht-server to write the active session path.
            session_dir = self._wait_for_rust_session()
            if session_dir is None:
                log.error("Timed out waiting for cht-server session")
                self._streaming = False
                return None
            self._stream_mgr = StreamManager.from_rust_session(session_dir)
        else:
            self._stream_mgr = StreamManager(session_id=session_id)
            self._stream_mgr.setup_dirs()
            self._stream_mgr.start_recorder()

        self._tracker = RecordingTracker(
            get_segments=lambda: self._stream_mgr.recording_segments if self._stream_mgr else [],
            on_duration_update=self._on_duration_update,
        )
        self._tracker.start()

        self._stream_mgr.start_scene_detector(on_new_frames=self._handle_new_scene_frames)
        self._stream_mgr.start_audio_extractor(on_new_audio=self._handle_new_audio)

        self._tick_source_id = GLib.timeout_add(1000, self._tick_live)
        if not rust_transport:
            # In Rust mode cht-server owns recording, so there is no local
            # recorder process to watch.
            self._recorder_check_source_id = GLib.timeout_add(2000, self._check_recorder)

        return self._stream_mgr

    def _remove_timers(self):
        """Remove the periodic GLib timers installed by start(), if any are live."""
        if self._tick_source_id is not None:
            GLib.source_remove(self._tick_source_id)
            self._tick_source_id = None
        if self._recorder_check_source_id is not None:
            GLib.source_remove(self._recorder_check_source_id)
            self._recorder_check_source_id = None

    @staticmethod
    def _read_session_marker(marker):
        """Return the session directory named in *marker*, or None if it is empty.

        An empty marker (created but not yet written) must not become
        ``Path("")``. That path is the current directory, which always exists.

        Raises:
            OSError / UnicodeDecodeError: if the marker cannot be read.
        """
        from pathlib import Path

        text = marker.read_text().strip()
        return Path(text) if text else None

    @staticmethod
    def _unix_socket_accepts(path) -> bool:
        """Return True if a stream connection to the Unix socket *path* succeeds.

        The probe socket is always closed, including when ``connect`` fails.
        """
        import socket

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            try:
                sock.connect(str(path))
            except OSError:
                return False
        return True

    def _wait_for_rust_session(self, timeout=30, poll_interval=0.5):
        """Poll data/active-session until cht-server writes it.

        A marker that already exists may be left over from an earlier server.
        It is trusted only if ``DATA_DIR / "scene.sock"`` accepts a
        connection. If the socket exists but refuses, both the socket file
        and the marker are removed and we wait for a fresh marker.

        Returns:
            The session directory, or None if no usable marker appeared
            within *timeout* seconds.
        """
        from cht.config import DATA_DIR

        marker = DATA_DIR / "active-session"
        scene_sock = DATA_DIR / "scene.sock"

        # If marker exists, check liveness via data/scene.sock (fixed path).
        if marker.exists():
            try:
                session_dir = self._read_session_marker(marker)
                if (session_dir is not None and session_dir.exists()
                        and scene_sock.exists()):
                    if self._unix_socket_accepts(scene_sock):
                        log.info("Rust session dir (already active): %s", session_dir)
                        return session_dir
                    log.info("Stale scene.sock, cleaning up")
                    scene_sock.unlink(missing_ok=True)
                    marker.unlink(missing_ok=True)
                    log.info("Cleared stale active-session marker")
            except Exception as e:
                # Unreadable or otherwise broken marker: discard it so that
                # cht-server can write a fresh one.
                log.warning("Discarding unusable active-session marker: %s", e)
                marker.unlink(missing_ok=True)

        deadline = time.monotonic() + timeout
        while True:
            try:
                session_dir = self._read_session_marker(marker)
            except (OSError, UnicodeDecodeError):
                # Not written yet, or being replaced while we read it.
                session_dir = None
            if session_dir is not None and session_dir.exists():
                log.info("Rust session dir: %s", session_dir)
                return session_dir
            if time.monotonic() >= deadline:
                return None
            time.sleep(poll_interval)

    def stop(self):
        """Stop all processes and reset state. Does NOT touch UI — caller handles that."""
        self._remove_timers()

        if self._tracker:
            self._tracker.stop()
            self._tracker = None

        mgr = self._stream_mgr
        readonly = mgr.readonly if mgr else True
        session_dir = mgr.session_dir if mgr else None
        if mgr:
            if not readonly:
                mgr.stop_all()
            self._stream_mgr = None

        # Rebuild manifest now that all segments are finalized
        if session_dir and not readonly:
            try:
                rebuild_manifest(session_dir)
            except Exception as e:
                log.error("Failed to rebuild manifest on stop: %s", e)

        self._streaming = False
        self._gone_live = False
        # Audio still buffered below TRANSCRIBE_MIN_CHUNK_S is discarded.
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0

    def set_manager_readonly(self, mgr: StreamManager):
        """Set a read-only stream manager (for loaded sessions, no streaming)."""
        self._stream_mgr = mgr

    def clear_manager(self):
        """Clear the stream manager without stopping processes."""
        self._stream_mgr = None

    # -- Internal callbacks --

    @staticmethod
    def _idle_once(func, *args):
        """Schedule ``func(*args)`` to run exactly once on the GLib main loop.

        ``GLib.idle_add`` keeps re-invoking a callback while it returns a
        truthy value. The wrapper returns False, so each UI callback runs
        once no matter what it returns.
        """
        def _run():
            func(*args)
            return False

        GLib.idle_add(_run)

    def _on_duration_update(self, duration):
        """Tracker callback: push *duration* to the timeline.

        On the first update after start(), this also schedules going live
        and asks the manager for an immediate capture.
        """
        self._idle_once(self._timeline.set_duration, duration)
        if not self._gone_live:
            self._gone_live = True
            GLib.idle_add(self._go_live_once)
            # Read once: stop() may clear self._stream_mgr concurrently.
            mgr = self._stream_mgr
            if mgr:
                mgr.capture_now(on_new_frames=self._handle_new_scene_frames)

    def _go_live_once(self):
        """Idle callback: switch the timeline to live and record startup latency."""
        if self._stream_mgr:
            elapsed = time.monotonic() - self._start_monotonic
            log.info("Going LIVE (startup delay elapsed)")
            self._timeline.go_live()
            if self._stream_mgr.telemetry:
                self._stream_mgr.telemetry.metric("first_live", {
                    "elapsed_s": round(elapsed, 2),
                })
        return False

    def _tick_live(self):
        """1 s timer: advance the live timeline. It stops once streaming ends."""
        if not self._streaming:
            # Returning False destroys the source, so forget its id.
            self._tick_source_id = None
            return False
        self._timeline.tick_live()
        return True

    def _handle_new_scene_frames(self, frames):
        """Schedule one scene marker per frame ("timestamp") and forward the batch."""
        for frame in frames:
            self._idle_once(self._on_scene_marker, frame["timestamp"])
        self._on_new_frames(frames)

    def _handle_new_audio(self, wav_path, start_time, duration,
                          segment_path=None, local_start=None):
        """Audio-extractor callback: update the waveform and buffer audio for transcription.

        *start_time* is in global (session) time. *local_start* is the offset
        inside *segment_path*, and defaults to *start_time*. Once at least
        TRANSCRIBE_MIN_CHUNK_S seconds are buffered, the buffer is handed to
        a transcription thread.
        """
        # Read once: stop() may clear self._stream_mgr concurrently.
        mgr = self._stream_mgr
        if not mgr:
            return
        # start_time is global; waveform uses global time
        self._waveform_engine.append_chunk(wav_path, start_time)
        peaks = self._waveform_engine.peaks
        bucket_dur = self._waveform_engine.bucket_duration
        self._idle_once(self._on_waveform_update, peaks.copy(), bucket_dur)

        self._pending_transcript_audio.append({
            "wav": wav_path,
            "global_start": start_time,
            "duration": duration,
            "segment_path": segment_path or mgr.recording_path,
            "local_start": local_start if local_start is not None else start_time,
        })
        self._pending_transcript_duration += duration
        if self._pending_transcript_duration < TRANSCRIBE_MIN_CHUNK_S:
            return

        self._transcribe_pending(
            mgr,
            error_msg="Transcript audio extraction failed: %s",
            thread_name="transcriber",
        )

    def _flush_pending_transcript(self):
        """Force-transcribe any buffered audio (called before segment rotation)."""
        mgr = self._stream_mgr
        if not self._pending_transcript_audio or not mgr:
            return
        self._transcribe_pending(
            mgr,
            error_msg="Flush transcript failed: %s",
            thread_name="transcriber_flush",
        )

    def _transcribe_pending(self, mgr, *, error_msg, thread_name):
        """Drain the pending-audio buffer and transcribe it on a daemon thread.

        The span extracted starts at the first chunk's local offset in the
        first chunk's segment file. It lasts for the summed duration of all
        buffered chunks.
        NOTE(review): this assumes the buffered chunks are contiguous within
        one segment. Segment rotation flushes first, which presumably
        guarantees that.

        Args:
            mgr: StreamManager supplying ``audio_dir`` and ``transcript_dir``.
            error_msg: %-style log message used when audio extraction fails.
            thread_name: name of the worker thread.
        """
        first = self._pending_transcript_audio[0]
        first_global = first["global_start"]
        first_local = first["local_start"]
        seg_path = first["segment_path"]
        total_dur = self._pending_transcript_duration
        self._pending_transcript_audio.clear()
        self._pending_transcript_duration = 0.0

        chunk_wav = mgr.audio_dir / f"transcript_{int(first_global):06d}.wav"

        def _transcribe():
            from cht.stream import ffmpeg as ff
            try:
                # Extract audio using local time within the segment file
                ff.extract_audio_chunk(
                    seg_path, chunk_wav,
                    start_time=first_local, duration=total_dur,
                )
            except Exception as e:
                log.error(error_msg, e)
                return
            if not chunk_wav.exists():
                return
            # Transcribe with global time offset so segment timestamps are global
            try:
                new_segs = self._transcriber.transcribe_chunk(chunk_wav, time_offset=first_global)
            except Exception:
                log.exception("Transcription failed for %s", chunk_wav)
                return
            try:
                self._transcriber.save_index(mgr.transcript_dir / "index.json")
            except Exception:
                # Still deliver the new segments; only persistence failed.
                log.exception("Saving transcript index failed")
            if new_segs:
                self._idle_once(self._on_transcript_ready, new_segs)

        Thread(target=_transcribe, daemon=True, name=thread_name).start()

    def _check_recorder(self):
        """2 s timer (non-Rust mode): restart the recorder if its process has died."""
        if not self._streaming or not self._stream_mgr:
            # Returning False destroys the source, so forget its id.
            self._recorder_check_source_id = None
            return False
        if not self._stream_mgr.recorder_alive():
            log.warning("Recorder died — restarting into new segment")
            self._rotate_segment()
        return True

    def _rotate_segment(self):
        """Restart recorder into a new segment and update manifest."""
        mgr = self._stream_mgr
        # Flush pending transcript buffer from the old segment, so buffered
        # audio is extracted from the file it was recorded in.
        if self._pending_transcript_audio and self._pending_transcript_duration > 0:
            self._flush_pending_transcript()

        mgr.restart_recorder()
        self._on_recorder_restarted(mgr.recording_path)
        try:
            rebuild_manifest(mgr.session_dir)
        except Exception as e:
            log.error("Manifest rebuild failed: %s", e)
        if self._on_manifest_updated:
            self._idle_once(self._on_manifest_updated)