diff --git a/admin/mpr/media_assets/models.py b/admin/mpr/media_assets/models.py index 7e3f991..4d1b206 100644 --- a/admin/mpr/media_assets/models.py +++ b/admin/mpr/media_assets/models.py @@ -127,23 +127,24 @@ class Job(models.Model): class Timeline(models.Model): - """The frame sequence from a source video.""" + """A user-created selection of source material.""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + name = models.CharField(max_length=255) source_asset_id = models.UUIDField(null=True, blank=True) - source_video = models.CharField(max_length=255) + chunk_paths = models.JSONField(default=list, blank=True) profile_name = models.CharField(max_length=255) + status = models.CharField(max_length=255) fps = models.FloatField(default=2.0) - frames_prefix = models.CharField(max_length=255) - frames_manifest = models.JSONField(default=dict, blank=True) - frames_meta = models.JSONField(default=list, blank=True) + frame_count = models.IntegerField(default=0) + source_ephemeral = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) class Meta: ordering = ["-created_at"] def __str__(self): - return str(self.id) + return self.name class Checkpoint(models.Model): @@ -153,7 +154,7 @@ class Checkpoint(models.Model): timeline_id = models.UUIDField() job_id = models.UUIDField(null=True, blank=True) parent_id = models.UUIDField(null=True, blank=True) - stage_outputs = models.JSONField(default=dict, blank=True) + stage_name = models.CharField(max_length=255) config_overrides = models.JSONField(default=dict, blank=True) stats = models.JSONField(default=dict, blank=True) is_scenario = models.BooleanField(default=False) @@ -167,6 +168,24 @@ class Checkpoint(models.Model): return str(self.id) +class StageOutput(models.Model): + """Output of a single stage within a job.""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + job_id = models.UUIDField() + timeline_id = models.UUIDField() + stage_name = models.CharField(max_length=255) + checkpoint_id = models.UUIDField(null=True, blank=True) + output = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["-created_at"] + + def __str__(self): + return str(self.id) + + class Brand(models.Model): """A brand discovered or registered in the system.""" diff --git a/core/api/detect/__init__.py b/core/api/detect/__init__.py index 1b551c1..0cdc39d 100644 --- a/core/api/detect/__init__.py +++ b/core/api/detect/__init__.py @@ -11,6 +11,7 @@ from .run import router as run_router from .sse import router as sse_router from .replay import router as replay_router from .config import router as config_router +from .timeline import router as timeline_router router = APIRouter() router.include_router(sources_router) @@ -18,3 +19,4 @@ router.include_router(run_router) router.include_router(sse_router) router.include_router(replay_router) router.include_router(config_router) +router.include_router(timeline_router) diff --git a/core/api/detect/replay.py b/core/api/detect/replay.py index 02091bc..41e75fb 100644 --- a/core/api/detect/replay.py +++ b/core/api/detect/replay.py @@ -137,12 +137,15 @@ class CheckpointData(BaseModel): @router.get("/checkpoints/{timeline_id}/{stage}", response_model=CheckpointData) def get_checkpoint_data(timeline_id: str, stage: str): - """Load checkpoint frames + metadata for the editor UI.""" + """Load checkpoint frames + metadata for the editor UI. 
+ + Reads from the timeline's frame cache (local filesystem). + """ from uuid import UUID from core.db.models import Timeline, Checkpoint from core.db.connection import get_session from core.db.checkpoint import list_checkpoints - from core.detect.checkpoint.frames import load_frames_b64 + from core.detect.checkpoint.frames import load_cached_frames_b64 with get_session() as session: timeline = session.get(Timeline, UUID(timeline_id)) @@ -152,16 +155,14 @@ def get_checkpoint_data(timeline_id: str, stage: str): checkpoints = list_checkpoints(session, UUID(timeline_id)) if not checkpoints: raise HTTPException(status_code=404, detail=f"No checkpoints for timeline {timeline_id}") - # Prefer a checkpoint that has this stage's output; fall back to latest + # Prefer a checkpoint for this stage; fall back to latest checkpoint = next( - (c for c in reversed(checkpoints) if stage in (c.stage_outputs or {})), + (c for c in reversed(checkpoints) if c.stage_name == stage), checkpoints[-1], ) - raw_manifest = timeline.frames_manifest or {} - manifest = {int(k): v for k, v in raw_manifest.items()} - frames_b64 = load_frames_b64(manifest, timeline.frames_meta or []) - + # Read from timeline's frame cache + frames_b64 = load_cached_frames_b64(timeline_id) frame_list = [ CheckpointFrameInfo(seq=f["seq"], timestamp=f["timestamp"], jpeg_b64=f["jpeg_b64"]) for f in frames_b64 @@ -171,7 +172,7 @@ def get_checkpoint_data(timeline_id: str, stage: str): timeline_id=timeline_id, stage=stage, profile_name=timeline.profile_name, - video_path=timeline.source_video, + video_path=timeline.chunk_paths[0] if timeline.chunk_paths else "", is_scenario=checkpoint.is_scenario, scenario_label=checkpoint.scenario_label, frames=frame_list, @@ -195,14 +196,12 @@ def list_scenarios_endpoint(): timeline = session.get(Timeline, s.timeline_id) if not timeline: continue - last_stage = next(reversed(s.stage_outputs), "") if s.stage_outputs else "" info = ScenarioInfo( timeline_id=str(s.timeline_id), - stage=last_stage, + stage=s.stage_name, scenario_label=s.scenario_label, profile_name=timeline.profile_name, - video_path=timeline.source_video, - frame_count=len(timeline.frames_manifest or {}), + video_path=timeline.chunk_paths[0] if timeline.chunk_paths else "", created_at=str(s.created_at) if s.created_at else "", ) result.append(info) diff --git a/core/api/detect/run.py b/core/api/detect/run.py index 404520b..abe7596 100644 --- a/core/api/detect/run.py +++ b/core/api/detect/run.py @@ -1,7 +1,7 @@ """ Pipeline run endpoints. 
-POST /detect/run — launch pipeline on selected source +POST /detect/run — launch pipeline on a timeline POST /detect/stop/{job_id} — cancel a running pipeline POST /detect/pause/{job_id} — pause after current stage POST /detect/resume/{job_id} — resume a paused pipeline @@ -30,20 +30,20 @@ _cancelled_jobs: set[str] = set() class RunRequest(BaseModel): - video_path: str # storage key + timeline_id: str profile_name: str = "soccer_broadcast" - source_asset_id: str = "" checkpoint: bool = True skip_vlm: bool = False skip_cloud: bool = False log_level: str = "INFO" # INFO | DEBUG pause_after_stage: bool = False + config_overrides: dict | None = None class RunResponse(BaseModel): status: str job_id: str - video_path: str + timeline_id: str def _resolve_video_path(video_path: str) -> str: @@ -59,13 +59,41 @@ def _resolve_video_path(video_path: str) -> str: @router.post("/run", response_model=RunResponse) def run_pipeline(req: RunRequest): - """Launch a detection pipeline run on a source chunk.""" + """Launch a detection pipeline run on a timeline.""" from core.detect import emit from core.detect.graph import get_pipeline from core.detect.state import DetectState + from core.detect.checkpoint.storage import get_timeline + from core.db.connection import get_session + from core.db.job import create_job, update_job_status - local_path = _resolve_video_path(req.video_path) - job_id = str(uuid.uuid4()) + # Load timeline + try: + timeline = get_timeline(req.timeline_id) + except ValueError: + raise HTTPException(status_code=404, detail=f"Timeline not found: {req.timeline_id}") + + chunk_paths = timeline["chunk_paths"] + if not chunk_paths: + raise HTTPException(status_code=400, detail="Timeline has no chunk paths") + + # Resolve first chunk to local path for the pipeline + local_path = _resolve_video_path(chunk_paths[0]) + + # Create job in DB + source_asset_id_str = timeline.get("source_asset_id", "") + with get_session() as session: + from uuid import UUID as _UUID + source_asset_id = _UUID(source_asset_id_str) if source_asset_id_str else uuid.uuid4() + job = create_job( + session, + source_asset_id=source_asset_id, + video_path=chunk_paths[0], + timeline_id=_UUID(req.timeline_id), + profile_name=req.profile_name, + config_overrides=req.config_overrides, + ) + job_id = str(job.id) if req.skip_vlm: os.environ["SKIP_VLM"] = "1" @@ -77,7 +105,7 @@ def run_pipeline(req: RunRequest): elif "SKIP_CLOUD" in os.environ: del os.environ["SKIP_CLOUD"] - # Clear any stale events from a previous run with same job_id + # Clear any stale events from core.events import _get_redis from core.detect.events import DETECT_EVENTS_PREFIX r = _get_redis() @@ -94,7 +122,9 @@ def run_pipeline(req: RunRequest): video_path=local_path, job_id=job_id, profile_name=req.profile_name, - source_asset_id=req.source_asset_id, + source_asset_id=source_asset_id_str or str(source_asset_id), + timeline_id=req.timeline_id, + config_overrides=req.config_overrides or {}, ) from core.detect.graph import ( @@ -105,18 +135,29 @@ def run_pipeline(req: RunRequest): set_cancel_check(job_id, lambda: job_id in _cancelled_jobs) init_pause(job_id, pause_after_stage=req.pause_after_stage) + def _update_job(status, stage=None, error=None): + from core.db.connection import get_session + from core.db.job import update_job_status + with get_session() as session: + update_job_status(session, _UUID(job_id), status, + current_stage=stage, error_message=error) + def _run(): try: + _update_job("running") emit.log(job_id, "Pipeline", "INFO", - f"Starting 
pipeline: {req.video_path} (profile={req.profile_name})") + f"Starting pipeline: {chunk_paths[0]} (profile={req.profile_name})") pipeline.invoke(initial_state) + _update_job("completed") emit.log(job_id, "Pipeline", "INFO", "Pipeline completed successfully") emit.job_complete(job_id, {"status": "completed"}) except PipelineCancelled: + _update_job("cancelled") emit.log(job_id, "Pipeline", "INFO", "Pipeline cancelled") emit.job_complete(job_id, {"status": "cancelled"}) except Exception as e: logger.exception("Pipeline run %s failed: %s", job_id, e) + _update_job("failed", error=str(e)) from core.detect.graph import _node_states, NODES if job_id in _node_states: states = _node_states[job_id] @@ -134,12 +175,14 @@ def run_pipeline(req: RunRequest): clear_cancel_check(job_id) clear_pause(job_id) emit.clear_run_context() + from core.detect.checkpoint.runner_bridge import reset_checkpoint_state + reset_checkpoint_state(job_id) thread = threading.Thread(target=_run, daemon=True, name=f"pipeline-{job_id}") _running_jobs[job_id] = thread thread.start() - return RunResponse(status="started", job_id=job_id, video_path=req.video_path) + return RunResponse(status="started", job_id=job_id, timeline_id=req.timeline_id) @router.post("/stop/{job_id}") @@ -224,18 +267,6 @@ def pipeline_status(job_id: str): return {"status": status, "job_id": job_id} -@router.get("/timeline/{job_id}") -def get_timeline_for_job(job_id: str): - """Get the timeline_id for a running or completed job.""" - from core.detect.checkpoint.runner_bridge import get_timeline_id - - tid = get_timeline_id(job_id) - if tid is None: - raise HTTPException(status_code=404, detail=f"No timeline for job: {job_id}") - - return {"timeline_id": tid, "job_id": job_id} - - @router.post("/clear/{job_id}") def clear_pipeline(job_id: str): """Clear events for a job from Redis.""" diff --git a/core/api/detect/timeline.py b/core/api/detect/timeline.py new file mode 100644 index 0000000..7f38efe --- /dev/null +++ b/core/api/detect/timeline.py @@ -0,0 +1,226 @@ +""" +Timeline + Job management endpoints. 
+ +POST /detect/timeline — create timeline from chunk selection +GET /detect/timeline — list timelines +GET /detect/timeline/{id} — timeline detail +DELETE /detect/timeline/{id}/cache — clear frame cache + +GET /detect/jobs — list jobs (optionally by timeline) +GET /detect/jobs/{id} — job detail + checkpoints + stage outputs +""" + +from __future__ import annotations + +import logging + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/detect", tags=["detect"]) + + +# --- Request/Response models --- + +class CreateTimelineRequest(BaseModel): + chunk_paths: list[str] + profile_name: str = "soccer_broadcast" + name: str = "" + source_asset_id: str = "" + fps: float = 2.0 + + +class TimelineResponse(BaseModel): + id: str + name: str + chunk_paths: list[str] + profile_name: str + status: str + fps: float + frame_count: int + source_ephemeral: bool + created_at: str | None = None + + +class JobResponse(BaseModel): + id: str + timeline_id: str | None + source_asset_id: str + video_path: str + profile_name: str + run_type: str + status: str + current_stage: str | None + config_overrides: dict + error_message: str | None + created_at: str | None + started_at: str | None + completed_at: str | None + + +class JobDetailResponse(JobResponse): + checkpoints: list[dict] + stage_outputs: dict[str, dict] + + +# --- Timeline endpoints --- + +@router.post("/timeline", response_model=TimelineResponse) +def create_timeline_endpoint(req: CreateTimelineRequest): + """Create a timeline from a chunk selection.""" + from uuid import UUID + from core.detect.checkpoint.storage import create_timeline + + source_asset_id = UUID(req.source_asset_id) if req.source_asset_id else None + tid = create_timeline( + chunk_paths=req.chunk_paths, + profile_name=req.profile_name, + name=req.name, + source_asset_id=source_asset_id, + fps=req.fps, + ) + + from core.detect.checkpoint.storage import get_timeline + tl = get_timeline(tid) + return TimelineResponse( + id=tl["id"], + name=tl["name"], + chunk_paths=tl["chunk_paths"], + profile_name=tl["profile_name"], + status=tl["status"], + fps=tl["fps"], + frame_count=0, + source_ephemeral=False, + created_at=tl["created_at"], + ) + + +@router.get("/timeline", response_model=list[TimelineResponse]) +def list_timelines(): + """List all timelines.""" + from sqlmodel import select + from core.db.models import Timeline + from core.db.connection import get_session + + with get_session() as session: + stmt = select(Timeline).order_by(Timeline.created_at.desc()) + timelines = session.exec(stmt).all() + + return [ + TimelineResponse( + id=str(t.id), + name=t.name, + chunk_paths=t.chunk_paths or [], + profile_name=t.profile_name, + status=t.status, + fps=t.fps, + frame_count=t.frame_count, + source_ephemeral=t.source_ephemeral, + created_at=str(t.created_at) if t.created_at else None, + ) + for t in timelines + ] + + +@router.get("/timeline/{timeline_id}", response_model=TimelineResponse) +def get_timeline_endpoint(timeline_id: str): + """Get timeline detail.""" + from core.detect.checkpoint.storage import get_timeline + try: + tl = get_timeline(timeline_id) + except ValueError: + raise HTTPException(status_code=404, detail=f"Timeline not found: {timeline_id}") + + from core.detect.checkpoint.frames import cache_exists + from uuid import UUID + from core.db.models import Timeline + from core.db.connection import get_session + + with get_session() as session: + timeline = 
session.get(Timeline, UUID(timeline_id)) + + return TimelineResponse( + id=tl["id"], + name=tl["name"], + chunk_paths=tl["chunk_paths"], + profile_name=tl["profile_name"], + status=tl["status"], + fps=tl["fps"], + frame_count=timeline.frame_count if timeline else 0, + source_ephemeral=timeline.source_ephemeral if timeline else False, + created_at=tl["created_at"], + ) + + +@router.delete("/timeline/{timeline_id}/cache") +def clear_timeline_cache(timeline_id: str): + """Clear the frame cache for a timeline.""" + from core.detect.checkpoint.frames import clear_cache + from core.detect.checkpoint.storage import update_timeline_status + + clear_cache(timeline_id) + update_timeline_status(timeline_id, "created") + return {"status": "cleared", "timeline_id": timeline_id} + + +# --- Job endpoints --- + +def _job_to_response(job) -> JobResponse: + return JobResponse( + id=str(job.id), + timeline_id=str(job.timeline_id) if job.timeline_id else None, + source_asset_id=str(job.source_asset_id), + video_path=job.video_path, + profile_name=job.profile_name, + run_type=job.run_type, + status=job.status, + current_stage=job.current_stage, + config_overrides=job.config_overrides or {}, + error_message=job.error_message, + created_at=str(job.created_at) if job.created_at else None, + started_at=str(job.started_at) if job.started_at else None, + completed_at=str(job.completed_at) if job.completed_at else None, + ) + + +@router.get("/jobs", response_model=list[JobResponse]) +def list_jobs_endpoint(timeline_id: str | None = Query(None)): + """List jobs, optionally filtered by timeline.""" + from uuid import UUID + from core.db.connection import get_session + from core.db.job import list_jobs + + tid = UUID(timeline_id) if timeline_id else None + with get_session() as session: + jobs = list_jobs(session, timeline_id=tid) + + return [_job_to_response(j) for j in jobs] + + +@router.get("/jobs/{job_id}", response_model=JobDetailResponse) +def get_job_endpoint(job_id: str): + """Get job detail with checkpoints and stage outputs.""" + from uuid import UUID + from core.db.connection import get_session + from core.db.job import get_job + from core.detect.checkpoint.storage import ( + get_checkpoints_for_job, + load_stage_outputs_for_job, + ) + + with get_session() as session: + job = get_job(session, UUID(job_id)) + if not job: + raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") + + checkpoints = get_checkpoints_for_job(job_id) + stage_outputs = load_stage_outputs_for_job(job_id) + + base = _job_to_response(job) + return JobDetailResponse( + **base.model_dump(), + checkpoints=checkpoints, + stage_outputs=stage_outputs, + ) diff --git a/core/db/job.py b/core/db/job.py index 2f0bc6c..ca6ddcd 100644 --- a/core/db/job.py +++ b/core/db/job.py @@ -2,6 +2,7 @@ from __future__ import annotations +from datetime import datetime from typing import Optional from uuid import UUID @@ -10,10 +11,70 @@ from sqlmodel import Session, select from .models import Job -def list_jobs(session: Session, parent_id: Optional[UUID] = None, status: Optional[str] = None) -> list[Job]: +def create_job( + session: Session, + source_asset_id: UUID, + video_path: str, + timeline_id: UUID, + profile_name: str = "soccer_broadcast", + run_type: str = "initial", + parent_id: UUID | None = None, + config_overrides: dict | None = None, +) -> Job: + job = Job( + source_asset_id=source_asset_id, + video_path=video_path, + timeline_id=timeline_id, + profile_name=profile_name, + run_type=run_type, + parent_id=parent_id, + 
config_overrides=config_overrides or {}, + status="pending", + ) + session.add(job) + session.commit() + session.refresh(job) + return job + + +def update_job_status( + session: Session, + job_id: UUID, + status: str, + current_stage: str | None = None, + error_message: str | None = None, +): + job = session.get(Job, job_id) + if not job: + return + job.status = status + if current_stage is not None: + job.current_stage = current_stage + if error_message is not None: + job.error_message = error_message + if status == "running" and not job.started_at: + job.started_at = datetime.utcnow() + if status in ("completed", "failed", "cancelled"): + job.completed_at = datetime.utcnow() + session.commit() + + +def get_job(session: Session, job_id: UUID) -> Job | None: + return session.get(Job, job_id) + + +def list_jobs( + session: Session, + timeline_id: UUID | None = None, + parent_id: UUID | None = None, + status: str | None = None, +) -> list[Job]: stmt = select(Job) + if timeline_id: + stmt = stmt.where(Job.timeline_id == timeline_id) if parent_id: stmt = stmt.where(Job.parent_id == parent_id) if status: stmt = stmt.where(Job.status == status) + stmt = stmt.order_by(Job.created_at.desc()) return list(session.exec(stmt).all()) diff --git a/core/db/models.py b/core/db/models.py index dbcf8d1..5cda061 100644 --- a/core/db/models.py +++ b/core/db/models.py @@ -114,17 +114,18 @@ class Job(SQLModel, table=True): completed_at: Optional[datetime] = None class Timeline(SQLModel, table=True): - """The frame sequence from a source video.""" + """A user-created selection of source material.""" __tablename__ = "timeline" id: UUID = Field(default_factory=uuid4, primary_key=True) + name: str = "" source_asset_id: Optional[UUID] = Field(default=None, index=True) - source_video: str = "" + chunk_paths: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) profile_name: str = "" + status: str = "created" fps: float = 2.0 - frames_prefix: str = "" - frames_manifest: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - frames_meta: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) + frame_count: int = 0 + source_ephemeral: bool = False created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) class Checkpoint(SQLModel, table=True): @@ -135,13 +136,25 @@ class Checkpoint(SQLModel, table=True): timeline_id: UUID job_id: Optional[UUID] = Field(default=None, index=True) parent_id: Optional[UUID] = None - stage_outputs: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + stage_name: str = "" config_overrides: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) stats: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) is_scenario: bool = False scenario_label: str = "" created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) +class StageOutput(SQLModel, table=True): + """Output of a single stage within a job.""" + __tablename__ = "stage_output" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + job_id: UUID = Field(index=True) + timeline_id: UUID + stage_name: str + checkpoint_id: Optional[UUID] = None + output: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + created_at: Optional[datetime] = 
Field(default_factory=datetime.utcnow) + class Brand(SQLModel, table=True): """A brand discovered or registered in the system.""" __tablename__ = "brand" diff --git a/core/detect/checkpoint/__init__.py b/core/detect/checkpoint/__init__.py index a2741d0..4131c2b 100644 --- a/core/detect/checkpoint/__init__.py +++ b/core/detect/checkpoint/__init__.py @@ -1,19 +1,31 @@ """ -Checkpoint system — Timeline + Checkpoint tree. +Checkpoint system — Timeline + Checkpoint tree + StageOutput. detect/checkpoint/ - frames.py — frame image S3 upload/download - storage.py — Timeline + Checkpoint (Postgres + MinIO) - replay.py — replay (TODO: migrate to new model) + frames.py — per-timeline frame cache (local filesystem) + storage.py — Timeline, Checkpoint, StageOutput persistence + replay.py — replay from checkpoint (TODO: rework in 5d) runner_bridge.py — checkpoint hook for PipelineRunner """ from .storage import ( create_timeline, - get_timeline_frames, - get_timeline_frames_b64, + get_timeline, + update_timeline_status, + save_checkpoint, + get_checkpoints_for_job, + get_checkpoints_for_timeline, save_stage_output, load_stage_output, + load_stage_outputs_for_job, + load_stage_outputs_for_timeline, ) -from .frames import save_frames, load_frames -from .runner_bridge import checkpoint_after_stage, reset_checkpoint_state, get_timeline_id +from .frames import ( + cache_exists, + cache_frames, + load_cached_frames, + load_cached_frames_b64, + clear_cache, + frames_to_b64, +) +from .runner_bridge import checkpoint_after_stage, reset_checkpoint_state, get_latest_checkpoint diff --git a/core/detect/checkpoint/frames.py b/core/detect/checkpoint/frames.py index c1d81ca..16d0aea 100644 --- a/core/detect/checkpoint/frames.py +++ b/core/detect/checkpoint/frames.py @@ -1,7 +1,19 @@ -"""Frame image storage — save/load to S3/MinIO as JPEGs.""" +""" +Frame cache — per-timeline frame storage in blob storage (S3/MinIO). + +Frames are extracted from chunks once, cached as JPEGs at +cache/timelines/{timeline_id}/frames/{seq}.jpg in the app's +blob storage. Any job on the timeline reads from the cache. +Cache is clearable and rebuildable from chunks. + +Uses the same storage backend as the rest of the app, so it +works across lambdas, GPU boxes, and local dev. +""" from __future__ import annotations +import base64 +import io import logging import os import tempfile @@ -14,25 +26,39 @@ from core.detect.models import Frame logger = logging.getLogger(__name__) BUCKET = os.environ.get("S3_BUCKET", "mpr") -CHECKPOINT_PREFIX = "checkpoints" +CACHE_PREFIX = "cache/timelines" -def save_frames(job_id: str, frames: list[Frame]) -> dict[int, str]: +def _frame_key(timeline_id: str, seq: int) -> str: + return f"{CACHE_PREFIX}/{timeline_id}/frames/{seq}.jpg" + + +def _list_prefix(timeline_id: str) -> str: + return f"{CACHE_PREFIX}/{timeline_id}/frames/" + + +def cache_exists(timeline_id: str) -> bool: + """Check if frame cache exists for a timeline.""" + from core.storage.s3 import list_objects + + objects = list_objects(BUCKET, _list_prefix(timeline_id)) + return len(objects) > 0 + + +def cache_frames(timeline_id: str, frames: list[Frame], quality: int = 85) -> int: """ - Save frame images to S3 as JPEGs. + Write frames to blob storage as JPEGs. - Returns manifest: {sequence: s3_key} + Returns number of frames cached. 
""" from core.storage.s3 import upload_file - manifest = {} - for frame in frames: - key = f"{CHECKPOINT_PREFIX}/{job_id}/frames/{frame.sequence}.jpg" + key = _frame_key(timeline_id, frame.sequence) with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: img = Image.fromarray(frame.image) - img.save(tmp, format="JPEG", quality=85) + img.save(tmp, format="JPEG", quality=quality) tmp_path = tmp.name try: @@ -40,25 +66,30 @@ def save_frames(job_id: str, frames: list[Frame]) -> dict[int, str]: finally: os.unlink(tmp_path) - manifest[frame.sequence] = key - - logger.info("Saved %d frames to s3://%s/%s/%s/frames/", - len(frames), BUCKET, CHECKPOINT_PREFIX, job_id) - return manifest + logger.info("Cached %d frames for timeline %s", len(frames), timeline_id) + return len(frames) -def load_frames(manifest: dict[int, str], frame_metadata: list[dict]) -> list[Frame]: +def load_cached_frames(timeline_id: str) -> list[Frame]: """ - Load frame images from S3 and reconstitute Frame objects. + Load all cached frames as Frame objects with numpy arrays. - frame_metadata: list of dicts with sequence, chunk_id, timestamp, perceptual_hash. + Returns empty list if cache doesn't exist. """ - from core.storage.s3 import download_to_temp + from core.storage.s3 import list_objects, download_to_temp + + objects = list_objects(BUCKET, _list_prefix(timeline_id)) + if not objects: + return [] - meta_map = {m["sequence"]: m for m in frame_metadata} frames = [] + for obj in objects: + key = obj["key"] + filename = key.rsplit("/", 1)[-1] + if not filename.endswith(".jpg"): + continue + seq = int(filename.replace(".jpg", "")) - for seq, key in manifest.items(): tmp_path = download_to_temp(BUCKET, key) try: img = Image.open(tmp_path).convert("RGB") @@ -66,13 +97,12 @@ def load_frames(manifest: dict[int, str], frame_metadata: list[dict]) -> list[Fr finally: os.unlink(tmp_path) - meta = meta_map.get(seq, {}) frame = Frame( sequence=seq, - chunk_id=meta.get("chunk_id", 0), - timestamp=meta.get("timestamp", 0.0), + chunk_id=0, + timestamp=0.0, image=image_array, - perceptual_hash=meta.get("perceptual_hash", ""), + perceptual_hash="", ) frames.append(frame) @@ -80,32 +110,70 @@ def load_frames(manifest: dict[int, str], frame_metadata: list[dict]) -> list[Fr return frames -def load_frames_b64(manifest: dict[int, str], frame_metadata: list[dict]) -> list[dict]: +def load_cached_frames_b64(timeline_id: str) -> list[dict]: """ - Load frame images from S3 as base64 JPEG — lightweight, no numpy. + Load cached frames as base64 JPEGs for the UI. - Returns list of dicts: {seq, timestamp, jpeg_b64} + Returns list of {seq, timestamp, jpeg_b64}. 
""" - import base64 - from core.storage.s3 import download_to_temp + from core.storage.s3 import list_objects, download_to_temp - meta_map = {m["sequence"]: m for m in frame_metadata} - frames = [] + objects = list_objects(BUCKET, _list_prefix(timeline_id)) + if not objects: + return [] + + result = [] + for obj in objects: + key = obj["key"] + filename = key.rsplit("/", 1)[-1] + if not filename.endswith(".jpg"): + continue + seq = int(filename.replace(".jpg", "")) - for seq, key in manifest.items(): tmp_path = download_to_temp(BUCKET, key) try: with open(tmp_path, "rb") as f: - jpeg_bytes = f.read() + jpeg_b64 = base64.b64encode(f.read()).decode() finally: os.unlink(tmp_path) - meta = meta_map.get(seq, {}) - frames.append({ + result.append({ "seq": seq, - "timestamp": meta.get("timestamp", 0.0), - "jpeg_b64": base64.b64encode(jpeg_bytes).decode(), + "timestamp": 0.0, + "jpeg_b64": jpeg_b64, }) - frames.sort(key=lambda f: f["seq"]) - return frames + result.sort(key=lambda f: f["seq"]) + return result + + +def clear_cache(timeline_id: str): + """Delete the frame cache for a timeline.""" + from core.storage.s3 import delete_objects + + prefix = _list_prefix(timeline_id) + delete_objects(BUCKET, prefix) + logger.info("Cleared frame cache for timeline %s", timeline_id) + + +def frames_to_b64(frames: list[Frame], quality: int = 75) -> list[dict]: + """ + Convert in-memory Frame objects to base64 JPEG dicts. + + For API responses when frames are already in memory. + """ + result = [] + for frame in frames: + buf = io.BytesIO() + img = Image.fromarray(frame.image) + img.save(buf, format="JPEG", quality=quality) + jpeg_b64 = base64.b64encode(buf.getvalue()).decode() + + result.append({ + "seq": frame.sequence, + "timestamp": frame.timestamp, + "jpeg_b64": jpeg_b64, + }) + + result.sort(key=lambda f: f["seq"]) + return result diff --git a/core/detect/checkpoint/runner_bridge.py b/core/detect/checkpoint/runner_bridge.py index 831b921..fa69ff6 100644 --- a/core/detect/checkpoint/runner_bridge.py +++ b/core/detect/checkpoint/runner_bridge.py @@ -1,13 +1,9 @@ """ Runner bridge — checkpoint hook called by PipelineRunner after each stage. -Owns the per-job state (timeline, frame manifest, checkpoint chain) that -the runner shouldn't know about. - -Timeline and Job are independent entities: -- One Timeline can serve multiple Jobs (re-run with different params) -- One Job operates on one Timeline (set after frame extraction) -- Checkpoints belong to Timeline, tagged with the Job that created them +Saves a checkpoint + stage output after each stage completes. +Timeline and Job are independent: timeline_id and job_id come from +the pipeline state (set at job creation time). """ from __future__ import annotations @@ -16,63 +12,37 @@ import logging logger = logging.getLogger(__name__) -# Per-job state -_timeline_id: dict[str, str] = {} -_frames_manifest: dict[str, dict[int, str]] = {} +# Per-job state: tracks the latest checkpoint so we can chain parent → child _latest_checkpoint: dict[str, str] = {} def reset_checkpoint_state(job_id: str): """Clean up per-job checkpoint state. Called when pipeline finishes.""" - _timeline_id.pop(job_id, None) - _frames_manifest.pop(job_id, None) _latest_checkpoint.pop(job_id, None) def checkpoint_after_stage(job_id: str, stage_name: str, state: dict, result: dict): """ - Save a checkpoint after a stage completes. + Save a checkpoint + stage output after a stage completes. Called by the runner. 
Handles: - - Timeline creation (once, on extract_frames) - - Frame upload (via create_timeline) - Stage output serialization (via stage registry) - Checkpoint chain (parent → child) + - Stage output as separate row in StageOutput table """ if not job_id: return - from .storage import create_timeline, save_stage_output + timeline_id = state.get("timeline_id", "") + if not timeline_id: + logger.warning("No timeline_id in state for job %s, skipping checkpoint", job_id) + return + + from .storage import save_checkpoint, save_stage_output from core.detect.stages.base import _REGISTRY merged = {**state, **result} - # On extract_frames: create Timeline + upload frames + root checkpoint - if stage_name == "extract_frames" and job_id not in _timeline_id: - frames = merged.get("frames", []) - video_path = merged.get("video_path", "") - profile_name = merged.get("profile_name", "") - - tid, cid = create_timeline( - source_video=video_path, - profile_name=profile_name, - frames=frames, - ) - _timeline_id[job_id] = tid - _latest_checkpoint[job_id] = cid - logger.info("Job %s → Timeline %s (root checkpoint %s)", job_id, tid, cid) - - # Emit timeline_id via SSE so the UI can use it for checkpoint loads - from core.detect import emit - emit.log(job_id, "Checkpoint", "INFO", f"timeline_id={tid}") - return - - # For subsequent stages: save checkpoint on the timeline - tid = _timeline_id.get(job_id) - if not tid: - logger.warning("No timeline for job %s, skipping checkpoint", job_id) - return - # Serialize stage output using the stage's serialize_fn if available stage_cls = _REGISTRY.get(stage_name) serialize_fn = getattr(getattr(stage_cls, "definition", None), "serialize_fn", None) @@ -81,17 +51,41 @@ def checkpoint_after_stage(job_id: str, stage_name: str, state: dict, result: di else: output_json = {} + # Convert stats dataclass to dict for JSONB storage + import dataclasses + raw_stats = state.get("stats", {}) + if dataclasses.is_dataclass(raw_stats): + stats_dict = dataclasses.asdict(raw_stats) + elif isinstance(raw_stats, dict): + stats_dict = raw_stats + else: + stats_dict = {} + + # Save checkpoint (lightweight tree node) parent_id = _latest_checkpoint.get(job_id) - new_checkpoint_id = save_stage_output( - timeline_id=tid, - parent_checkpoint_id=parent_id, + checkpoint_id = save_checkpoint( + timeline_id=timeline_id, stage_name=stage_name, - output_json=output_json, + parent_checkpoint_id=parent_id, + config_overrides=state.get("config_overrides"), + stats=stats_dict, job_id=job_id, ) - _latest_checkpoint[job_id] = new_checkpoint_id + _latest_checkpoint[job_id] = checkpoint_id + + # Save stage output (separate row, upsert by job+stage) + if output_json: + save_stage_output( + job_id=job_id, + timeline_id=timeline_id, + stage_name=stage_name, + output=output_json, + checkpoint_id=checkpoint_id, + ) + + logger.info("Checkpoint %s + output for stage %s (job %s)", checkpoint_id, stage_name, job_id) -def get_timeline_id(job_id: str) -> str | None: - """Get the timeline_id for a running job. Used by the UI to load checkpoints.""" - return _timeline_id.get(job_id) +def get_latest_checkpoint(job_id: str) -> str | None: + """Get the latest checkpoint_id for a running job.""" + return _latest_checkpoint.get(job_id) diff --git a/core/detect/checkpoint/serializer.py b/core/detect/checkpoint/serializer.py index 0894a3d..7e2f1be 100644 --- a/core/detect/checkpoint/serializer.py +++ b/core/detect/checkpoint/serializer.py @@ -6,6 +6,9 @@ This file has no model-specific knowledge — stages own their data format. 
The only things serialized here are the "envelope" fields (job_id, video_path, etc.) that don't belong to any stage. + +Frames are ephemeral (in-memory during a run). Serialization stores +metadata only; frames are re-extracted from chunks when needed. """ from __future__ import annotations @@ -18,10 +21,10 @@ from core.schema.serializers.pipeline import ( # Envelope fields — not owned by any stage, always present -ENVELOPE_KEYS = ["job_id", "video_path", "profile_name", "config_overrides"] +ENVELOPE_KEYS = ["job_id", "video_path", "profile_name", "timeline_id", "config_overrides"] -def serialize_state(state: dict, frames_manifest: dict[int, str]) -> dict: +def serialize_state(state: dict) -> dict: """ Serialize DetectState to a JSON-compatible dict. @@ -37,9 +40,6 @@ def serialize_state(state: dict, frames_manifest: dict[int, str]) -> dict: default = {} if key == "config_overrides" else "" checkpoint[key] = state.get(key, default) - # Frames manifest (needed by frame-loading stages) - checkpoint["frames_manifest"] = {str(k): v for k, v in frames_manifest.items()} - # Stats (shared across stages, not owned by one) stats = state.get("stats") if stats is not None: @@ -60,8 +60,9 @@ def serialize_state(state: dict, frames_manifest: dict[int, str]) -> dict: def deserialize_state(checkpoint: dict, frames: list) -> dict: """ - Reconstitute DetectState from a checkpoint dict + loaded frames. + Reconstitute DetectState from a checkpoint dict + frames. + Frames are provided by the caller (re-extracted from chunks). Calls each stage's deserialize_fn to restore stage-owned data. """ from core.detect.stages.base import _REGISTRY @@ -75,7 +76,7 @@ def deserialize_state(checkpoint: dict, frames: list) -> dict: default = {} if key == "config_overrides" else "" state[key] = checkpoint.get(key, default) - # Frames (always present, loaded externally) + # Frames (provided externally, ephemeral) state["frames"] = frames # Stats diff --git a/core/detect/checkpoint/storage.py b/core/detect/checkpoint/storage.py index 1bdc3c9..272da70 100644 --- a/core/detect/checkpoint/storage.py +++ b/core/detect/checkpoint/storage.py @@ -1,9 +1,9 @@ """ -Checkpoint storage — Timeline + Checkpoint (tree of snapshots). +Checkpoint storage — Timeline, Checkpoint, StageOutput persistence. -Timeline: frame sequence from source video (frames in MinIO) -Checkpoint: snapshot of pipeline state (stage outputs as JSONB in Postgres) - parent_id forms a tree — multiple children = different config tries +Timeline: user-created source selection (chunk paths) +Checkpoint: lightweight tree node (parent_id, stage_name, config, stats) +StageOutput: per-stage result (flat table, one row per job+stage) """ from __future__ import annotations @@ -11,8 +11,6 @@ from __future__ import annotations import logging from uuid import UUID -from .frames import save_frames, load_frames, CHECKPOINT_PREFIX - logger = logging.getLogger(__name__) @@ -21,66 +19,41 @@ logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- def create_timeline( - source_video: str, - profile_name: str, - frames: list, - fps: float = 2.0, + chunk_paths: list[str], + profile_name: str = "", + name: str = "", source_asset_id: UUID | None = None, -) -> tuple[str, str]: + fps: float = 2.0, +) -> str: """ - Create a timeline from frames. Uploads frame images to MinIO, - creates Timeline + root Checkpoint in Postgres. + Create a timeline from a chunk selection. - Returns (timeline_id, checkpoint_id). 
+ Called by the user (via API) before any pipeline runs. + Returns timeline_id. """ - from core.db.models import Timeline, Checkpoint + from core.db.models import Timeline from core.db.connection import get_session with get_session() as session: timeline = Timeline( - source_video=source_video, + name=name, + chunk_paths=chunk_paths, profile_name=profile_name, source_asset_id=source_asset_id, fps=fps, + status="created", ) session.add(timeline) - session.flush() + session.commit() + session.refresh(timeline) tid = str(timeline.id) - # Upload frames to MinIO - manifest = save_frames(tid, frames) - - frames_meta = [ - { - "sequence": f.sequence, - "chunk_id": getattr(f, "chunk_id", 0), - "timestamp": f.timestamp, - "perceptual_hash": getattr(f, "perceptual_hash", ""), - } - for f in frames - ] - - timeline.frames_prefix = f"{CHECKPOINT_PREFIX}/{tid}/frames/" - timeline.frames_manifest = {str(k): v for k, v in manifest.items()} - timeline.frames_meta = frames_meta - - checkpoint = Checkpoint( - timeline_id=timeline.id, - parent_id=None, - stage_outputs={}, - stats={"frames_extracted": len(frames)}, - ) - session.add(checkpoint) - session.commit() - session.refresh(checkpoint) - cid = str(checkpoint.id) - - logger.info("Timeline created: %s (%d frames, root checkpoint %s)", tid, len(frames), cid) - return tid, cid + logger.info("Timeline created: %s (%d chunks)", tid, len(chunk_paths)) + return tid -def get_timeline_frames(timeline_id: str) -> list: - """Load frames from a timeline (from MinIO) as Frame objects.""" +def get_timeline(timeline_id: str) -> dict: + """Load a timeline as a dict.""" from core.db.models import Timeline from core.db.connection import get_session @@ -89,36 +62,40 @@ def get_timeline_frames(timeline_id: str) -> list: if not timeline: raise ValueError(f"Timeline not found: {timeline_id}") - raw_manifest = timeline.frames_manifest or {} - manifest = {int(k): v for k, v in raw_manifest.items()} - return load_frames(manifest, timeline.frames_meta or []) + return { + "id": str(timeline.id), + "name": timeline.name, + "chunk_paths": timeline.chunk_paths, + "profile_name": timeline.profile_name, + "status": timeline.status, + "fps": timeline.fps, + "source_asset_id": str(timeline.source_asset_id) if timeline.source_asset_id else None, + "created_at": str(timeline.created_at) if timeline.created_at else None, + } -def get_timeline_frames_b64(timeline_id: str) -> list[dict]: - """Load frames as base64 JPEG (lightweight, no numpy).""" +def update_timeline_status(timeline_id: str, status: str, frame_count: int | None = None): + """Update timeline status and optionally frame count.""" from core.db.models import Timeline from core.db.connection import get_session - from .frames import load_frames_b64 with get_session() as session: timeline = session.get(Timeline, UUID(timeline_id)) - if not timeline: - raise ValueError(f"Timeline not found: {timeline_id}") - - raw_manifest = timeline.frames_manifest or {} - manifest = {int(k): v for k, v in raw_manifest.items()} - return load_frames_b64(manifest, timeline.frames_meta or []) + if timeline: + timeline.status = status + if frame_count is not None: + timeline.frame_count = frame_count + session.commit() # --------------------------------------------------------------------------- # Checkpoint # --------------------------------------------------------------------------- -def save_stage_output( +def save_checkpoint( timeline_id: str, - parent_checkpoint_id: str | None, stage_name: str, - output_json: dict, + parent_checkpoint_id: str | 
None = None, config_overrides: dict | None = None, stats: dict | None = None, is_scenario: bool = False, @@ -126,32 +103,22 @@ def save_stage_output( job_id: str | None = None, ) -> str: """ - Save a stage's output as a new checkpoint (child of parent). + Save a checkpoint (lightweight tree node). - Carries forward stage outputs from parent + adds the new one. + No stage outputs — those go in StageOutput table separately. Returns the new checkpoint ID. """ from core.db.models import Checkpoint from core.db.connection import get_session with get_session() as session: - parent_outputs = {} - parent_stats = {} - parent_config = {} - if parent_checkpoint_id: - parent = session.get(Checkpoint, UUID(parent_checkpoint_id)) - if parent: - parent_outputs = dict(parent.stage_outputs or {}) - parent_stats = dict(parent.stats or {}) - parent_config = dict(parent.config_overrides or {}) - checkpoint = Checkpoint( timeline_id=UUID(timeline_id), job_id=UUID(job_id) if job_id else None, parent_id=UUID(parent_checkpoint_id) if parent_checkpoint_id else None, - stage_outputs={**parent_outputs, stage_name: output_json}, - config_overrides={**parent_config, **(config_overrides or {})}, - stats={**parent_stats, **(stats or {})}, + stage_name=stage_name, + config_overrides=config_overrides or {}, + stats=stats or {}, is_scenario=is_scenario, scenario_label=scenario_label, ) @@ -165,13 +132,172 @@ def save_stage_output( return cid -def load_stage_output(checkpoint_id: str, stage_name: str) -> dict | None: - """Load a stage's output from a checkpoint.""" +def get_checkpoints_for_job(job_id: str) -> list[dict]: + """List checkpoints for a job, ordered by creation time.""" + from sqlmodel import select from core.db.models import Checkpoint from core.db.connection import get_session with get_session() as session: - checkpoint = session.get(Checkpoint, UUID(checkpoint_id)) - if not checkpoint: + stmt = ( + select(Checkpoint) + .where(Checkpoint.job_id == UUID(job_id)) + .order_by(Checkpoint.created_at) + ) + checkpoints = session.exec(stmt).all() + + return [ + { + "id": str(c.id), + "timeline_id": str(c.timeline_id), + "job_id": str(c.job_id) if c.job_id else None, + "parent_id": str(c.parent_id) if c.parent_id else None, + "stage_name": c.stage_name, + "config_overrides": c.config_overrides or {}, + "stats": c.stats or {}, + "is_scenario": c.is_scenario, + "scenario_label": c.scenario_label, + "created_at": str(c.created_at) if c.created_at else None, + } + for c in checkpoints + ] + + +def get_checkpoints_for_timeline(timeline_id: str) -> list[dict]: + """List all checkpoints on a timeline, ordered by creation time.""" + from sqlmodel import select + from core.db.models import Checkpoint + from core.db.connection import get_session + + with get_session() as session: + stmt = ( + select(Checkpoint) + .where(Checkpoint.timeline_id == UUID(timeline_id)) + .order_by(Checkpoint.created_at) + ) + checkpoints = session.exec(stmt).all() + + return [ + { + "id": str(c.id), + "timeline_id": str(c.timeline_id), + "job_id": str(c.job_id) if c.job_id else None, + "parent_id": str(c.parent_id) if c.parent_id else None, + "stage_name": c.stage_name, + "config_overrides": c.config_overrides or {}, + "stats": c.stats or {}, + "is_scenario": c.is_scenario, + "scenario_label": c.scenario_label, + "created_at": str(c.created_at) if c.created_at else None, + } + for c in checkpoints + ] + + +# --------------------------------------------------------------------------- +# StageOutput +# 
--------------------------------------------------------------------------- + +def save_stage_output( + job_id: str, + timeline_id: str, + stage_name: str, + output: dict, + checkpoint_id: str | None = None, +) -> str: + """ + Save (upsert) a stage output. One row per (job_id, stage_name). + + Returns the stage_output ID. + """ + from sqlmodel import select + from core.db.models import StageOutput + from core.db.connection import get_session + + with get_session() as session: + # Upsert: check if exists + stmt = ( + select(StageOutput) + .where(StageOutput.job_id == UUID(job_id)) + .where(StageOutput.stage_name == stage_name) + ) + existing = session.exec(stmt).first() + + if existing: + existing.output = output + existing.checkpoint_id = UUID(checkpoint_id) if checkpoint_id else None + session.commit() + session.refresh(existing) + return str(existing.id) + + stage_output = StageOutput( + job_id=UUID(job_id), + timeline_id=UUID(timeline_id), + stage_name=stage_name, + checkpoint_id=UUID(checkpoint_id) if checkpoint_id else None, + output=output, + ) + session.add(stage_output) + session.commit() + session.refresh(stage_output) + return str(stage_output.id) + + +def load_stage_output(job_id: str, stage_name: str) -> dict | None: + """Load a stage's output by job + stage name.""" + from sqlmodel import select + from core.db.models import StageOutput + from core.db.connection import get_session + + with get_session() as session: + stmt = ( + select(StageOutput) + .where(StageOutput.job_id == UUID(job_id)) + .where(StageOutput.stage_name == stage_name) + ) + row = session.exec(stmt).first() + + if not row: return None - return (checkpoint.stage_outputs or {}).get(stage_name) + return row.output + + +def load_stage_outputs_for_job(job_id: str) -> dict[str, dict]: + """Load all stage outputs for a job. 
Returns {stage_name: output}.""" + from sqlmodel import select + from core.db.models import StageOutput + from core.db.connection import get_session + + with get_session() as session: + stmt = ( + select(StageOutput) + .where(StageOutput.job_id == UUID(job_id)) + ) + rows = session.exec(stmt).all() + + return {row.stage_name: row.output for row in rows} + + +def load_stage_outputs_for_timeline(timeline_id: str, stage_name: str | None = None) -> list[dict]: + """Load stage outputs for a timeline, optionally filtered by stage.""" + from sqlmodel import select + from core.db.models import StageOutput + from core.db.connection import get_session + + with get_session() as session: + stmt = select(StageOutput).where(StageOutput.timeline_id == UUID(timeline_id)) + if stage_name: + stmt = stmt.where(StageOutput.stage_name == stage_name) + rows = session.exec(stmt).all() + + return [ + { + "id": str(r.id), + "job_id": str(r.job_id), + "stage_name": r.stage_name, + "checkpoint_id": str(r.checkpoint_id) if r.checkpoint_id else None, + "output": r.output, + "created_at": str(r.created_at) if r.created_at else None, + } + for r in rows + ] diff --git a/core/detect/graph/nodes.py b/core/detect/graph/nodes.py index 76737ce..25f906d 100644 --- a/core/detect/graph/nodes.py +++ b/core/detect/graph/nodes.py @@ -98,6 +98,15 @@ def node_extract_frames(state: DetectState) -> dict: frames = extract_frames(state["video_path"], config, job_id=job_id) span.set_output({"frames_extracted": len(frames)}) + # Cache frames on the timeline for reuse across jobs and UI + timeline_id = state.get("timeline_id") + if timeline_id: + from core.detect.checkpoint.frames import cache_frames, cache_exists + if not cache_exists(timeline_id): + cache_frames(timeline_id, frames) + from core.detect.checkpoint.storage import update_timeline_status + update_timeline_status(timeline_id, "cached", frame_count=len(frames)) + _emit(state, "extract_frames", "done") return {"frames": frames, "stats": PipelineStats(frames_extracted=len(frames))} diff --git a/core/detect/stages/registry/_serializers.py b/core/detect/stages/registry/_serializers.py index c4bf038..c5da4a9 100644 --- a/core/detect/stages/registry/_serializers.py +++ b/core/detect/stages/registry/_serializers.py @@ -12,8 +12,7 @@ from core.schema.serializers._common import ( ) from core.schema.serializers.pipeline import ( serialize_frame_meta, - serialize_frames_with_upload as serialize_frames, - deserialize_frames_with_download as deserialize_frames, + serialize_frames_meta, serialize_text_candidate, serialize_text_candidates, deserialize_text_candidate, diff --git a/core/detect/stages/registry/preprocessing.py b/core/detect/stages/registry/preprocessing.py index deb353a..9cd9be0 100644 --- a/core/detect/stages/registry/preprocessing.py +++ b/core/detect/stages/registry/preprocessing.py @@ -2,18 +2,19 @@ from core.detect.stages.models import StageDefinition, StageIO, StageConfigField from core.detect.stages.base import register_stage -from ._serializers import serialize_frames, deserialize_frames +from ._serializers import serialize_frame_meta def _ser_extract(state: dict, job_id: str) -> dict: frames = state.get("frames", []) - meta, manifest = serialize_frames(frames, job_id) - return {"frames_meta": meta, "frames_manifest": manifest} + meta = [serialize_frame_meta(f) for f in frames] + return {"frames_meta": meta, "frame_count": len(frames)} def _deser_extract(data: dict, job_id: str) -> dict: - frames = deserialize_frames(data["frames_meta"], data["frames_manifest"], job_id) 
- return {"frames": frames} + # Frames are ephemeral — re-extract from chunks on demand. + # Store metadata so we know what was extracted. + return {"_frames_meta": data.get("frames_meta", [])} def _ser_filter(state: dict, job_id: str) -> dict: diff --git a/core/detect/state.py b/core/detect/state.py index 344223c..6e234ec 100644 --- a/core/detect/state.py +++ b/core/detect/state.py @@ -16,6 +16,7 @@ class DetectState(TypedDict, total=False): # Input video_path: str job_id: str + timeline_id: str profile_name: str source_asset_id: str # UUID of the source MediaAsset diff --git a/core/gpu/models/models.py b/core/gpu/models/models.py index f2a90f5..ca820df 100644 --- a/core/gpu/models/models.py +++ b/core/gpu/models/models.py @@ -115,13 +115,13 @@ class SegmentFieldRequest(BaseModel): class SegmentFieldResponse(BaseModel): """Response from field segmentation.""" - boundary: List[List[int]] = Field(default_factory=list) + boundary: List[str] = Field(default_factory=list) coverage: float = 0.0 mask_b64: str = "" class SegmentFieldDebugResponse(BaseModel): """Response from field segmentation with debug overlay.""" - boundary: List[List[int]] = Field(default_factory=list) + boundary: List[str] = Field(default_factory=list) coverage: float = 0.0 mask_overlay_b64: str = "" diff --git a/core/schema/models/__init__.py b/core/schema/models/__init__.py index 1006e3d..232c22f 100644 --- a/core/schema/models/__init__.py +++ b/core/schema/models/__init__.py @@ -28,6 +28,7 @@ from .grpc import ( from .job import Job, JobStatus, RunType from .timeline import Timeline from .checkpoint import Checkpoint +from .stage_output import StageOutput from .brand import BrandSource, Brand from .media import AssetStatus, MediaAsset from .profile import Profile @@ -41,7 +42,7 @@ from .source import ChunkInfo, SourceJob, SourceType # Core domain models - generates SQLModel, TypeScript DATACLASSES = [MediaAsset, TranscodePreset, - Job, Timeline, Checkpoint, Brand, Profile] + Job, Timeline, Checkpoint, StageOutput, Brand, Profile] # API request/response models API_MODELS = [ diff --git a/core/schema/models/checkpoint.py b/core/schema/models/checkpoint.py index 038aee7..e793576 100644 --- a/core/schema/models/checkpoint.py +++ b/core/schema/models/checkpoint.py @@ -11,25 +11,24 @@ class Checkpoint: """ A snapshot of pipeline state on a timeline. - Stage outputs stored as JSONB — each stage serializes to JSON, - the checkpoint stores it without knowing the shape. - parent_id forms a tree: multiple children from the same parent = different config tries from the same starting point. + + Stage outputs are stored separately in StageOutput table, + not carried in the checkpoint itself. 
""" id: UUID timeline_id: UUID - job_id: Optional[UUID] = None # which job created this checkpoint - parent_id: Optional[UUID] = None # null = root checkpoint + job_id: Optional[UUID] = None + parent_id: Optional[UUID] = None - # Stage outputs — JSONB per stage, opaque to the checkpoint layer - stage_outputs: Dict[str, Any] = field(default_factory=dict) + stage_name: str = "" # which stage produced this checkpoint # Config that produced this checkpoint config_overrides: Dict[str, Any] = field(default_factory=dict) - # Pipeline state + # Pipeline stats at this point stats: Dict[str, Any] = field(default_factory=dict) # Scenario bookmark diff --git a/core/schema/models/job.py b/core/schema/models/job.py index 5cf3ba4..d68c3fd 100644 --- a/core/schema/models/job.py +++ b/core/schema/models/job.py @@ -38,7 +38,7 @@ class Job: video_path: str profile_name: str = "soccer_broadcast" - # Timeline — set after frame extraction, or upfront for replay jobs + # Timeline — set at job creation (timeline exists before any job) timeline_id: Optional[UUID] = None # Lineage diff --git a/core/schema/models/stage_output.py b/core/schema/models/stage_output.py new file mode 100644 index 0000000..ac3dcba --- /dev/null +++ b/core/schema/models/stage_output.py @@ -0,0 +1,27 @@ +"""StageOutput schema — per-stage result storage.""" + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, Optional +from uuid import UUID + + +@dataclass +class StageOutput: + """ + Output of a single stage within a job. + + Flat table with composite unique (job_id, stage_name). + Upserted on each stage completion. Independently queryable — + "give me all edge detection outputs for this timeline." + """ + + id: UUID + job_id: UUID + timeline_id: UUID + stage_name: str + checkpoint_id: Optional[UUID] = None + + output: Dict[str, Any] = field(default_factory=dict) + + created_at: Optional[datetime] = None diff --git a/core/schema/models/timeline.py b/core/schema/models/timeline.py index dcca8b3..25befd0 100644 --- a/core/schema/models/timeline.py +++ b/core/schema/models/timeline.py @@ -1,4 +1,4 @@ -"""Timeline schema — source of truth for frame sequences.""" +"""Timeline schema — source of truth for source material sequences.""" from dataclasses import dataclass, field from datetime import datetime @@ -9,21 +9,27 @@ from uuid import UUID @dataclass class Timeline: """ - The frame sequence from a source video. + A user-created selection of source material. - Independent of stages — exists before any stage runs. - Frames stored in MinIO as JPEGs, metadata here. - One timeline per job. + Exists before any job runs. Holds source references (chunk paths, + asset IDs) and extraction config. + + Frame cache: extracted frames live at media/timelines/{id}/frames/ + as JPEGs. Any job on this timeline reads from the cache. Cache is + rebuildable from chunks (clear + re-extract). For ephemeral sources + (streams), the cache is the only record. + + Many jobs can work on the same timeline. 
""" id: UUID + name: str = "" source_asset_id: Optional[UUID] = None - source_video: str = "" + chunk_paths: List[str] = field(default_factory=list) profile_name: str = "" + status: str = "created" # created | cached | ready fps: float = 2.0 - - frames_prefix: str = "" # s3: timeline/{id}/frames/ - frames_manifest: Dict[int, str] = field(default_factory=dict) # seq → s3 key - frames_meta: List[Dict[str, Any]] = field(default_factory=list) + frame_count: int = 0 + source_ephemeral: bool = False # True for streams — cache can't be rebuilt created_at: Optional[datetime] = None diff --git a/core/schema/serializers/pipeline.py b/core/schema/serializers/pipeline.py index 92e78a7..27234b8 100644 --- a/core/schema/serializers/pipeline.py +++ b/core/schema/serializers/pipeline.py @@ -2,7 +2,7 @@ Serializers for detection pipeline runtime models. Special handling: - - Frame.image (np.ndarray → S3, excluded from JSON) + - Frame.image (np.ndarray, ephemeral — only metadata serialized) - TextCandidate.frame (object ref → frame_sequence integer) Everything else uses dataclasses.asdict() via safe_construct. """ @@ -24,7 +24,7 @@ from ._common import safe_construct, serialize_dataclass, serialize_dataclass_li # --------------------------------------------------------------------------- -# Frame — image goes to S3 separately +# Frame — metadata only (image is ephemeral, re-extracted from chunks) # --------------------------------------------------------------------------- def serialize_frame_meta(frame: Frame) -> dict: @@ -34,21 +34,9 @@ def serialize_frame_meta(frame: Frame) -> dict: return result -def serialize_frames_with_upload(frames: list[Frame], job_id: str) -> tuple[list[dict], dict[int, str]]: - """Upload frame images to S3, return metadata + manifest.""" - from core.detect.checkpoint.frames import save_frames - - manifest = save_frames(job_id, frames) - meta = [serialize_frame_meta(f) for f in frames] - return meta, manifest - - -def deserialize_frames_with_download(meta: list[dict], manifest: dict, job_id: str) -> list[Frame]: - """Load frames from S3 + metadata.""" - from core.detect.checkpoint.frames import load_frames - - int_manifest = {int(k): v for k, v in manifest.items()} - return load_frames(int_manifest, meta) +def serialize_frames_meta(frames: list[Frame]) -> list[dict]: + """Serialize frame metadata for all frames.""" + return [serialize_frame_meta(f) for f in frames] # --------------------------------------------------------------------------- diff --git a/core/storage/s3.py b/core/storage/s3.py index b735533..388918f 100644 --- a/core/storage/s3.py +++ b/core/storage/s3.py @@ -80,6 +80,18 @@ def upload_file(local_path: str, bucket: str, key: str) -> None: s3.upload_file(local_path, bucket, key) +def delete_objects(bucket: str, prefix: str) -> int: + """Delete all objects under a prefix. 
Returns count of deleted objects.""" + s3 = get_s3_client() + objects = list_objects(bucket, prefix) + if not objects: + return 0 + + delete_keys = [{"Key": obj["key"]} for obj in objects] + s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_keys}) + return len(delete_keys) + + def get_presigned_url(bucket: str, key: str, expires: int = 3600) -> str: """Generate a presigned URL for an S3 object.""" s3 = get_s3_client() diff --git a/ui/common/types/generated.ts b/ui/common/types/generated.ts index e6ff86f..89cd48c 100644 --- a/ui/common/types/generated.ts +++ b/ui/common/types/generated.ts @@ -77,13 +77,14 @@ export interface Job { export interface Timeline { id: string; + name: string; source_asset_id: string | null; - source_video: string; + chunk_paths: string[]; profile_name: string; + status: string; fps: number; - frames_prefix: string; - frames_manifest: Record; - frames_meta: string[]; + frame_count: number; + source_ephemeral: boolean; created_at: string | null; } @@ -92,7 +93,7 @@ export interface Checkpoint { timeline_id: string; job_id: string | null; parent_id: string | null; - stage_outputs: Record; + stage_name: string; config_overrides: Record; stats: Record; is_scenario: boolean; @@ -100,6 +101,16 @@ export interface Checkpoint { created_at: string | null; } +export interface StageOutput { + id: string; + job_id: string; + timeline_id: string; + stage_name: string; + checkpoint_id: string | null; + output: Record; + created_at: string | null; +} + export interface Brand { id: string; canonical_name: string; diff --git a/ui/detection-app/src/composables/useCheckpointLoader.ts b/ui/detection-app/src/composables/useCheckpointLoader.ts index 126c90c..aad131d 100644 --- a/ui/detection-app/src/composables/useCheckpointLoader.ts +++ b/ui/detection-app/src/composables/useCheckpointLoader.ts @@ -29,35 +29,15 @@ export function useCheckpointLoader( stripSelEndOverride.value ?? Math.max(0, checkpointFrames.value.length - 1), ) - // Cache job_id → timeline_id mappings - const timelineCache = new Map() - // Track current frame from SSE source.on<{ frame_ref: number; jpeg_b64: string }>('frame_update', (e) => { currentFrameImage.value = e.jpeg_b64 currentFrameRef.value = e.frame_ref }) - async function resolveTimelineId(job: string): Promise { - if (timelineCache.has(job)) return timelineCache.get(job)! - - try { - const resp = await fetch(`/api/detect/timeline/${job}`) - if (!resp.ok) return null - const data = await resp.json() - const tid = data.timeline_id - if (tid) timelineCache.set(job, tid) - return tid - } catch { - return null - } - } - async function loadCheckpoint(job: string, stage: string) { try { - // Resolve timeline_id from job_id - const timelineId = await resolveTimelineId(job) - const lookupId = timelineId ?? job + const lookupId = pipeline.timelineId || job const resp = await fetch(`/api/detect/checkpoints/${lookupId}/${stage}`) if (!resp.ok) return diff --git a/ui/detection-app/src/composables/useStageRegistry.ts b/ui/detection-app/src/composables/useStageRegistry.ts index 3f98e88..0e5f53a 100644 --- a/ui/detection-app/src/composables/useStageRegistry.ts +++ b/ui/detection-app/src/composables/useStageRegistry.ts @@ -56,10 +56,13 @@ export function useStageRegistry() { } /** - * Stages that have config fields (and thus can open a parameter editor). + * Stages that have a visual stage editor (canvas + overlays + sliders). + * Add stage names here when a visual editor is implemented for them. 
*/ + const STAGE_EDITORS = new Set(['detect_edges', 'field_segmentation']) + const editableStages = computed(() => - stages.value.filter(s => s.config_fields.length > 0).map(s => s.name) + stages.value.filter(s => STAGE_EDITORS.has(s.name)).map(s => s.name) ) function getStage(name: string): StageInfo | undefined { diff --git a/ui/detection-app/src/panels/SourceSelector.vue b/ui/detection-app/src/panels/SourceSelector.vue index 1146eea..d4e0971 100644 --- a/ui/detection-app/src/panels/SourceSelector.vue +++ b/ui/detection-app/src/panels/SourceSelector.vue @@ -2,6 +2,7 @@ import { ref, onMounted } from 'vue' import { Panel } from 'mpr-ui-framework' import { usePipelineStore } from '../stores/pipeline' +import type { Timeline, Job } from '@common/types/generated' const pipeline = usePipelineStore() @@ -32,6 +33,9 @@ interface ProfileInfo { const sources = ref([]) const chunks = ref([]) const profiles = ref([]) +const timelines = ref([]) +const timelineJobs = ref([]) +const selectedTimeline = ref(null) const selectedSource = ref(null) const selectedChunks = ref>(new Set()) const selectedProfile = ref('soccer_broadcast') @@ -48,9 +52,10 @@ async function loadSources() { loading.value = true error.value = null try { - const [srcResp, profResp] = await Promise.all([ + const [srcResp, profResp, tlResp] = await Promise.all([ fetch('/api/detect/sources'), fetch('/api/detect/config/profiles'), + fetch('/api/detect/timeline'), ]) if (!srcResp.ok) throw new Error(`${srcResp.status} ${srcResp.statusText}`) sources.value = await srcResp.json() @@ -61,6 +66,10 @@ async function loadSources() { selectedProfile.value = profiles.value[0].name } } + + if (tlResp.ok) { + timelines.value = await tlResp.json() + } } catch (e: any) { error.value = `Failed to load sources: ${e.message}` } finally { @@ -81,6 +90,32 @@ async function loadChunks(jobId: string) { } } +async function selectTimeline(tl: Timeline) { + selectedTimeline.value = tl.id + timelineJobs.value = [] + try { + const resp = await fetch(`/api/detect/jobs?timeline_id=${tl.id}`) + if (!resp.ok) throw new Error(`${resp.status}`) + timelineJobs.value = await resp.json() + } catch (e: any) { + error.value = `Failed to load jobs: ${e.message}` + } +} + +function loadJob(job: Job) { + // Navigate to the job — full page load so all panels initialize with the job context + const url = new URL(window.location.href) + url.searchParams.set('job', job.id) + url.hash = '' + window.location.href = url.toString() +} + +function formatDate(dateStr: string | null): string { + if (!dateStr) return '' + const d = new Date(dateStr) + return d.toLocaleString(undefined, { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }) +} + function toggleChunk(chunk: ChunkInfo) { const s = new Set(selectedChunks.value) if (s.has(chunk.key)) { @@ -125,15 +160,34 @@ async function runPipeline() { running.value = true error.value = null - // Run first selected chunk (multi-run queuing is future work) - const videoPath = [...selectedChunks.value][0] + const chunkPaths = [...selectedChunks.value] try { - const resp = await fetch('/api/detect/run', { + // 1. Create timeline from chunk selection + const tlResp = await fetch('/api/detect/timeline', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ - video_path: videoPath, + chunk_paths: chunkPaths, + profile_name: selectedProfile.value, + source_asset_id: selectedSource.value || '', + name: chunkPaths.length === 1 + ? chunkPaths[0].split('/').pop() ?? 
'' + : `${chunkPaths.length} chunks`, + }), + }) + if (!tlResp.ok) { + const detail = await tlResp.text() + throw new Error(`Timeline creation failed: ${tlResp.status}: ${detail}`) + } + const timeline = await tlResp.json() + + // 2. Run pipeline on the timeline + const runResp = await fetch('/api/detect/run', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + timeline_id: timeline.id, profile_name: selectedProfile.value, checkpoint: checkpoint.value, skip_vlm: skipVlm.value, @@ -142,12 +196,13 @@ async function runPipeline() { pause_after_stage: pauseAfterStage.value, }), }) - if (!resp.ok) { - const detail = await resp.text() - throw new Error(`${resp.status}: ${detail}`) + if (!runResp.ok) { + const detail = await runResp.text() + throw new Error(`Pipeline start failed: ${runResp.status}: ${detail}`) } - const data = await resp.json() + const data = await runResp.json() + pipeline.setTimelineId(timeline.id) emit('job-started', data.job_id, { pauseAfterStage: pauseAfterStage.value }) } catch (e: any) { error.value = `Failed to start pipeline: ${e.message}` @@ -255,6 +310,42 @@ onMounted(loadSources) + +
+    <!-- Recent timelines -->
+    <div v-if="timelines.length" class="timeline-list">
+      <div class="section-label">Recent Timelines</div>
+      <div
+        v-for="tl in timelines"
+        :key="tl.id"
+        class="timeline-item"
+        :class="{ selected: selectedTimeline === tl.id }"
+        @click="selectTimeline(tl)"
+      >
+        <span class="timeline-name">{{ tl.name || tl.id.slice(0, 12) }}</span>
+        <span class="timeline-meta">
+          <span class="timeline-status">{{ tl.status.toUpperCase() }}</span>
+          <span>{{ tl.frame_count }} frames</span>
+          <span>{{ formatDate(tl.created_at) }}</span>
+        </span>
+      </div>
+
+      <div v-if="selectedTimeline" class="job-list">
+        <div
+          v-for="job in timelineJobs"
+          :key="job.id"
+          class="job-item"
+          @click="loadJob(job)"
+        >
+          <span class="job-id">{{ job.id.slice(0, 8) }}</span>
+          <span class="job-status" :class="job.status">{{ job.status }}</span>
+          <span class="job-stage">{{ job.current_stage.replace(/_/g, ' ') }}</span>
+          <span class="job-date">{{ formatDate(job.created_at) }}</span>
+        </div>
+        <div v-if="!timelineJobs.length" class="job-stage">No jobs yet</div>
+      </div>
+    </div>
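The Timeline docstring earlier in this diff describes a per-timeline frame cache at media/timelines/{id}/frames/, which is what ultimately backs the frames this panel's runs produce. As a rough illustration of that layout, a cache reader might look like the sketch below; the function name, the meta.json sidecar, and the numeric filename scheme are assumptions for illustration, not part of this diff.

```python
import base64
import json
from pathlib import Path

CACHE_ROOT = Path("media/timelines")  # cache root named in the Timeline docstring


def load_timeline_frames_b64(timeline_id: str) -> list[dict]:
    """Read a timeline's cached frames as base64 JPEGs (hypothetical layout)."""
    frames_dir = CACHE_ROOT / timeline_id / "frames"
    if not frames_dir.is_dir():
        return []  # cache missing: caller can re-extract from chunk_paths

    # Assumed sidecar file holding per-frame metadata such as timestamps.
    meta_path = frames_dir / "meta.json"
    meta = json.loads(meta_path.read_text()) if meta_path.exists() else []
    by_seq = {m["seq"]: m for m in meta}

    frames = []
    for jpeg_path in sorted(frames_dir.glob("*.jpg")):
        seq = int(jpeg_path.stem)  # assumes zero-padded numeric names, e.g. 000042.jpg
        frames.append({
            "seq": seq,
            "timestamp": by_seq.get(seq, {}).get("timestamp", 0.0),
            "jpeg_b64": base64.b64encode(jpeg_path.read_bytes()).decode("ascii"),
        })
    return frames
```

Because the cache is rebuildable from chunk_paths (except for ephemeral stream sources, per source_ephemeral), a missing directory can trigger re-extraction rather than an error.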
@@ -465,4 +556,56 @@ onMounted(loadSources) background: var(--status-error); color: #000; } + +/* Job list */ +.job-list { + background: var(--surface-2); + border-radius: var(--panel-radius); + padding: var(--space-1); + margin-top: var(--space-1); +} + +.job-item { + display: flex; + align-items: center; + gap: var(--space-2); + padding: var(--space-1) var(--space-2); + border-radius: 3px; + cursor: pointer; + font-size: var(--font-size-sm); + color: var(--text-secondary); +} + +.job-item:hover { + background: var(--surface-3); +} + +.job-id { + font-family: var(--font-mono); + color: var(--text-dim); +} + +.job-status { + font-size: 10px; + font-weight: 700; + padding: 1px 4px; + border-radius: 2px; +} + +.job-status.completed { color: var(--status-live); background: rgba(0, 255, 128, 0.1); } +.job-status.failed { color: var(--status-error); background: rgba(224, 82, 82, 0.1); } +.job-status.running { color: var(--status-processing); background: rgba(255, 213, 79, 0.1); } +.job-status.cancelled { color: var(--text-dim); background: var(--surface-3); } +.job-status.pending { color: var(--text-dim); } + +.job-stage { + color: var(--text-dim); + font-size: 10px; +} + +.job-date { + color: var(--text-dim); + font-size: 10px; + margin-left: auto; +} diff --git a/ui/detection-app/src/stores/pipeline.ts b/ui/detection-app/src/stores/pipeline.ts index d60672a..87d0b0b 100644 --- a/ui/detection-app/src/stores/pipeline.ts +++ b/ui/detection-app/src/stores/pipeline.ts @@ -12,6 +12,7 @@ import type { CheckpointInfo } from '../types/sse-contract' export const usePipelineStore = defineStore('pipeline', () => { const jobId = ref('') + const timelineId = ref('') const status = ref('idle') const nodes = ref([]) const currentStage = ref(null) @@ -35,6 +36,10 @@ export const usePipelineStore = defineStore('pipeline', () => { jobId.value = id } + function setTimelineId(id: string) { + timelineId.value = id + } + function setStatus(s: string) { status.value = s } @@ -92,13 +97,14 @@ export const usePipelineStore = defineStore('pipeline', () => { parentJobId.value = null runType.value = 'initial' error.value = null + timelineId.value = '' } return { - jobId, status, nodes, currentStage, runId, parentJobId, runType, + jobId, timelineId, status, nodes, currentStage, runId, parentJobId, runType, checkpoints, error, layoutMode, editorStage, sourceHasSelection, isRunning, isPaused, canReplay, isEditing, - setJob, setStatus, updateNodes, setRunContext, setCheckpoints, setError, + setJob, setTimelineId, setStatus, updateNodes, setRunContext, setCheckpoints, setError, openSourceSelector, openBBoxEditor, openStageEditor, closeEditor, reset, } }) diff --git a/ui/detection-app/tsconfig.json b/ui/detection-app/tsconfig.json index cee8317..84b95f3 100644 --- a/ui/detection-app/tsconfig.json +++ b/ui/detection-app/tsconfig.json @@ -11,7 +11,8 @@ "skipLibCheck": true, "baseUrl": ".", "paths": { - "@/*": ["src/*"] + "@/*": ["src/*"], + "@common/*": ["../common/*"] } }, "include": ["src/**/*.ts", "src/**/*.vue"] diff --git a/ui/detection-app/vite.config.ts b/ui/detection-app/vite.config.ts index 566e67d..37be6aa 100644 --- a/ui/detection-app/vite.config.ts +++ b/ui/detection-app/vite.config.ts @@ -8,6 +8,7 @@ export default defineConfig({ resolve: { alias: { '@': resolve(__dirname, 'src'), + '@common': resolve(__dirname, '../common'), }, }, server: { diff --git a/ui/framework/src/renderers/GraphRenderer.vue b/ui/framework/src/renderers/GraphRenderer.vue index dce6ff3..53017cb 100644 --- 
a/ui/framework/src/renderers/GraphRenderer.vue +++ b/ui/framework/src/renderers/GraphRenderer.vue @@ -116,8 +116,7 @@ const flowNodes = computed(() => status: n.status, ...appearance, hasCheckpoint: n.hasCheckpoint ?? false, - hasRegionEditor: regionStageSet.value.has(n.id), - hasEditors: (n.availableEditors?.length ?? 0) > 0, + hasStageEditor: regionStageSet.value.has(n.id), isRunning: n.status === 'running', isActive: n.id === props.activeStage, }, @@ -190,9 +189,9 @@ function onNodeClick(id: string) {
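The StageOutput dataclass introduced in this diff is described as a flat table with a composite unique key on (job_id, stage_name), upserted on each stage completion and independently queryable. A minimal sketch of that access pattern follows; the import paths, session helper, and the generated SQLModel class are assumptions, since only the schema dataclass appears in this diff.

```python
from uuid import UUID, uuid4

from sqlmodel import select

from core.db.models import StageOutput          # assumed import path for the generated model
from core.db.connection import get_session      # assumed session helper


def upsert_stage_output(job_id: UUID, timeline_id: UUID, stage_name: str,
                        output: dict, checkpoint_id: UUID | None = None) -> StageOutput:
    """Insert or update the single row keyed by (job_id, stage_name)."""
    with get_session() as session:
        row = session.exec(
            select(StageOutput)
            .where(StageOutput.job_id == job_id)
            .where(StageOutput.stage_name == stage_name)
        ).first()
        if row is None:
            row = StageOutput(id=uuid4(), job_id=job_id, timeline_id=timeline_id,
                              stage_name=stage_name, checkpoint_id=checkpoint_id,
                              output=output)
        else:
            row.output = output
            row.checkpoint_id = checkpoint_id
        session.add(row)
        session.commit()
        session.refresh(row)
        return row


def stage_outputs_for_timeline(timeline_id: UUID, stage_name: str) -> list[StageOutput]:
    """The "all edge detection outputs for this timeline" style of query."""
    with get_session() as session:
        return list(session.exec(
            select(StageOutput)
            .where(StageOutput.timeline_id == timeline_id)
            .where(StageOutput.stage_name == stage_name)
            .order_by(StageOutput.created_at.desc())
        ))
```

Keying the upsert on (job_id, stage_name) means a re-run of a stage within the same job overwrites its previous output, while outputs from different jobs on the same timeline accumulate and remain queryable per stage.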