Files
mitus/cht/stream/processor.py

466 lines
17 KiB
Python

"""SessionProcessor: processes raw frame data and audio from recordings.
Receives raw JPEG frames from StreamRecorder (via on_raw_frame callback) and
handles all frame processing: file writing, frame index, GUI callbacks.
Also extracts audio from fMP4 files by polling (latency-insensitive).
The boundary with StreamRecorder:
Recorder: reads pipe → fires on_raw_frame(jpeg_bytes, global_ts)
Processor: writes JPEG to disk, updates index, fires on_new_frames to GUI
When Rust owns transport, SessionProcessor connects to the server's Unix
domain socket (scene.sock) for a live H.264 stream, pipes it to ffmpeg
for GPU scene detection. Continuous stream — no polling, no restarts.
"""
import json
import logging
import os
import re
import socket
import time
from pathlib import Path
from queue import Queue, Empty
from threading import Thread
from cht.config import (
AUDIO_EXTRACT_INTERVAL,
AUDIO_SAFETY_MARGIN,
SCENE_THRESHOLD,
SCENE_FLUSH_FRAMES,
)
from cht.stream import ffmpeg as ff
log = logging.getLogger(__name__)
class SessionProcessor:
    """Writes scene frames to disk and extracts audio from fMP4."""

    def __init__(self, session_dir: Path):
        """Create a processor rooted at *session_dir*.

        Frames are written under ``<session_dir>/frames`` and audio chunks
        under ``<session_dir>/audio``. No I/O happens here; directories are
        created lazily by the writers themselves.
        """
        self.session_dir = session_dir
        self.frames_dir = session_dir / "frames"
        self.audio_dir = session_dir / "audio"
        # Cooperative shutdown: worker threads poll for the "stop" token.
        self._stop_flags: set[str] = set()
        self._threads: dict[str, Thread] = {}
        # Child ffmpeg processes keyed by role. Initialized eagerly so
        # restart_scene_detector() can run before any scene session has
        # started (it was previously created lazily, risking AttributeError).
        self._procs: dict = {}
        self._on_new_frames = None
        self._on_new_audio = None
        self._last_scene_capture = 0.0
        # Wired up via attach(); queried to map local file time → global time.
        self._get_recording_path = None
        self._get_current_global_offset = None
def attach(self, get_recording_path, get_current_global_offset):
"""Wire up callbacks to query the recorder's current state."""
self._get_recording_path = get_recording_path
self._get_current_global_offset = get_current_global_offset
# -- Scene frame handling (called from recorder's pipe thread) --
def on_raw_frame(self, jpeg_bytes: bytes, global_ts: float):
"""Receive a raw JPEG frame from the recorder pipe. Write and index it."""
frame_num = self._next_frame_number()
frame_id = f"F{frame_num:04d}"
frame_path = self.frames_dir / f"{frame_id}.jpg"
frame_path.write_bytes(jpeg_bytes)
entry = {
"id": frame_id,
"timestamp": global_ts,
"path": str(frame_path),
"sent_to_agent": False,
}
self._append_frame_index(entry)
log.info("Scene frame: %s at %.1fs", frame_id, global_ts)
if self._on_new_frames:
self._on_new_frames([entry])
def set_on_new_frames(self, cb):
self._on_new_frames = cb
# -- On-demand capture (recorder extracts bytes, processor indexes) --
def on_captured_frame(self, jpeg_bytes: bytes, global_ts: float):
"""Receive a manually captured frame. Write and index it."""
self.on_raw_frame(jpeg_bytes, global_ts)
def capture_now_from_file(self):
"""Extract the current frame from the growing fMP4 (Rust transport mode)."""
import tempfile, os as _os
def _capture():
seg = self._get_recording_path() if self._get_recording_path else None
if not seg or not seg.exists():
log.warning("capture_now: no recording file")
return
try:
import subprocess
result = subprocess.run(
["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(seg)],
capture_output=True, text=True,
)
duration = float(result.stdout.strip())
except Exception as e:
log.warning("capture_now: could not probe duration: %s", e)
return
if duration < 1:
log.warning("capture_now: recording too short")
return
timestamp = max(0, duration - 0.5)
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp_path = Path(tmp.name)
try:
ff.extract_frame_at(seg, tmp_path, timestamp)
if not tmp_path.exists():
log.warning("capture_now: frame not written")
return
jpeg_bytes = tmp_path.read_bytes()
except Exception as e:
log.error("capture_now failed: %s", e)
return
finally:
try:
_os.unlink(tmp_path)
except Exception:
pass
offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
self.on_raw_frame(jpeg_bytes, timestamp + offset)
Thread(target=_capture, daemon=True, name="capture_now").start()
def _capture_current_frame(self):
"""Capture a fresh frame from the recording file's current tip.
Called when scene detection triggers. The scene filter's own JPEG
is stale (buffered in the encoder), so we extract directly from
the fMP4 which is always near-current.
"""
seg = self._get_recording_path() if self._get_recording_path else None
if not seg or not seg.exists():
return
duration = self._probe_safe_duration(seg)
if not duration or duration < 0.5:
return
local_ts = max(0, duration - 0.3)
offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
import tempfile, os as _os
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp_path = Path(tmp.name)
try:
ff.extract_frame_at(seg, tmp_path, local_ts)
if not tmp_path.exists() or tmp_path.stat().st_size == 0:
return
jpeg_bytes = tmp_path.read_bytes()
except Exception as e:
log.debug("Scene capture failed: %s", e)
return
finally:
try:
_os.unlink(tmp_path)
except Exception:
pass
self.on_raw_frame(jpeg_bytes, local_ts + offset)
def restart_scene_detector(self, threshold):
"""Restart scene detector with a new threshold.
Kills the running ffmpeg — the detector thread reconnects automatically
and picks up the new threshold on the next call to start_scene_detector.
"""
if "scene_detector" in self._procs:
ff.stop_proc(self._procs.pop("scene_detector"), timeout=2)
# Spawn a fresh thread with the new threshold; old thread will exit
# when its ffmpeg proc dies.
self.start_scene_detector(threshold=threshold)
# -- Frame index --
@property
def frame_count(self) -> int:
index_path = self.frames_dir / "index.json"
if index_path.exists():
try:
return len(json.loads(index_path.read_text()))
except Exception:
pass
return 0
def _next_frame_number(self) -> int:
index_path = self.frames_dir / "index.json"
if index_path.exists():
try:
return len(json.loads(index_path.read_text())) + 1
except Exception:
pass
return 1
def _append_frame_index(self, entry: dict):
index_path = self.frames_dir / "index.json"
index = json.loads(index_path.read_text()) if index_path.exists() else []
index.append(entry)
index_path.write_text(json.dumps(index, indent=2))
# -- Scene detection via Unix socket (Rust transport mode) --
def start_scene_detector(self, threshold=None):
"""Connect to Rust server's scene socket and run GPU scene detection.
The server provides a live H.264 stream via a Unix domain socket at
stream/scene.sock. We pipe it to ffmpeg for CUDA scene detection —
continuous stream, no polling, no restarts.
"""
threshold = threshold or SCENE_THRESHOLD
t = Thread(target=self._scene_detect_loop, daemon=True,
name="scene_detector", args=(threshold,))
t.start()
self._threads["scene_detector"] = t
    def _scene_detect_loop(self, threshold):
        """Connect to scene socket, pipe H.264 to ffmpeg, read scene frames.
        Retries on failure (e.g. ffmpeg dies from bad initial frames).
        The server buffers the latest keyframe so reconnects start clean.

        Runs on the daemon thread spawned by start_scene_detector and exits
        when the "stop" flag is set or the socket file disappears.
        """
        from cht.config import DATA_DIR
        socket_path = DATA_DIR / "scene.sock"
        # Wait for the socket to appear (server creates it on session start).
        while "stop" not in self._stop_flags:
            if socket_path.exists():
                break
            time.sleep(0.5)
        # The wait loop above also exits on "stop" — bail before connecting.
        if "stop" in self._stop_flags:
            return
        # Session loop: each iteration is one full connect/detect session.
        while "stop" not in self._stop_flags:
            try:
                self._run_scene_session(socket_path, threshold)
            except Exception:
                log.exception("Scene detector error")
            if "stop" in self._stop_flags:
                break
            # If the socket is gone, the session ended — don't retry.
            if not socket_path.exists():
                log.info("Scene detector: socket gone, session ended")
                break
            log.info("Scene detector: reconnecting in 2s...")
            time.sleep(2.0)
        log.info("Scene detector stopped")
    def _run_scene_session(self, socket_path, threshold):
        """Single scene detection session: connect, run ffmpeg, read frames.

        Three concurrent pieces:
          * a feeder thread copying the Unix-socket H.264 stream to ffmpeg
            stdin;
          * a stderr-reader thread parsing showinfo pts_time values into a
            queue;
          * this thread, scanning ffmpeg stdout for JPEG frames (SOI/EOI
            markers) and pairing each frame with the next queued timestamp.

        Returns when ffmpeg's stdout closes (proc exits or is killed).
        """
        log.info("Scene detector: connecting to %s", socket_path)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(str(socket_path))
        except OSError as e:
            log.debug("Scene detector: connect failed: %s", e)
            return
        log.info("Scene detector: connected, starting ffmpeg")
        node = ff.detect_scenes_from_pipe(
            scene_threshold=threshold, flush_frames=SCENE_FLUSH_FRAMES,
        )
        proc = ff.run_async(node, pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
        # _procs may not exist yet on first session — create lazily.
        self._procs = getattr(self, "_procs", {})
        self._procs["scene_detector"] = proc
        # Thread: socket → ffmpeg stdin
        def _feed_stdin():
            try:
                while "stop" not in self._stop_flags:
                    data = sock.recv(65536)
                    if not data:
                        # Peer closed the socket: end of session stream.
                        break
                    try:
                        proc.stdin.write(data)
                        proc.stdin.flush()
                    except (BrokenPipeError, OSError):
                        # ffmpeg died; closing stdin below unblocks everything.
                        break
            finally:
                try:
                    proc.stdin.close()
                except OSError:
                    pass
                sock.close()
                log.debug("Scene detector: stdin feeder stopped")
        stdin_t = Thread(target=_feed_stdin, daemon=True, name="scene_stdin")
        stdin_t.start()
        # Thread: ffmpeg stderr → parse showinfo timestamps → queue
        ts_queue = Queue()
        # Offset captured once per session; maps ffmpeg-local pts → global time.
        offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
        def _read_stderr():
            for raw in proc.stderr:
                line = raw.decode("utf-8", errors="replace").rstrip()
                if not line:
                    continue
                if "showinfo" in line:
                    # showinfo prints one line per emitted frame; pts_time is
                    # the frame's timestamp within the session stream.
                    pts_match = re.search(r"pts_time:\s*([\d.]+)", line)
                    if pts_match:
                        ts_queue.put(float(pts_match.group(1)))
                elif line.startswith("[") or "error" in line.lower() or "warning" in line.lower():
                    log.debug("[scene] %s", line)
            log.debug("[scene] stderr closed")
        stderr_t = Thread(target=_read_stderr, daemon=True, name="scene_stderr")
        stderr_t.start()
        # Main: read JPEG frames from stdout, pair with stderr timestamps,
        # skip flush frames. Same proven pattern as StreamRecorder._read_stdout.
        flush_window = (SCENE_FLUSH_FRAMES + 1) / 30.0
        last_pts = 0.0
        buf = b""
        raw_fd = proc.stdout.fileno()
        while True:
            chunk = os.read(raw_fd, 65536)
            if not chunk:
                # EOF: ffmpeg exited (or was killed by stop()).
                break
            buf += chunk
            while True:
                # Scan for a complete JPEG: SOI (FFD8) ... EOI (FFD9).
                soi = buf.find(b"\xff\xd8")
                if soi < 0:
                    buf = b""
                    break
                eoi = buf.find(b"\xff\xd9", soi + 2)
                if eoi < 0:
                    # Partial frame: keep from SOI onward, wait for more data.
                    buf = buf[soi:]
                    break
                jpeg_data = buf[soi:eoi + 2]
                buf = buf[eoi + 2:]
                try:
                    pts_time = ts_queue.get(timeout=2.0)
                except Empty:
                    log.warning("No timestamp for scene frame, using 0")
                    pts_time = 0.0
                # Frames emitted within flush_window of the previous one are
                # encoder flush artifacts, not real scene changes.
                if pts_time - last_pts < flush_window:
                    log.debug("Skipping flush frame at pts=%.3f", pts_time)
                    continue
                last_pts = pts_time
                global_ts = pts_time + offset
                log.debug("Scene frame at pts=%.3f (global=%.3f)", pts_time, global_ts)
                self.on_raw_frame(jpeg_data, global_ts)
        ff.stop_proc(proc, timeout=3)
        log.info("Scene detector: ffmpeg exited (last_pts=%.1f)", last_pts)
def start_audio_extractor(self, on_new_audio=None):
"""Periodically extract audio from the growing fMP4 as WAV chunks."""
self._on_new_audio = on_new_audio
self.audio_dir.mkdir(parents=True, exist_ok=True)
t = Thread(target=self._audio_loop, daemon=True, name="audio_extractor")
t.start()
self._threads["audio_extractor"] = t
def stop(self):
self._stop_flags.add("stop")
for name, proc in getattr(self, "_procs", {}).items():
ff.stop_proc(proc, timeout=3)
self._procs = {} if hasattr(self, "_procs") else {}
def _has_audio_stream(self, seg: Path) -> bool:
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(seg))
return any(s.get("codec_type") == "audio" for s in info.get("streams", []))
except Exception:
return False
def _find_audio_source(self):
"""Find audio source: fMP4 with audio track, or standalone audio.aac from Rust server."""
seg = self._get_recording_path() if self._get_recording_path else None
if seg and seg.exists() and self._has_audio_stream(seg):
return seg
# Rust server writes raw AAC alongside the fMP4
stream_dir = self.session_dir / "stream"
aac_path = stream_dir / "audio.aac"
if aac_path.exists() and aac_path.stat().st_size > 100:
return aac_path
return None
    def _audio_loop(self):
        """Worker loop: poll the audio source and emit incremental WAV chunks.

        Runs on the daemon thread started by start_audio_extractor. Each pass
        extracts audio between the last processed position and a safety
        margin behind the growing file's end, writes audio/chunk_NNNN.wav,
        and fires the on_new_audio callback.
        """
        processed_time = 0.0  # seconds of source audio already extracted
        chunk_num = 0
        current_source = None
        while "stop" not in self._stop_flags:
            time.sleep(AUDIO_EXTRACT_INTERVAL)
            source = self._find_audio_source()
            if not source:
                continue
            # Source switched (e.g. fMP4 → standalone AAC): restart from zero.
            if source != current_source:
                current_source = source
                processed_time = 0.0
                chunk_num = 0
                log.info("Audio extractor: using %s", source.name)
            # Too small to be worth probing yet.
            if source.stat().st_size < 100_000:
                continue
            safe_duration = self._probe_safe_duration(source)
            if safe_duration is None or safe_duration <= 0:
                continue
            # Keep a safety margin behind the file's still-growing tip.
            process_to = safe_duration - AUDIO_SAFETY_MARGIN
            # Require at least ~1s of new audio before extracting a chunk.
            if process_to <= processed_time + 1.0:
                continue
            chunk_duration = process_to - processed_time
            wav_path = self.audio_dir / f"chunk_{chunk_num:04d}.wav"
            try:
                ff.extract_audio_chunk(source, wav_path,
                                       start_time=processed_time,
                                       duration=chunk_duration)
            except Exception as e:
                log.error("Audio extraction failed: %s", e)
                continue
            # Only advance past this chunk when a plausible WAV was produced;
            # otherwise the next pass retries the same span.
            if wav_path.exists() and wav_path.stat().st_size > 100:
                offset = self._get_current_global_offset() if self._get_current_global_offset else 0.0
                global_start = processed_time + offset
                log.info("Audio chunk: %s (%.1fs → %.1fs, global %.1fs)",
                         wav_path.name, processed_time, process_to, global_start)
                if self._on_new_audio:
                    self._on_new_audio(
                        wav_path, global_start, chunk_duration,
                        segment_path=source, local_start=processed_time,
                    )
                chunk_num += 1
                processed_time = process_to
        log.info("Audio extractor stopped")
def _probe_safe_duration(self, seg: Path):
try:
import ffmpeg as ffmpeg_lib
info = ffmpeg_lib.probe(str(seg))
dur = float(info.get("format", {}).get("duration", 0))
if dur > 0:
return dur
for stream in info.get("streams", []):
sdur = float(stream.get("duration", 0))
if sdur > 0:
return sdur
except Exception:
pass
try:
return seg.stat().st_size / 65_000
except Exception:
return None