diff --git a/admin/mpr/media_assets/models.py b/admin/mpr/media_assets/models.py
index a9a1f51..5ef9005 100644
--- a/admin/mpr/media_assets/models.py
+++ b/admin/mpr/media_assets/models.py
@@ -47,6 +47,12 @@ class BrandSource(models.TextChoices):
     CLOUD = "cloud_llm", "Cloud"
     MANUAL = "manual", "Manual"
 
+class SourceType(models.TextChoices):
+    CHUNK_JOB = "chunk_job", "Chunk Job"
+    UPLOAD = "upload", "Upload"
+    DEVICE = "device", "Device"
+    STREAM = "stream", "Stream"
+
 
 class MediaAsset(models.Model):
     """A video/audio file registered in the system."""
@@ -268,3 +274,32 @@ class SourceBrandSighting(models.Model):
 
     def __str__(self):
         return str(self.id)
+
+
+class SourceJob(models.Model):
+    """A group of chunks that belong together (same source video/session)."""
+
+    job_id = models.CharField(max_length=255)
+    source_type = models.CharField(max_length=255)
+    chunk_count = models.IntegerField()
+    total_bytes = models.IntegerField(default=0)
+
+    class Meta:
+        pass
+
+    def __str__(self):
+        return str(self.id)
+
+
+class ChunkInfo(models.Model):
+    """A single chunk (video segment) stored in blob storage."""
+
+    filename = models.CharField(max_length=500)
+    key = models.CharField(max_length=255)
+    size_bytes = models.IntegerField()
+
+    class Meta:
+        pass
+
+    def __str__(self):
+        return self.filename
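
NOTE: SourceJob stores source_type as a free-form CharField even though the new
SourceType choices live in the same module. A minimal sketch of wiring them together,
assuming these models are safe to hand-edit rather than regenerated by modelgen:

    from django.db import models

    class SourceJob(models.Model):
        ...
        source_type = models.CharField(
            max_length=32,
            choices=SourceType.choices,   # chunk_job / upload / device / stream
            default=SourceType.CHUNK_JOB,
        )
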
diff --git a/core/api/detect_sources.py b/core/api/detect_sources.py
new file mode 100644
index 0000000..0ee97e5
--- /dev/null
+++ b/core/api/detect_sources.py
@@ -0,0 +1,261 @@
+"""
+Source browser for detection pipeline.
+
+Lists available media sources from blob storage (MinIO).
+All file-based sources go through MinIO — no host filesystem access.
+The pipeline downloads chunks to a temp path before processing.
+
+Source types (current and future):
+  - chunk_job: pre-chunked segments in MinIO (current)
+  - upload: user-uploaded file, lands in MinIO via upload endpoint (future)
+  - device: local camera/capture card via ffmpeg, no MinIO (future)
+  - stream: RTMP/HLS URL via ffmpeg, no MinIO (future)
+
+GET /detect/sources — list chunk jobs from blob store
+GET /detect/sources/{job_id}/chunks — list chunks for a specific job
+POST /detect/run — launch pipeline on selected source
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import threading
+import uuid
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/detect", tags=["detect"])
+
+# In-process pipeline tracking
+_running_jobs: dict[str, "threading.Thread"] = {}
+_cancelled_jobs: set[str] = set()
+
+
+class ChunkInfo(BaseModel):
+    filename: str
+    key: str
+    size_bytes: int
+
+
+class SourceInfo(BaseModel):
+    job_id: str
+    source_type: str = "chunk_job"
+    chunk_count: int
+    total_bytes: int = 0
+
+
+class RunRequest(BaseModel):
+    video_path: str  # full storage key, as returned by the chunk listing
+    profile_name: str = "soccer_broadcast"
+    source_asset_id: str = ""
+    checkpoint: bool = True
+    skip_vlm: bool = False
+    skip_cloud: bool = False
+    log_level: str = "INFO"  # INFO | DEBUG
+
+
+class RunResponse(BaseModel):
+    status: str
+    job_id: str
+    video_path: str
+
+
+# ---------------------------------------------------------------------------
+# Source listing
+# ---------------------------------------------------------------------------
+
+def _list_sources() -> list[SourceInfo]:
+    """List chunk jobs from blob storage."""
+    from core.storage.blob import get_store
+
+    store = get_store("out")
+    try:
+        objects = store.list(prefix="chunks/")
+    except Exception as e:
+        logger.warning("Failed to list blob sources: %s", e)
+        return []
+
+    jobs: dict[str, int] = {}
+    job_bytes: dict[str, int] = {}
+    for obj in objects:
+        # Keys include store prefix: out/chunks/{job_id}/file.mp4
+        # Strip prefix to get: chunks/{job_id}/file.mp4
+        rel_key = obj.key.removeprefix(store.prefix)
+        parts = rel_key.split("/")
+        if len(parts) >= 3 and parts[0] == "chunks":
+            job_id = parts[1]
+            jobs[job_id] = jobs.get(job_id, 0) + 1
+            job_bytes[job_id] = job_bytes.get(job_id, 0) + obj.size_bytes
+
+    sources = []
+    for job_id, count in sorted(jobs.items()):
+        source = SourceInfo(
+            job_id=job_id,
+            source_type="chunk_job",
+            chunk_count=count,
+            total_bytes=job_bytes.get(job_id, 0),
+        )
+        sources.append(source)
+    return sources
+
+
+@router.get("/sources", response_model=list[SourceInfo])
+def list_sources():
+    """List available chunk jobs from blob storage."""
+    return _list_sources()
+
+
+@router.get("/sources/{source_job_id}/chunks", response_model=list[ChunkInfo])
+def list_chunks(source_job_id: str):
+    """List chunks for a specific source job."""
+    from core.storage.blob import get_store
+
+    store = get_store("out")
+    try:
+        objects = store.list(prefix=f"chunks/{source_job_id}/", extensions={".mp4"})
+    except Exception as e:
+        logger.warning("Failed to list chunks for %s: %s", source_job_id, e)
+        raise HTTPException(status_code=503, detail=f"Blob storage unavailable: {e}")
+
+    if not objects:
+        raise HTTPException(status_code=404, detail=f"Source not found: {source_job_id}")
+
+    chunks = []
+    for obj in objects:
+        info = ChunkInfo(filename=obj.filename, key=obj.key, size_bytes=obj.size_bytes)
+        chunks.append(info)
+    return sorted(chunks, key=lambda c: c.filename)
+
+
+@router.get("/sources/{source_job_id}/chunks/{filename}/url")
+def get_chunk_url(source_job_id: str, filename: str):
+    """Return a presigned URL for previewing a chunk in the browser."""
+    from core.storage.blob import get_store
+
+    store = get_store("out")
+    # list() returns keys that already include the store prefix, but this path
+    # builds the key by hand, so prepend the prefix or the URL will not resolve.
+    key = f"{store.prefix}chunks/{source_job_id}/{filename}"
+    try:
+        url = store.get_url(key, expires=3600)
+    except Exception as e:
+        raise HTTPException(status_code=503, detail=f"Could not generate URL: {e}")
+    return {"url": url}
+
+
+# ---------------------------------------------------------------------------
+# Run pipeline
+# ---------------------------------------------------------------------------
+
+def _resolve_video_path(video_path: str) -> str:
+    """Download a chunk from blob storage to a temp file."""
+    from core.storage.blob import get_store
+
+    store = get_store("out")
+    try:
+        return store.download_to_temp(video_path)
+    except Exception as e:
+        raise HTTPException(status_code=400, detail=f"Failed to download chunk: {e}")
+
+
+@router.post("/run", response_model=RunResponse)
+def run_pipeline(req: RunRequest):
+    """Launch a detection pipeline run on a source chunk."""
+    from detect import emit
+    from detect.graph import get_pipeline
+    from detect.state import DetectState
+
+    local_path = _resolve_video_path(req.video_path)
+    job_id = str(uuid.uuid4())[:8]
+
+    if req.skip_vlm:
+        os.environ["SKIP_VLM"] = "1"
+    elif "SKIP_VLM" in os.environ:
+        del os.environ["SKIP_VLM"]
+
+    if req.skip_cloud:
+        os.environ["SKIP_CLOUD"] = "1"
+    elif "SKIP_CLOUD" in os.environ:
+        del os.environ["SKIP_CLOUD"]
+
+    # Clear any stale events from a previous run with the same job_id
+    from core.events import _get_redis
+    from detect.events import DETECT_EVENTS_PREFIX
+    r = _get_redis()
+    r.delete(f"{DETECT_EVENTS_PREFIX}:{job_id}")
+
+    emit.set_run_context(
+        run_id=job_id,
+        parent_job_id=job_id,
+        run_type="initial",
+        log_level=req.log_level,
+    )
+
+    pipeline = get_pipeline(checkpoint=req.checkpoint)
+
+    initial_state = DetectState(
+        video_path=local_path,
+        job_id=job_id,
+        profile_name=req.profile_name,
+        source_asset_id=req.source_asset_id,
+    )
+
+    import traceback
+
+    from detect.graph import PipelineCancelled, set_cancel_check, clear_cancel_check
+
+    set_cancel_check(job_id, lambda: job_id in _cancelled_jobs)
+
+    def _run():
+        try:
+            emit.log(job_id, "Pipeline", "INFO",
+                     f"Starting pipeline: {req.video_path} (profile={req.profile_name})")
+            pipeline.invoke(initial_state)
+            emit.log(job_id, "Pipeline", "INFO", "Pipeline completed successfully")
+            emit.job_complete(job_id, {"status": "completed"})
+        except PipelineCancelled:
+            emit.log(job_id, "Pipeline", "INFO", "Pipeline cancelled")
+            emit.job_complete(job_id, {"status": "cancelled"})
+        except Exception as e:
+            logger.exception("Pipeline run %s failed: %s", job_id, e)
+            tb = traceback.format_exc()
+            emit.log(job_id, "Pipeline", "ERROR", str(e))
+            emit.log(job_id, "Pipeline", "DEBUG", tb)
+            emit.job_complete(job_id, {"status": "failed", "error": str(e)})
+        finally:
+            _running_jobs.pop(job_id, None)
+            _cancelled_jobs.discard(job_id)
+            clear_cancel_check(job_id)
+            emit.clear_run_context()
+
+    thread = threading.Thread(target=_run, daemon=True, name=f"pipeline-{job_id}")
+    _running_jobs[job_id] = thread
+    thread.start()
+
+    return RunResponse(status="started", job_id=job_id, video_path=req.video_path)
+
+
+@router.post("/stop/{job_id}")
+def stop_pipeline(job_id: str):
+    """Stop a running pipeline. Signals cancellation; the thread checks on next stage."""
+    from detect import emit
+
+    if job_id not in _running_jobs:
+        raise HTTPException(status_code=404, detail=f"No running pipeline: {job_id}")
+
+    _cancelled_jobs.add(job_id)
+    emit.log(job_id, "Pipeline", "INFO", "Stop requested — cancelling after current stage")
+    return {"status": "stopping", "job_id": job_id}
+
+
+@router.post("/clear/{job_id}")
+def clear_pipeline(job_id: str):
+    """Clear events for a job from Redis."""
+    from core.events import _get_redis
+    from detect.events import DETECT_EVENTS_PREFIX
+
+    r = _get_redis()
+    r.delete(f"{DETECT_EVENTS_PREFIX}:{job_id}")
+    return {"status": "cleared", "job_id": job_id}
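
NOTE: the browsing flow these endpoints enable, sketched with requests. The base URL
is an assumption; adjust for however the gateway exposes FastAPI in your setup:

    import requests

    BASE = "http://localhost/detect"  # hypothetical gateway route

    sources = requests.get(f"{BASE}/sources").json()
    job = sources[0]["job_id"]

    chunks = requests.get(f"{BASE}/sources/{job}/chunks").json()
    name = chunks[0]["filename"]

    # Presigned URL suitable for a browser <video> tag
    url = requests.get(f"{BASE}/sources/{job}/chunks/{name}/url").json()["url"]
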
diff --git a/core/api/main.py b/core/api/main.py
index 64c84f8..4c9d7f8 100644
--- a/core/api/main.py
+++ b/core/api/main.py
@@ -27,6 +27,7 @@ from core.api.chunker_sse import router as chunker_router
 from core.api.detect_sse import router as detect_router
 from core.api.detect_replay import router as detect_replay_router
 from core.api.detect_config import router as detect_config_router
+from core.api.detect_sources import router as detect_sources_router
 from core.api.graphql import schema as graphql_schema
 
 CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
@@ -64,6 +65,9 @@ app.include_router(detect_replay_router)
 # Detection config
 app.include_router(detect_config_router)
 
+# Detection sources + run launcher
+app.include_router(detect_sources_router)
+
 
 @app.get("/health")
 def health():
diff --git a/core/jobs/lambda_handler.py b/core/jobs/lambda_handler.py
index e072192..553e03c 100644
--- a/core/jobs/lambda_handler.py
+++ b/core/jobs/lambda_handler.py
@@ -20,8 +20,8 @@ logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
 # S3 config
-S3_BUCKET_IN = os.environ.get("S3_BUCKET_IN", "mpr-media-in")
-S3_BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "mpr-media-out")
+S3_BUCKET_IN = os.environ.get("S3_BUCKET_IN", "in")
+S3_BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "out")
 AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
 
 s3 = boto3.client("s3", region_name=AWS_REGION)
diff --git a/core/schema/models/__init__.py b/core/schema/models/__init__.py
index 9f1d3dc..eae0e4f 100644
--- a/core/schema/models/__init__.py
+++ b/core/schema/models/__init__.py
@@ -35,10 +35,12 @@ from .presets import BUILTIN_PRESETS, TranscodePreset
 from .detect import DETECT_VIEWS  # noqa: F401 — discovered by modelgen generic loader
 from .ui_state import UI_STATE_VIEWS  # noqa: F401 — UI store state types
 from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent
+from .sources import ChunkInfo, SourceJob, SourceType
 
 # Core domain models - generates Django, Pydantic, TypeScript
 DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob, ChunkJob,
-               DetectJob, StageCheckpoint, KnownBrand, SourceBrandSighting]
+               DetectJob, StageCheckpoint, KnownBrand, SourceBrandSighting,
+               SourceJob, ChunkInfo]
 
 # API request/response models - generates TypeScript only (no Django)
 # WorkerStatus from grpc.py is reused here
@@ -52,7 +54,7 @@ API_MODELS = [
 ]
 
 # Status enums - included in generated code
-ENUMS = [AssetStatus, JobStatus, ChunkJobStatus, DetectJobStatus, RunType, BrandSource]
+ENUMS = [AssetStatus, JobStatus, ChunkJobStatus, DetectJobStatus, RunType, BrandSource, SourceType]
 
 # View/event models - generates TypeScript for UI consumption
 VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile]
@@ -105,6 +107,10 @@ __all__ = [
     "WorkerEvent",
     "PipelineStats",
     "ChunkOutputFile",
+    # Sources
+    "SourceType",
+    "SourceJob",
+    "ChunkInfo",
     # For generator
     "DATACLASSES",
     "API_MODELS",
diff --git a/core/schema/models/sources.py b/core/schema/models/sources.py
new file mode 100644
index 0000000..425efb4
--- /dev/null
+++ b/core/schema/models/sources.py
@@ -0,0 +1,39 @@
+"""
+Media source models.
+
+Describes what types of sources the detection pipeline can process.
+Only chunk_job (blobs in MinIO) is implemented now — the rest are
+extension points with defined shapes.
+"""
+
+from dataclasses import dataclass, field
+from enum import Enum
+
+
+class SourceType(str, Enum):
+    CHUNK_JOB = "chunk_job"  # pre-chunked video segments in blob storage
+    UPLOAD = "upload"        # future: user-uploaded file → MinIO → pipeline
+    DEVICE = "device"        # future: local camera/capture card via ffmpeg (no MinIO)
+    STREAM = "stream"        # future: RTMP/HLS URL via ffmpeg (no MinIO)
+
+
+@dataclass
+class ChunkInfo:
+    """A single chunk (video segment) stored in blob storage."""
+    filename: str
+    key: str  # storage key (MinIO object key)
+    size_bytes: int
+
+
+@dataclass
+class SourceJob:
+    """
+    A group of chunks that belong together (same source video/session).
+
+    Listed by the source selector so the user can pick a job,
+    then drill into its chunks.
+    """
+    job_id: str
+    source_type: str  # SourceType value
+    chunk_count: int
+    total_bytes: int = 0
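
NOTE: how a blob listing maps onto these shapes, with illustrative values only (the
API endpoint builds its own Pydantic models; these dataclasses feed modelgen):

    from core.schema.models.sources import ChunkInfo, SourceJob, SourceType

    job = SourceJob(
        job_id="match-042",                 # hypothetical job id
        source_type=SourceType.CHUNK_JOB.value,
        chunk_count=2,
        total_bytes=536_870_912,
    )
    chunk = ChunkInfo(
        filename="chunk_0001.mp4",
        key="out/chunks/match-042/chunk_0001.mp4",  # full key, includes store prefix
        size_bytes=268_435_456,
    )
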
diff --git a/core/storage/__init__.py b/core/storage/__init__.py
index 5dc18f9..d6c101b 100644
--- a/core/storage/__init__.py
+++ b/core/storage/__init__.py
@@ -1,6 +1,5 @@
+from .blob import BUCKET, PREFIX_CHECKPOINTS, PREFIX_IN, PREFIX_OUT, BlobObject, BlobStore, get_store
 from .s3 import (
-    BUCKET_IN,
-    BUCKET_OUT,
     download_file,
     download_to_temp,
     get_presigned_url,
@@ -8,3 +7,8 @@ from .s3 import (
     list_objects,
     upload_file,
 )
+
+# Backward compat — old code uses BUCKET_IN / BUCKET_OUT as full bucket names.
+# They now resolve to the single bucket, so existing handlers keep working.
+BUCKET_IN = BUCKET
+BUCKET_OUT = BUCKET
diff --git a/core/storage/blob.py b/core/storage/blob.py
new file mode 100644
index 0000000..53b8e79
--- /dev/null
+++ b/core/storage/blob.py
@@ -0,0 +1,112 @@
+"""
+Cloud-agnostic blob storage interface.
+
+All file-based sources (chunks, uploads, checkpoints) go through MinIO.
+Local dev runs MinIO in docker-compose — same code path as production.
+Production changes S3_ENDPOINT_URL; nothing else changes.
+
+Single bucket, multiple prefixes:
+  in/           — source media
+  out/          — transcoded chunks
+  checkpoints/  — detection intermediate blobs (frames, crops)
+
+Each prefix is independently configurable via env vars so they can
+be split into separate buckets later if needed.
+
+Nothing outside core/storage/ should import boto3 directly.
+"""
+
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from typing import Optional
+
+
+# Single bucket, prefix-based layout
+BUCKET = os.environ.get("S3_BUCKET", "mpr")
+PREFIX_IN = os.environ.get("S3_PREFIX_IN", "in/")
+PREFIX_OUT = os.environ.get("S3_PREFIX_OUT", "out/")
+PREFIX_CHECKPOINTS = os.environ.get("S3_PREFIX_CHECKPOINTS", "checkpoints/")
+
+
+@dataclass
+class BlobObject:
+    key: str
+    filename: str
+    size_bytes: int
+
+
+class BlobStore:
+    """
+    Thin wrapper over the S3-compatible storage backend (MinIO / AWS S3).
+
+    All configuration (endpoint URL, credentials, region) is read from
+    environment variables by the underlying s3 module.
+    """
+
+    def __init__(self, bucket: str, prefix: str = ""):
+        self.bucket = bucket
+        self.prefix = prefix
+
+    def _full_prefix(self, prefix: str) -> str:
+        """Combine store prefix with caller prefix."""
+        return self.prefix + prefix
+
+    def list(
+        self,
+        prefix: str = "",
+        extensions: Optional[set[str]] = None,
+    ) -> list[BlobObject]:
+        """List objects in the bucket, optionally filtered by extension."""
+        from core.storage.s3 import list_objects
+
+        full = self._full_prefix(prefix)
+        raw = list_objects(self.bucket, prefix=full, extensions=extensions)
+        objects = []
+        for obj in raw:
+            blob = BlobObject(
+                key=obj["key"],
+                filename=obj["filename"],
+                size_bytes=obj["size"],
+            )
+            objects.append(blob)
+        return objects
+
+    def download_to_temp(self, key: str) -> str:
+        """Download a blob to a temp file. Caller is responsible for cleanup."""
+        from core.storage.s3 import download_to_temp
+
+        return download_to_temp(self.bucket, key)
+
+    def upload(self, local_path: str, key: str) -> None:
+        """Upload a local file to the bucket."""
+        from core.storage.s3 import upload_file
+
+        upload_file(local_path, self.bucket, key)
+
+    def get_url(self, key: str, expires: int = 3600) -> str:
+        """Return a presigned URL for the given key."""
+        from core.storage.s3 import get_presigned_url
+
+        return get_presigned_url(self.bucket, key, expires=expires)
+
+
+def get_store(purpose: str = "out") -> BlobStore:
+    """
+    Return a BlobStore for the given purpose.
+
+    Purposes map to prefixes:
+      "in"          → source media (S3_PREFIX_IN)
+      "out"         → transcoded output (S3_PREFIX_OUT)
+      "checkpoints" → detection blobs (S3_PREFIX_CHECKPOINTS)
+
+    All share the same bucket (S3_BUCKET), each scoped to its prefix.
+    """
+    prefix_map = {
+        "in": PREFIX_IN,
+        "out": PREFIX_OUT,
+        "checkpoints": PREFIX_CHECKPOINTS,
+    }
+    prefix = prefix_map.get(purpose, "")
+    return BlobStore(BUCKET, prefix=prefix)
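
NOTE: usage sketch for the new interface (keys are illustrative). One asymmetry worth
knowing: list() takes a prefix relative to the store, but the returned BlobObject.key
values, and the keys passed to download_to_temp() and get_url(), are full object keys:

    from core.storage.blob import get_store

    out = get_store("out")        # BlobStore(bucket="mpr", prefix="out/")

    objs = out.list(prefix="chunks/", extensions={".mp4"})
    for obj in objs:
        print(obj.key, obj.size_bytes)        # e.g. "out/chunks/<job>/<file>.mp4"

    local = out.download_to_temp(objs[0].key)  # temp file; caller cleans up
    url = out.get_url(objs[0].key, expires=600)  # presigned preview URL
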
diff --git a/core/storage/s3.py b/core/storage/s3.py
index ac2ab65..b735533 100644
--- a/core/storage/s3.py
+++ b/core/storage/s3.py
@@ -13,8 +13,8 @@ from typing import Optional
 import boto3
 from botocore.config import Config
 
-BUCKET_IN = os.environ.get("S3_BUCKET_IN", "mpr-media-in")
-BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "mpr-media-out")
+BUCKET_IN = os.environ.get("S3_BUCKET_IN", "in")
+BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "out")
 
 
 def get_s3_client():
diff --git a/ctrl/Dockerfile b/ctrl/Dockerfile
index de26256..bbf2d5d 100644
--- a/ctrl/Dockerfile
+++ b/ctrl/Dockerfile
@@ -1,5 +1,7 @@
 FROM python:3.11-slim
 
+RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/*
+
 RUN pip install --no-cache-dir uv
 
 WORKDIR /app
diff --git a/ctrl/Tiltfile b/ctrl/Tiltfile
index 9aef23d..8feffa2 100644
--- a/ctrl/Tiltfile
+++ b/ctrl/Tiltfile
@@ -14,6 +14,7 @@ docker_build(
     'mpr-fastapi',
     context='..',
     dockerfile='Dockerfile',
+    ignore=['.git', 'def', 'docs', 'media', 'ui', 'gpu', 'modelgen', '.claude', 'tests'],
     live_update=[
         sync('..', '/app'),
     ],
diff --git a/ctrl/k8s/base/fastapi.yaml b/ctrl/k8s/base/fastapi.yaml
index ad42020..f7d6fe7 100644
--- a/ctrl/k8s/base/fastapi.yaml
+++ b/ctrl/k8s/base/fastapi.yaml
@@ -32,10 +32,10 @@ spec:
             periodSeconds: 10
           resources:
             requests:
-              memory: 128Mi
-              cpu: 100m
-            limits:
               memory: 512Mi
+              cpu: 500m
+            limits:
+              memory: 2Gi
 ---
 apiVersion: v1
 kind: Service
diff --git a/ctrl/k8s/base/minio.yaml b/ctrl/k8s/base/minio.yaml
index 157ac85..840da05 100644
--- a/ctrl/k8s/base/minio.yaml
+++ b/ctrl/k8s/base/minio.yaml
@@ -5,8 +5,10 @@ metadata:
   namespace: mpr
 data:
   S3_ENDPOINT_URL: http://minio:9000
-  S3_BUCKET_IN: mpr-media-in
-  S3_BUCKET_OUT: mpr-media-out
+  S3_BUCKET: mpr
+  S3_PREFIX_IN: in/
+  S3_PREFIX_OUT: out/
+  S3_PREFIX_CHECKPOINTS: checkpoints/
   AWS_ACCESS_KEY_ID: minioadmin
   AWS_SECRET_ACCESS_KEY: minioadmin
   AWS_REGION: us-east-1
@@ -54,9 +56,7 @@ spec:
             - -c
             - |
               sleep 3
-              for bucket in mpr-media-in mpr-media-out; do
-                mkdir -p /data/$bucket
-              done
+              mkdir -p /data/mpr/in /data/mpr/out /data/mpr/checkpoints
         volumeMounts:
         - name: data
           mountPath: /data
diff --git a/ctrl/k8s/kind-config.yaml.tpl b/ctrl/k8s/kind-config.yaml.tpl
new file mode 100644
index 0000000..3b45655
--- /dev/null
+++ b/ctrl/k8s/kind-config.yaml.tpl
@@ -0,0 +1,12 @@
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+name: mpr
+nodes:
+  - role: control-plane
+    extraPortMappings:
+      - containerPort: 30080
+        hostPort: 80
+        protocol: TCP
+    extraMounts:
+      - hostPath: ${MEDIA_HOST_PATH}
+        containerPath: /mnt/media
diff --git a/ctrl/k8s/overlays/dev/kustomization.yaml b/ctrl/k8s/overlays/dev/kustomization.yaml
index 6dddd62..f35722f 100644
--- a/ctrl/k8s/overlays/dev/kustomization.yaml
+++ b/ctrl/k8s/overlays/dev/kustomization.yaml
@@ -3,6 +3,7 @@ kind: Kustomization
 
 resources:
   - ../../base
+  - minio-pvc.yaml
 
 patches:
   # Gateway as NodePort for local access
@@ -28,3 +29,40 @@ patches:
       - op: add
         path: /spec/ports/0/nodePort
         value: 30379
+
+  # MinIO with persistent storage + host media mount for seeding.
+  # The PV survives pod restarts. The host mount is read-only, used to seed the bucket via mc cp.
+  # Requires a kind cluster created with the MEDIA_HOST_PATH extraMount (see kind-create.sh).
+  - target:
+      kind: Deployment
+      name: minio
+    patch: |
+      - op: replace
+        path: /spec/template/spec/volumes/0
+        value:
+          name: data
+          persistentVolumeClaim:
+            claimName: minio-data
+      - op: add
+        path: /spec/template/spec/containers/0/volumeMounts/-
+        value:
+          name: host-media
+          mountPath: /host-media
+          readOnly: true
+      - op: add
+        path: /spec/template/spec/volumes/-
+        value:
+          name: host-media
+          hostPath:
+            path: /mnt/media
+            type: DirectoryOrCreate
+      - op: replace
+        path: /spec/template/spec/containers/0/lifecycle/postStart/exec/command
+        value:
+          - /bin/sh
+          - -c
+          - |
+            until curl -sf http://localhost:9000/minio/health/live; do sleep 1; done
+            /usr/bin/mc alias set local http://localhost:9000 minioadmin minioadmin --quiet
+            /usr/bin/mc mb --ignore-existing local/mpr
+            /usr/bin/mc cp --recursive /host-media/mpr/out/ local/mpr/out/ --quiet || true
diff --git a/ctrl/k8s/overlays/dev/minio-pvc.yaml b/ctrl/k8s/overlays/dev/minio-pvc.yaml
new file mode 100644
index 0000000..576ad3b
--- /dev/null
+++ b/ctrl/k8s/overlays/dev/minio-pvc.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: minio-data
+  namespace: mpr
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 5Gi
diff --git a/ctrl/kind-create.sh b/ctrl/kind-create.sh
new file mode 100755
index 0000000..f7fdea1
--- /dev/null
+++ b/ctrl/kind-create.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+# Create the kind cluster with the host media mount.
+# Usage: MEDIA_HOST_PATH=/home/you/mpr/media ./kind-create.sh
+set -euo pipefail
+
+: "${MEDIA_HOST_PATH:?Set MEDIA_HOST_PATH to your local media directory (e.g. /home/you/mpr/media)}"
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+CONFIG_TPL="$SCRIPT_DIR/k8s/kind-config.yaml.tpl"
+
+envsubst < "$CONFIG_TPL" | kind create cluster --config -
+echo "Cluster 'mpr' created with media mount: $MEDIA_HOST_PATH → /mnt/media"
diff --git a/detect/checkpoint/frames.py b/detect/checkpoint/frames.py
index 08e6425..64d5084 100644
--- a/detect/checkpoint/frames.py
+++ b/detect/checkpoint/frames.py
@@ -13,7 +13,7 @@ from detect.models import Frame
 
 logger = logging.getLogger(__name__)
 
-BUCKET = os.environ.get("S3_BUCKET_OUT", "mpr-media-out")
+BUCKET = os.environ.get("S3_BUCKET_OUT", "out")
 
 CHECKPOINT_PREFIX = "checkpoints"
 
diff --git a/requirements.txt b/requirements.txt
index 0acae65..4ba097d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -32,6 +32,14 @@ langfuse>=2.0.0
 # Cloud LLM providers (only needed for cloud escalation stage)
 anthropic>=0.40.0
 
+# Detection pipeline (orchestration, frame and text analysis)
+numpy>=1.24.0
+Pillow>=10.0.0
+imagehash>=4.3.0
+ffmpeg-python>=0.2.0
+langgraph>=0.0.30
+rapidfuzz>=3.0.0
+
 # Testing
 pytest>=7.4.0
 pytest-django>=4.7.0
diff --git a/tests/detect/manual/push_logs.py b/tests/detect/manual/push_logs.py
index 0e0e17c..c5841aa 100644
--- a/tests/detect/manual/push_logs.py
+++ b/tests/detect/manual/push_logs.py
@@ -65,7 +65,7 @@ MESSAGES = {
 
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument("--job", default="manual-test")
+    parser.add_argument("--job", default=f"logs-{int(__import__('time').time()) % 100000}")
     parser.add_argument("--port", type=int, default=6382)
     parser.add_argument("--count", type=int, default=50)
     parser.add_argument("--delay", type=float, default=0.2)
"collecting export type DetectJobStatus = "pending" | "running" | "paused" | "completed" | "failed" | "cancelled"; export type RunType = "initial" | "replay" | "retry"; export type BrandSource = "ocr" | "local_vlm" | "cloud_llm" | "manual"; +export type SourceType = "chunk_job" | "upload" | "device" | "stream"; export interface MediaAsset { id: string; @@ -169,6 +170,19 @@ export interface SourceBrandSighting { created_at: string | null; } +export interface SourceJob { + job_id: string; + source_type: string; + chunk_count: number; + total_bytes: number; +} + +export interface ChunkInfo { + filename: string; + key: string; + size_bytes: number; +} + export interface CreateJobRequest { source_asset_id: string; preset_id: string | null; diff --git a/ui/detection-app/src/panels/SourceSelector.vue b/ui/detection-app/src/panels/SourceSelector.vue new file mode 100644 index 0000000..6e9a2a1 --- /dev/null +++ b/ui/detection-app/src/panels/SourceSelector.vue @@ -0,0 +1,386 @@ + + + + +