Files
mitus/cht/summary/merger.py
2026-05-07 13:04:40 -03:00

89 lines
2.6 KiB
Python

"""Interleave diarized audio segments with selected screen frames by timestamp.
Direct port of mts/meetus/transcript_merger.py:merge_transcripts (line 162).
"""
import logging
log = logging.getLogger(__name__)
def merge(audio_segments: list[dict], frame_segments: list[dict],
name_map: dict[str, str] | None = None) -> list[dict]:
"""Combine and group by speaker; screen frames break speaker groups.
`audio_segments`: each {timestamp, text, speaker?}.
`frame_segments`: each {timestamp, frame_path}.
`name_map`: optional SPEAKER_xx → real name remap, applied to outputs.
Returns merged list sorted by timestamp.
"""
name_map = name_map or {}
audio = [{**s, "type": "audio"} for s in audio_segments]
screen = [{**s, "type": "screen"} for s in frame_segments]
all_segs = sorted(audio + screen, key=lambda x: x["timestamp"])
grouped: list[dict] = []
current = None
def _label(speaker):
if not speaker:
return None
return name_map.get(speaker, speaker)
for seg in all_segs:
if seg["type"] == "screen":
if current is not None:
grouped.append(current)
current = None
grouped.append(seg)
continue
speaker = _label(seg.get("speaker"))
if current is None:
current = {
"timestamp": seg["timestamp"],
"text": seg["text"],
"speaker": speaker,
"type": "audio",
}
elif speaker == current.get("speaker"):
current["text"] += " " + seg["text"]
else:
grouped.append(current)
current = {
"timestamp": seg["timestamp"],
"text": seg["text"],
"speaker": speaker,
"type": "audio",
}
if current is not None:
grouped.append(current)
return grouped
def whisperx_to_audio_segments(diarized: dict) -> list[dict]:
    """Convert whisperx JSON segments to the merger's audio format.

    Reads `diarized["segments"]` and emits one {timestamp, text, speaker}
    dict per segment whose stripped text is non-empty.

    Args:
        diarized: mapping with an optional "segments" list; each segment may
            carry "start", "text" and "speaker".

    Returns:
        List of dicts where `timestamp` is the segment's "start" as a float
        (0.0 when "start" is missing or null), `text` is the stripped text,
        and `speaker` is the segment's "speaker" value or None.
    """
    out = []
    # `or []` treats an explicit null "segments" like a missing one, the same
    # way a null "text" is already treated as empty below.
    for seg in diarized.get("segments") or []:
        text = (seg.get("text") or "").strip()
        if not text:
            continue  # nothing spoken: skip blank / null segments
        start = seg.get("start")
        out.append({
            "timestamp": 0.0 if start is None else float(start),
            "text": text,
            "speaker": seg.get("speaker"),
        })
    return out
def collect_speakers(diarized: dict) -> list[str]:
    """Return the distinct speaker labels found in the diarization, sorted.

    Segments with a missing or empty "speaker" are ignored. A missing or
    null "segments" entry yields an empty list.
    """
    # `or []` makes an explicit null "segments" behave like a missing key.
    segments = diarized.get("segments") or []
    labels = {seg.get("speaker") for seg in segments if seg.get("speaker")}
    return sorted(labels)