audio and transcript
This commit is contained in:
0
cht/audio/__init__.py
Normal file
0
cht/audio/__init__.py
Normal file
90
cht/audio/waveform.py
Normal file
90
cht/audio/waveform.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""
|
||||
Waveform peak computation from WAV files.
|
||||
|
||||
Reads 16kHz mono PCM WAV files (as produced by ffmpeg extract_audio_chunk),
|
||||
computes RMS amplitude per time bucket, and stores peaks as a numpy array
|
||||
that grows incrementally during live recording.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import wave
|
||||
|
||||
import numpy as np
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WaveformEngine:
    """Computes and accumulates waveform peak data from WAV chunks.

    Each peak is the RMS amplitude of one fixed-length time bucket
    (``bucket_ms`` milliseconds of audio), stored as float32.  Peaks can be
    built incrementally with :meth:`append_chunk` or in one pass with
    :meth:`compute_full`.
    """

    def __init__(self, bucket_ms=50):
        """Create an empty engine.

        Args:
            bucket_ms: Length of one peak bucket in milliseconds.
        """
        self._bucket_ms = bucket_ms
        self._peaks = np.empty(0, dtype=np.float32)
        self._total_duration = 0.0

    @property
    def peaks(self):
        """Float32 array holding one RMS value per bucket computed so far."""
        return self._peaks

    @property
    def bucket_duration(self):
        """Length of one bucket in seconds."""
        return self._bucket_ms / 1000.0

    @property
    def total_duration(self):
        """End time, in seconds, of the audio processed so far."""
        return self._total_duration

    def append_chunk(self, wav_path, start_time):
        """Read a WAV chunk and append its peaks to the internal array.

        Args:
            wav_path: Path of the WAV chunk to read.
            start_time: Offset of this chunk from the start of the recording,
                in seconds.

        If the chunk cannot be read (or is empty) the engine is left
        untouched.
        """
        samples, sample_rate = self._read_wav(wav_path)
        if samples is None:
            return
        new_peaks = self._compute_rms(samples, sample_rate)
        if len(new_peaks) > 0:
            self._peaks = np.concatenate([self._peaks, new_peaks])
        # The duration is derived from the chunk's own position rather than
        # accumulated, so it always reflects the end of the latest chunk.
        chunk_duration = len(samples) / sample_rate
        self._total_duration = start_time + chunk_duration
        log.info("Waveform: +%d peaks (total %d, %.1fs)",
                 len(new_peaks), len(self._peaks), self._total_duration)

    def compute_full(self, wav_path):
        """Compute all peaks from a complete WAV file (for loaded sessions).

        Any previously accumulated peaks are discarded first; if the file
        cannot be read the engine stays empty.
        """
        self.reset()
        samples, sample_rate = self._read_wav(wav_path)
        if samples is None:
            return
        self._peaks = self._compute_rms(samples, sample_rate)
        self._total_duration = len(samples) / sample_rate
        log.info("Waveform full: %d peaks, %.1fs", len(self._peaks), self._total_duration)

    def reset(self):
        """Discard all accumulated peaks and set the duration back to zero."""
        self._peaks = np.empty(0, dtype=np.float32)
        self._total_duration = 0.0

    def _read_wav(self, wav_path):
        """Read a 16-bit PCM WAV file into a mono float32 numpy array.

        Multi-channel files are downmixed by averaging the channels, so the
        result always holds exactly one sample per frame and
        ``len(samples) / sample_rate`` is the true duration.

        Returns:
            ``(samples, sample_rate)`` with samples scaled to [-1.0, 1.0), or
            ``(None, 0)`` when the file is empty, unreadable, not 16-bit, or
            declares a non-positive sample rate.  Never raises.
        """
        try:
            with wave.open(str(wav_path), "rb") as wf:
                n_frames = wf.getnframes()
                if n_frames == 0:
                    return None, 0
                sample_rate = wf.getframerate()
                n_channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                # Decoding anything but 16-bit samples as int16 would yield
                # garbage amplitudes, and a zero rate would make callers
                # divide by zero when computing the duration.
                if sample_width != 2 or sample_rate <= 0:
                    log.warning(
                        "Unsupported WAV %s: sample width %d bytes, rate %d Hz",
                        wav_path, sample_width, sample_rate)
                    return None, 0
                raw = wf.readframes(n_frames)

            # A truncated file may end mid-frame; keep whole frames only so
            # the buffer decodes and reshapes cleanly.
            frame_bytes = sample_width * n_channels
            usable = len(raw) - len(raw) % frame_bytes
            samples = np.frombuffer(raw[:usable], dtype=np.int16).astype(np.float32)
            if n_channels > 1:
                # Frames are interleaved channel samples: average them to mono.
                samples = samples.reshape(-1, n_channels).mean(axis=1)
            if samples.size == 0:
                return None, 0
            return samples / 32768.0, sample_rate
        except Exception as e:
            # Deliberately broad: waveform display is best-effort and a bad
            # chunk must not take down the caller.
            log.warning("Failed to read WAV %s: %s", wav_path, e)
            return None, 0

    def _compute_rms(self, samples, sample_rate):
        """Compute RMS amplitude per bucket.

        Args:
            samples: 1-D array of audio samples.
            sample_rate: Samples per second.

        Returns:
            Float32 array with one RMS value per complete bucket.  Samples
            past the last complete bucket are dropped; input shorter than one
            bucket yields an empty array.
        """
        bucket_size = int(sample_rate * self._bucket_ms / 1000)
        if bucket_size <= 0 or len(samples) < bucket_size:
            return np.empty(0, dtype=np.float32)

        # Trim to whole buckets
        n_buckets = len(samples) // bucket_size
        trimmed = samples[:n_buckets * bucket_size].reshape(n_buckets, bucket_size)
        rms = np.sqrt(np.mean(trimmed ** 2, axis=1)).astype(np.float32)
        return rms
|
||||
Reference in New Issue
Block a user