Files
mediaproc/core/detect/checkpoint/frames.py
2026-03-30 13:05:28 -03:00

282 lines
7.6 KiB
Python

"""
Frame cache — per-timeline frame storage in blob storage (S3/MinIO).
Frames are extracted from chunks once, cached as JPEGs at
cache/timelines/{timeline_id}/frames/{seq}.jpg in the app's
blob storage. Any job on the timeline reads from the cache.
Cache is clearable and rebuildable from chunks.
Uses the same storage backend as the rest of the app, so it
works across lambdas, GPU boxes, and local dev.
"""
from __future__ import annotations
import base64
import io
import logging
import os
import tempfile
import numpy as np
from PIL import Image
from core.detect.models import Frame
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Bucket used for every storage call in this module; taken from the
# S3_BUCKET environment variable, falling back to "mpr" when it is unset.
BUCKET = os.environ.get("S3_BUCKET", "mpr")
# Root key prefix; per-timeline keys are built as "{CACHE_PREFIX}/{timeline_id}/...".
CACHE_PREFIX = "cache/timelines"
def _frame_key(timeline_id: str, seq: int) -> str:
    """Return the storage key of the JPEG for frame ``seq`` of a timeline."""
    frames_dir = f"{CACHE_PREFIX}/{timeline_id}/frames"
    return f"{frames_dir}/{seq}.jpg"
def _list_prefix(timeline_id: str) -> str:
    """Return the key prefix (with trailing slash) under which a timeline's frames are stored."""
    timeline_root = f"{CACHE_PREFIX}/{timeline_id}"
    return f"{timeline_root}/frames/"
def cache_exists(timeline_id: str) -> bool:
    """
    Report whether the frame cache for a timeline holds at least one object.

    Lists the timeline's frame prefix in BUCKET and returns True when the
    listing is non-empty.
    """
    # Function-scope import, as in the other storage helpers of this module.
    from core.storage.s3 import list_objects

    frames_prefix = _list_prefix(timeline_id)
    found = list_objects(BUCKET, frames_prefix)
    return len(found) != 0
def cache_frames(timeline_id: str, frames: list[Frame], quality: int = 85) -> int:
    """
    Write frames to blob storage as JPEGs.

    Each frame is encoded into a temporary ``.jpg`` file, which is then
    uploaded to BUCKET under ``_frame_key(timeline_id, frame.sequence)``.
    The temporary file is removed whether encoding and upload succeed or fail.

    Args:
        timeline_id: Timeline whose cache receives the frames.
        frames: Frames to store. ``frame.image`` is handed to
            ``Image.fromarray`` and ``frame.sequence`` names the object.
        quality: JPEG quality forwarded to ``Image.save``.

    Returns:
        Number of frames cached (``len(frames)``).
    """
    from core.storage.s3 import upload_file

    for frame in frames:
        key = _frame_key(timeline_id, frame.sequence)
        # delete=False: the file has to outlive its handle because
        # upload_file is given a path. Cleanup is therefore explicit, and the
        # try/finally starts right after creation so a failed encode cannot
        # leave the file behind.
        tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
        try:
            with tmp:
                img = Image.fromarray(frame.image)
                img.save(tmp, format="JPEG", quality=quality)
            # The handle is closed (and flushed) here, before the upload.
            upload_file(tmp.name, BUCKET, key)
        finally:
            os.unlink(tmp.name)

    logger.info("Cached %d frames for timeline %s", len(frames), timeline_id)
    return len(frames)
def load_cached_frames(timeline_id: str) -> list[Frame]:
    """
    Load all cached frames as Frame objects with numpy arrays.

    Lists the timeline's frame prefix, downloads every ``{seq}.jpg`` object,
    decodes it to an RGB numpy array and wraps it in a ``Frame``. Objects
    whose name is not ``<integer>.jpg`` are skipped (non-``.jpg`` silently,
    non-integer stems with a warning) instead of aborting the whole load.

    The returned frames carry placeholder metadata: ``chunk_id=0``,
    ``timestamp=0.0`` and ``perceptual_hash=""`` are set unconditionally;
    only ``sequence`` and ``image`` come from the cache.

    Args:
        timeline_id: Timeline whose cached frames are loaded.

    Returns:
        Frames sorted by ``sequence``; an empty list if the cache doesn't exist.
    """
    from core.storage.s3 import list_objects, download_to_temp

    objects = list_objects(BUCKET, _list_prefix(timeline_id))
    if not objects:
        return []

    suffix = ".jpg"
    frames = []
    for obj in objects:
        key = obj["key"]
        filename = key.rsplit("/", 1)[-1]
        if not filename.endswith(suffix):
            continue
        # Strip only the trailing extension; the remaining stem must be the
        # integer sequence number.
        try:
            seq = int(filename[: -len(suffix)])
        except ValueError:
            logger.warning("Skipping non-frame object in frame cache: %s", key)
            continue

        tmp_path = download_to_temp(BUCKET, key)
        try:
            # Context manager releases the underlying file before the unlink.
            with Image.open(tmp_path) as img:
                image_array = np.array(img.convert("RGB"))
        finally:
            os.unlink(tmp_path)

        frames.append(
            Frame(
                sequence=seq,
                chunk_id=0,
                timestamp=0.0,
                image=image_array,
                perceptual_hash="",
            )
        )

    frames.sort(key=lambda f: f.sequence)
    return frames
def load_cached_frames_b64(timeline_id: str) -> list[dict]:
    """
    Load cached frames as base64 JPEGs for the UI.

    The stored JPEG bytes are base64-encoded as-is (no re-encoding). Objects
    whose name is not ``<integer>.jpg`` are skipped (non-``.jpg`` silently,
    non-integer stems with a warning) instead of aborting the whole load.

    Args:
        timeline_id: Timeline whose cached frames are loaded.

    Returns:
        List of ``{seq, timestamp, jpeg_b64}`` dicts sorted by ``seq``;
        ``timestamp`` is always ``0.0`` here. Empty list if nothing is cached.
    """
    from core.storage.s3 import list_objects, download_to_temp

    objects = list_objects(BUCKET, _list_prefix(timeline_id))
    if not objects:
        return []

    suffix = ".jpg"
    result = []
    for obj in objects:
        key = obj["key"]
        filename = key.rsplit("/", 1)[-1]
        if not filename.endswith(suffix):
            continue
        # Strip only the trailing extension; the remaining stem must be the
        # integer sequence number.
        try:
            seq = int(filename[: -len(suffix)])
        except ValueError:
            logger.warning("Skipping non-frame object in frame cache: %s", key)
            continue

        tmp_path = download_to_temp(BUCKET, key)
        try:
            with open(tmp_path, "rb") as f:
                jpeg_b64 = base64.b64encode(f.read()).decode()
        finally:
            os.unlink(tmp_path)

        result.append({
            "seq": seq,
            "timestamp": 0.0,
            "jpeg_b64": jpeg_b64,
        })

    result.sort(key=lambda f: f["seq"])
    return result
# ---------------------------------------------------------------------------
# Debug overlay storage — per job/stage/frame
# ---------------------------------------------------------------------------
def _overlay_prefix(timeline_id: str, job_id: str, stage: str) -> str:
    """Return the key prefix (with trailing slash) for one job stage's overlays."""
    overlays_root = f"{CACHE_PREFIX}/{timeline_id}/overlays"
    return f"{overlays_root}/{job_id}/{stage}/"
def _overlay_key(timeline_id: str, job_id: str, stage: str, seq: int, name: str) -> str:
    """Return the storage key of one overlay PNG: the stage prefix plus ``{seq}_{name}.png``."""
    stage_prefix = _overlay_prefix(timeline_id, job_id, stage)
    return f"{stage_prefix}{seq}_{name}.png"
def save_overlays(
    timeline_id: str,
    job_id: str,
    stage: str,
    seq: int,
    overlays: dict[str, str],
) -> None:
    """
    Save debug overlay images (base64 PNG) to blob storage.

    Each entry is base64-decoded, written to a temporary ``.png`` file and
    uploaded to BUCKET under ``_overlay_key(timeline_id, job_id, stage, seq, name)``.
    The temporary file is removed whether the write and upload succeed or fail.

    Args:
        timeline_id: Timeline the overlays belong to.
        job_id: Job that produced the overlays.
        stage: Stage that produced the overlays.
        seq: Frame sequence number the overlays were rendered for.
        overlays: {overlay_key: base64_png_string}
            e.g. {"edge_overlay_b64": "iVBOR...", "lines_overlay_b64": "iVBOR..."}
    """
    from core.storage.s3 import upload_file

    for name, b64_data in overlays.items():
        key = _overlay_key(timeline_id, job_id, stage, seq, name)
        raw = base64.b64decode(b64_data)
        # delete=False: upload_file is given a path, so the file must outlive
        # its handle. The try/finally starts right after creation so a failed
        # write cannot leave the file behind.
        tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        try:
            with tmp:
                tmp.write(raw)
            # The handle is closed (and flushed) here, before the upload.
            upload_file(tmp.name, BUCKET, key)
        finally:
            os.unlink(tmp.name)

    logger.info("Saved %d overlays for timeline %s job %s stage %s frame %d",
                len(overlays), timeline_id, job_id, stage, seq)
def load_overlays(
    timeline_id: str,
    job_id: str,
    stage: str,
    seq: int,
) -> dict[str, str] | None:
    """
    Load debug overlay images from blob storage as base64 strings.

    Lists everything under the job/stage overlay prefix and keeps the objects
    whose filename starts with ``"{seq}_"``. The overlay name is the rest of
    the filename with only a trailing ``.png`` removed, so a name that itself
    contains ``.png`` comes back unchanged.

    Args:
        timeline_id: Timeline the overlays belong to.
        job_id: Job that produced the overlays.
        stage: Stage that produced the overlays.
        seq: Frame sequence number to load overlays for.

    Returns:
        {overlay_key: base64_png_string} or None if no overlays cached.
    """
    from core.storage.s3 import list_objects, download_to_temp

    prefix = _overlay_prefix(timeline_id, job_id, stage)
    seq_prefix = f"{seq}_"
    suffix = ".png"

    objects = list_objects(BUCKET, prefix)
    overlays = {}
    for obj in objects:
        key = obj["key"]
        filename = key.rsplit("/", 1)[-1]
        # The trailing underscore keeps seq 1 from matching "11_...".
        if not filename.startswith(seq_prefix):
            continue

        name = filename[len(seq_prefix):]
        # Drop only the extension, never an interior ".png".
        if name.endswith(suffix):
            name = name[: -len(suffix)]

        tmp_path = download_to_temp(BUCKET, key)
        try:
            with open(tmp_path, "rb") as f:
                overlays[name] = base64.b64encode(f.read()).decode()
        finally:
            os.unlink(tmp_path)

    return overlays if overlays else None
def list_overlay_frames(
    timeline_id: str,
    job_id: str,
    stage: str,
) -> list[int]:
    """
    Return the sorted, de-duplicated frame sequences that have cached overlays.

    The sequence is parsed from the part of each object's filename before the
    first underscore; filenames whose leading part is not an integer are ignored.
    """
    from core.storage.s3 import list_objects

    stage_prefix = _overlay_prefix(timeline_id, job_id, stage)
    found = set()
    for entry in list_objects(BUCKET, stage_prefix):
        basename = entry["key"].rsplit("/", 1)[-1]
        leading, _, _ = basename.partition("_")
        try:
            number = int(leading)
        except ValueError:
            # Not a "{seq}_{name}" style filename.
            continue
        else:
            found.add(number)
    return sorted(found)
def clear_cache(timeline_id: str):
    """
    Delete the frame cache for a timeline.

    Passes the timeline's frame prefix to ``delete_objects``; the overlay
    prefix is not passed, so this call targets frames only.
    """
    from core.storage.s3 import delete_objects

    delete_objects(BUCKET, _list_prefix(timeline_id))
    logger.info("Cleared frame cache for timeline %s", timeline_id)
def frames_to_b64(frames: list[Frame], quality: int = 75) -> list[dict]:
    """
    Encode in-memory Frame objects as base64 JPEG dicts.

    Intended for API responses when the frames are already in memory.
    Each frame's ``image`` is JPEG-encoded at ``quality`` into an in-memory
    buffer and base64-encoded.

    Returns:
        List of ``{seq, timestamp, jpeg_b64}`` dicts sorted by ``seq``.
    """
    def _encode_jpeg(pixels) -> str:
        # Encode into memory and return the JPEG bytes as base64 text.
        buffer = io.BytesIO()
        Image.fromarray(pixels).save(buffer, format="JPEG", quality=quality)
        return base64.b64encode(buffer.getvalue()).decode()

    payloads = []
    for item in frames:
        encoded = _encode_jpeg(item.image)
        payloads.append({
            "seq": item.sequence,
            "timestamp": item.timestamp,
            "jpeg_b64": encoded,
        })
    return sorted(payloads, key=lambda entry: entry["seq"])