diff --git a/api/graphql.py b/api/graphql.py new file mode 100644 index 0000000..e0d5d61 --- /dev/null +++ b/api/graphql.py @@ -0,0 +1,251 @@ +""" +GraphQL API using graphene, mounted on FastAPI/Starlette. + +Provides the same data as the REST API but via GraphQL queries and mutations. +Uses Django ORM directly for data access. +Types are generated from schema/ via modelgen — see api/schema/graphql.py. +""" + +import os + +import graphene + +from api.schema.graphql import ( + CreateJobInput, + MediaAssetType, + ScanResultType, + SystemStatusType, + TranscodeJobType, + TranscodePresetType, +) +from core.storage import BUCKET_IN, list_objects + +# Media extensions (same as assets route) +VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} +AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} +MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS + + +# --------------------------------------------------------------------------- +# Queries +# --------------------------------------------------------------------------- + + +class Query(graphene.ObjectType): + assets = graphene.List( + MediaAssetType, + status=graphene.String(), + search=graphene.String(), + ) + asset = graphene.Field(MediaAssetType, id=graphene.UUID(required=True)) + jobs = graphene.List( + TranscodeJobType, + status=graphene.String(), + source_asset_id=graphene.UUID(), + ) + job = graphene.Field(TranscodeJobType, id=graphene.UUID(required=True)) + presets = graphene.List(TranscodePresetType) + system_status = graphene.Field(SystemStatusType) + + def resolve_assets(self, info, status=None, search=None): + from mpr.media_assets.models import MediaAsset + + qs = MediaAsset.objects.all() + if status: + qs = qs.filter(status=status) + if search: + qs = qs.filter(filename__icontains=search) + return qs + + def resolve_asset(self, info, id): + from mpr.media_assets.models import MediaAsset + + try: + return MediaAsset.objects.get(id=id) + except MediaAsset.DoesNotExist: + return None + + def resolve_jobs(self, info, status=None, source_asset_id=None): + from mpr.media_assets.models import TranscodeJob + + qs = TranscodeJob.objects.all() + if status: + qs = qs.filter(status=status) + if source_asset_id: + qs = qs.filter(source_asset_id=source_asset_id) + return qs + + def resolve_job(self, info, id): + from mpr.media_assets.models import TranscodeJob + + try: + return TranscodeJob.objects.get(id=id) + except TranscodeJob.DoesNotExist: + return None + + def resolve_presets(self, info): + from mpr.media_assets.models import TranscodePreset + + return TranscodePreset.objects.all() + + def resolve_system_status(self, info): + return {"status": "ok", "version": "0.1.0"} + + +# --------------------------------------------------------------------------- +# Mutations +# --------------------------------------------------------------------------- + + +class ScanMediaFolder(graphene.Mutation): + class Arguments: + pass + + Output = ScanResultType + + def mutate(self, info): + from mpr.media_assets.models import MediaAsset + + objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) + existing = set(MediaAsset.objects.values_list("filename", flat=True)) + + registered = [] + skipped = [] + + for obj in objects: + if obj["filename"] in existing: + skipped.append(obj["filename"]) + continue + try: + MediaAsset.objects.create( + filename=obj["filename"], + file_path=obj["key"], + file_size=obj["size"], + ) + registered.append(obj["filename"]) + except Exception: + pass + + return ScanResultType( + found=len(objects), + registered=len(registered), + skipped=len(skipped), + files=registered, + ) + + +class CreateJob(graphene.Mutation): + class Arguments: + input = CreateJobInput(required=True) + + Output = TranscodeJobType + + def mutate(self, info, input): + from pathlib import Path + + from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset + + try: + source = MediaAsset.objects.get(id=input.source_asset_id) + except MediaAsset.DoesNotExist: + raise Exception("Source asset not found") + + preset = None + preset_snapshot = {} + if input.preset_id: + try: + preset = TranscodePreset.objects.get(id=input.preset_id) + preset_snapshot = { + "name": preset.name, + "container": preset.container, + "video_codec": preset.video_codec, + "audio_codec": preset.audio_codec, + } + except TranscodePreset.DoesNotExist: + raise Exception("Preset not found") + + if not preset and not input.trim_start and not input.trim_end: + raise Exception("Must specify preset_id or trim_start/trim_end") + + output_filename = input.output_filename + if not output_filename: + stem = Path(source.filename).stem + ext = preset_snapshot.get("container", "mp4") if preset else "mp4" + output_filename = f"{stem}_output.{ext}" + + job = TranscodeJob.objects.create( + source_asset_id=source.id, + preset_id=preset.id if preset else None, + preset_snapshot=preset_snapshot, + trim_start=input.trim_start, + trim_end=input.trim_end, + output_filename=output_filename, + output_path=output_filename, + priority=input.priority or 0, + ) + + # Dispatch + executor_mode = os.environ.get("MPR_EXECUTOR", "local") + if executor_mode == "lambda": + from task.executor import get_executor + + get_executor().run( + job_id=str(job.id), + source_path=source.file_path, + output_path=output_filename, + preset=preset_snapshot or None, + trim_start=input.trim_start, + trim_end=input.trim_end, + duration=source.duration, + ) + else: + from task.tasks import run_transcode_job + + result = run_transcode_job.delay( + job_id=str(job.id), + source_key=source.file_path, + output_key=output_filename, + preset=preset_snapshot or None, + trim_start=input.trim_start, + trim_end=input.trim_end, + duration=source.duration, + ) + job.celery_task_id = result.id + job.save(update_fields=["celery_task_id"]) + + return job + + +class CancelJob(graphene.Mutation): + class Arguments: + id = graphene.UUID(required=True) + + Output = TranscodeJobType + + def mutate(self, info, id): + from mpr.media_assets.models import TranscodeJob + + try: + job = TranscodeJob.objects.get(id=id) + except TranscodeJob.DoesNotExist: + raise Exception("Job not found") + + if job.status not in ("pending", "processing"): + raise Exception(f"Cannot cancel job with status: {job.status}") + + job.status = "cancelled" + job.save(update_fields=["status"]) + return job + + +class Mutation(graphene.ObjectType): + scan_media_folder = ScanMediaFolder.Field() + create_job = CreateJob.Field() + cancel_job = CancelJob.Field() + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +schema = graphene.Schema(query=Query, mutation=Mutation) diff --git a/api/main.py b/api/main.py index ed8b74f..c323064 100644 --- a/api/main.py +++ b/api/main.py @@ -20,7 +20,9 @@ django.setup() from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from api.graphql import schema as graphql_schema from api.routes import assets_router, jobs_router, presets_router, system_router +from starlette_graphene3 import GraphQLApp, make_graphiql_handler app = FastAPI( title="MPR API", @@ -45,6 +47,9 @@ app.include_router(assets_router, prefix="/api") app.include_router(presets_router, prefix="/api") app.include_router(jobs_router, prefix="/api") +# GraphQL +app.mount("/graphql", GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler())) + @app.get("/") def root(): diff --git a/api/routes/assets.py b/api/routes/assets.py index 5f333c2..95abaca 100644 --- a/api/routes/assets.py +++ b/api/routes/assets.py @@ -8,46 +8,27 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query from api.deps import get_asset -from api.schemas import AssetCreate, AssetResponse, AssetUpdate +from api.schema import AssetCreate, AssetResponse, AssetUpdate +from core.storage import BUCKET_IN, list_objects router = APIRouter(prefix="/assets", tags=["assets"]) +# Supported media extensions +VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} +AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} +MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS + @router.post("/", response_model=AssetResponse, status_code=201) def create_asset(data: AssetCreate): - """ - Register a media file as an asset. - - The file must exist on disk. A probe task will be queued - to extract metadata asynchronously. - """ - from pathlib import Path - + """Register a media file as an asset.""" from mpr.media_assets.models import MediaAsset - # Validate file exists - path = Path(data.file_path) - if not path.exists(): - raise HTTPException(status_code=400, detail="File not found") - - # Store path relative to media root - import os - - media_root = Path(os.environ.get("MEDIA_IN", "/app/media/in")) - try: - rel_path = str(path.relative_to(media_root)) - except ValueError: - rel_path = path.name - - # Create asset asset = MediaAsset.objects.create( - filename=data.filename or path.name, - file_path=rel_path, - file_size=path.stat().st_size, + filename=data.filename or data.file_path.split("/")[-1], + file_path=data.file_path, + file_size=data.file_size, ) - - # TODO: Queue probe task via gRPC/Celery - return asset @@ -61,10 +42,8 @@ def list_assets( from mpr.media_assets.models import MediaAsset qs = MediaAsset.objects.all() - if status: qs = qs.filter(status=status) - return list(qs[offset : offset + limit]) @@ -102,62 +81,36 @@ def delete_asset(asset_id: UUID, asset=Depends(get_asset)): @router.post("/scan", response_model=dict) def scan_media_folder(): """ - Scan the media folder for new video/audio files and register them as assets. - - Returns a summary of files found and registered. + Scan the S3 media-in bucket for new video/audio files and register them as assets. """ - import os - from pathlib import Path - from mpr.media_assets.models import MediaAsset - # Get media input folder from environment - media_root = os.environ.get("MEDIA_IN", "/app/media/in") - media_path = Path(media_root) - - if not media_path.exists(): - raise HTTPException( - status_code=500, detail=f"Media folder not found: {media_root}" - ) - - # Supported video/audio extensions - video_exts = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} - audio_exts = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} - supported_exts = video_exts | audio_exts + # List objects from S3 bucket + objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) # Get existing filenames to avoid duplicates existing_filenames = set(MediaAsset.objects.values_list("filename", flat=True)) - # Scan for media files - found_files = [] registered_files = [] skipped_files = [] - for file_path in media_path.rglob("*"): - if file_path.is_file() and file_path.suffix.lower() in supported_exts: - found_files.append(str(file_path)) + for obj in objects: + if obj["filename"] in existing_filenames: + skipped_files.append(obj["filename"]) + continue - # Skip if already registered - if file_path.name in existing_filenames: - skipped_files.append(file_path.name) - continue - - # Register new asset with path relative to media root - rel_path = str(file_path.relative_to(media_path)) - try: - asset = MediaAsset.objects.create( - filename=file_path.name, - file_path=rel_path, - file_size=file_path.stat().st_size, - ) - registered_files.append(file_path.name) - - # TODO: Queue probe task to extract metadata - except Exception as e: - print(f"Error registering {file_path.name}: {e}") + try: + MediaAsset.objects.create( + filename=obj["filename"], + file_path=obj["key"], + file_size=obj["size"], + ) + registered_files.append(obj["filename"]) + except Exception as e: + print(f"Error registering {obj['filename']}: {e}") return { - "found": len(found_files), + "found": len(objects), "registered": len(registered_files), "skipped": len(skipped_files), "files": registered_files, diff --git a/api/routes/jobs.py b/api/routes/jobs.py index 31c948b..1fe6e54 100644 --- a/api/routes/jobs.py +++ b/api/routes/jobs.py @@ -2,17 +2,20 @@ Job endpoints - transcode/trim job management. """ -import json +import os +from pathlib import Path from typing import Optional from uuid import UUID -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, Header, HTTPException, Query from api.deps import get_asset, get_job, get_preset -from api.schemas import JobCreate, JobResponse +from api.schema import JobCreate, JobResponse router = APIRouter(prefix="/jobs", tags=["jobs"]) +CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "") + @router.post("/", response_model=JobResponse, status_code=201) def create_job(data: JobCreate): @@ -36,7 +39,6 @@ def create_job(data: JobCreate): if data.preset_id: try: preset = TranscodePreset.objects.get(id=data.preset_id) - # Snapshot preset at job creation time preset_snapshot = { "name": preset.name, "container": preset.container, @@ -61,22 +63,13 @@ def create_job(data: JobCreate): status_code=400, detail="Must specify preset_id or trim_start/trim_end" ) - # Generate output filename and path - import os - from pathlib import Path - + # Generate output filename - stored as S3 key in output bucket output_filename = data.output_filename if not output_filename: stem = Path(source.filename).stem ext = preset_snapshot.get("container", "mp4") if preset else "mp4" output_filename = f"{stem}_output.{ext}" - media_out = os.environ.get("MEDIA_OUT", "/app/media/out") - output_path = str(Path(media_out) / output_filename) - - media_in = os.environ.get("MEDIA_IN", "/app/media/in") - source_path = str(Path(media_in) / source.file_path) - # Create job job = TranscodeJob.objects.create( source_asset_id=source.id, @@ -85,26 +78,95 @@ def create_job(data: JobCreate): trim_start=data.trim_start, trim_end=data.trim_end, output_filename=output_filename, - output_path=output_path, + output_path=output_filename, # S3 key in output bucket priority=data.priority or 0, ) - # Dispatch to Celery + # Dispatch based on executor mode + executor_mode = os.environ.get("MPR_EXECUTOR", "local") + + if executor_mode == "lambda": + _dispatch_lambda(job, source, preset_snapshot) + else: + _dispatch_celery(job, source, preset_snapshot) + + return job + + +def _dispatch_celery(job, source, preset_snapshot): + """Dispatch job to Celery worker.""" from task.tasks import run_transcode_job result = run_transcode_job.delay( job_id=str(job.id), - source_path=source_path, - output_path=output_path, + source_key=source.file_path, + output_key=job.output_filename, preset=preset_snapshot or None, - trim_start=data.trim_start, - trim_end=data.trim_end, + trim_start=job.trim_start, + trim_end=job.trim_end, duration=source.duration, ) job.celery_task_id = result.id job.save(update_fields=["celery_task_id"]) - return job + +def _dispatch_lambda(job, source, preset_snapshot): + """Dispatch job to AWS Step Functions.""" + from task.executor import get_executor + + executor = get_executor() + executor.run( + job_id=str(job.id), + source_path=source.file_path, + output_path=job.output_filename, + preset=preset_snapshot or None, + trim_start=job.trim_start, + trim_end=job.trim_end, + duration=source.duration, + ) + + +@router.post("/{job_id}/callback") +def job_callback( + job_id: UUID, + payload: dict, + x_api_key: Optional[str] = Header(None), +): + """ + Callback endpoint for Lambda to report job completion. + Protected by API key. + """ + if CALLBACK_API_KEY and x_api_key != CALLBACK_API_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + + from django.utils import timezone + + from mpr.media_assets.models import TranscodeJob + + try: + job = TranscodeJob.objects.get(id=job_id) + except TranscodeJob.DoesNotExist: + raise HTTPException(status_code=404, detail="Job not found") + + status = payload.get("status", "failed") + job.status = status + job.progress = 100.0 if status == "completed" else job.progress + update_fields = ["status", "progress"] + + if payload.get("error"): + job.error_message = payload["error"] + update_fields.append("error_message") + + if status == "completed": + job.completed_at = timezone.now() + update_fields.append("completed_at") + elif status == "failed": + job.completed_at = timezone.now() + update_fields.append("completed_at") + + job.save(update_fields=update_fields) + + return {"ok": True} @router.get("/", response_model=list[JobResponse]) @@ -118,12 +180,10 @@ def list_jobs( from mpr.media_assets.models import TranscodeJob qs = TranscodeJob.objects.all() - if status: qs = qs.filter(status=status) if source_asset_id: qs = qs.filter(source_asset_id=source_asset_id) - return list(qs[offset : offset + limit]) @@ -154,11 +214,8 @@ def cancel_job(job_id: UUID, job=Depends(get_job)): status_code=400, detail=f"Cannot cancel job with status: {job.status}" ) - # TODO: Cancel via gRPC - job.status = "cancelled" job.save(update_fields=["status"]) - return job @@ -173,6 +230,4 @@ def retry_job(job_id: UUID, job=Depends(get_job)): job.error_message = None job.save(update_fields=["status", "progress", "error_message"]) - # TODO: Resubmit via gRPC - return job diff --git a/api/routes/presets.py b/api/routes/presets.py index b3cac6f..7d60848 100644 --- a/api/routes/presets.py +++ b/api/routes/presets.py @@ -7,7 +7,7 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException from api.deps import get_preset -from api.schemas import PresetCreate, PresetResponse, PresetUpdate +from api.schema import PresetCreate, PresetResponse, PresetUpdate router = APIRouter(prefix="/presets", tags=["presets"]) diff --git a/api/schemas/__init__.py b/api/schema/__init__.py similarity index 100% rename from api/schemas/__init__.py rename to api/schema/__init__.py diff --git a/api/schemas/asset.py b/api/schema/asset.py similarity index 95% rename from api/schemas/asset.py rename to api/schema/asset.py index 7f13d61..45f9197 100644 --- a/api/schemas/asset.py +++ b/api/schema/asset.py @@ -28,7 +28,7 @@ class AssetCreate(BaseSchema): bitrate: Optional[int] = None properties: Dict[str, Any] comments: str = "" - tags: List[str] + tags: List[str] = Field(default_factory=list) class AssetUpdate(BaseSchema): """AssetUpdate schema.""" @@ -65,6 +65,6 @@ class AssetResponse(BaseSchema): bitrate: Optional[int] = None properties: Dict[str, Any] comments: str = "" - tags: List[str] + tags: List[str] = Field(default_factory=list) created_at: Optional[datetime] = None updated_at: Optional[datetime] = None diff --git a/api/schemas/base.py b/api/schema/base.py similarity index 100% rename from api/schemas/base.py rename to api/schema/base.py diff --git a/api/schema/graphql.py b/api/schema/graphql.py new file mode 100644 index 0000000..7f7017d --- /dev/null +++ b/api/schema/graphql.py @@ -0,0 +1,129 @@ +""" +Graphene Types - GENERATED FILE + +Do not edit directly. Regenerate using modelgen. +""" + +import graphene + + +class AssetStatus(graphene.Enum): + PENDING = "pending" + READY = "ready" + ERROR = "error" + + +class JobStatus(graphene.Enum): + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class MediaAssetType(graphene.ObjectType): + """A video/audio file registered in the system.""" + + id = graphene.UUID() + filename = graphene.String() + file_path = graphene.String() + status = graphene.String() + error_message = graphene.String() + file_size = graphene.Int() + duration = graphene.Float() + video_codec = graphene.String() + audio_codec = graphene.String() + width = graphene.Int() + height = graphene.Int() + framerate = graphene.Float() + bitrate = graphene.Int() + properties = graphene.JSONString() + comments = graphene.String() + tags = graphene.List(graphene.String) + created_at = graphene.DateTime() + updated_at = graphene.DateTime() + + +class TranscodePresetType(graphene.ObjectType): + """A reusable transcoding configuration (like Handbrake presets).""" + + id = graphene.UUID() + name = graphene.String() + description = graphene.String() + is_builtin = graphene.Boolean() + container = graphene.String() + video_codec = graphene.String() + video_bitrate = graphene.String() + video_crf = graphene.Int() + video_preset = graphene.String() + resolution = graphene.String() + framerate = graphene.Float() + audio_codec = graphene.String() + audio_bitrate = graphene.String() + audio_channels = graphene.Int() + audio_samplerate = graphene.Int() + extra_args = graphene.List(graphene.String) + created_at = graphene.DateTime() + updated_at = graphene.DateTime() + + +class TranscodeJobType(graphene.ObjectType): + """A transcoding or trimming job in the queue.""" + + id = graphene.UUID() + source_asset_id = graphene.UUID() + preset_id = graphene.UUID() + preset_snapshot = graphene.JSONString() + trim_start = graphene.Float() + trim_end = graphene.Float() + output_filename = graphene.String() + output_path = graphene.String() + output_asset_id = graphene.UUID() + status = graphene.String() + progress = graphene.Float() + current_frame = graphene.Int() + current_time = graphene.Float() + speed = graphene.String() + error_message = graphene.String() + celery_task_id = graphene.String() + execution_arn = graphene.String() + priority = graphene.Int() + created_at = graphene.DateTime() + started_at = graphene.DateTime() + completed_at = graphene.DateTime() + + +class CreateJobInput(graphene.InputObjectType): + """Request body for creating a transcode/trim job.""" + + source_asset_id = graphene.UUID(required=True) + preset_id = graphene.UUID() + trim_start = graphene.Float() + trim_end = graphene.Float() + output_filename = graphene.String() + priority = graphene.Int(default_value=0) + + +class SystemStatusType(graphene.ObjectType): + """System status response.""" + + status = graphene.String() + version = graphene.String() + + +class ScanResultType(graphene.ObjectType): + """Result of scanning the media input bucket.""" + + found = graphene.Int() + registered = graphene.Int() + skipped = graphene.Int() + files = graphene.List(graphene.String) + + +class WorkerStatusType(graphene.ObjectType): + """Worker health and capabilities.""" + + available = graphene.Boolean() + active_jobs = graphene.Int() + supported_codecs = graphene.List(graphene.String) + gpu_available = graphene.Boolean() diff --git a/api/schemas/job.py b/api/schema/job.py similarity index 78% rename from api/schemas/job.py rename to api/schema/job.py index c24abe4..8414cea 100644 --- a/api/schemas/job.py +++ b/api/schema/job.py @@ -17,19 +17,27 @@ class JobStatus(str, Enum): class JobCreate(BaseSchema): - """Client-facing job creation request.""" - + """JobCreate schema.""" source_asset_id: UUID preset_id: Optional[UUID] = None + preset_snapshot: Dict[str, Any] trim_start: Optional[float] = None trim_end: Optional[float] = None - output_filename: Optional[str] = None + output_filename: str = "" + output_path: Optional[str] = None + output_asset_id: Optional[UUID] = None + progress: float = 0.0 + current_frame: Optional[int] = None + current_time: Optional[float] = None + speed: Optional[str] = None + celery_task_id: Optional[str] = None + execution_arn: Optional[str] = None priority: int = 0 - + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None class JobUpdate(BaseSchema): """JobUpdate schema.""" - source_asset_id: Optional[UUID] = None preset_id: Optional[UUID] = None preset_snapshot: Optional[Dict[str, Any]] = None @@ -45,14 +53,13 @@ class JobUpdate(BaseSchema): speed: Optional[str] = None error_message: Optional[str] = None celery_task_id: Optional[str] = None + execution_arn: Optional[str] = None priority: Optional[int] = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None - class JobResponse(BaseSchema): """JobResponse schema.""" - id: UUID source_asset_id: UUID preset_id: Optional[UUID] = None @@ -69,6 +76,7 @@ class JobResponse(BaseSchema): speed: Optional[str] = None error_message: Optional[str] = None celery_task_id: Optional[str] = None + execution_arn: Optional[str] = None priority: int = 0 created_at: Optional[datetime] = None started_at: Optional[datetime] = None diff --git a/api/schemas/preset.py b/api/schema/preset.py similarity index 94% rename from api/schemas/preset.py rename to api/schema/preset.py index 5075a55..33488d8 100644 --- a/api/schemas/preset.py +++ b/api/schema/preset.py @@ -24,7 +24,7 @@ class PresetCreate(BaseSchema): audio_bitrate: Optional[str] = None audio_channels: Optional[int] = None audio_samplerate: Optional[int] = None - extra_args: List[str] + extra_args: List[str] = Field(default_factory=list) class PresetUpdate(BaseSchema): """PresetUpdate schema.""" @@ -61,6 +61,6 @@ class PresetResponse(BaseSchema): audio_bitrate: Optional[str] = None audio_channels: Optional[int] = None audio_samplerate: Optional[int] = None - extra_args: List[str] + extra_args: List[str] = Field(default_factory=list) created_at: Optional[datetime] = None updated_at: Optional[datetime] = None diff --git a/api/schemas/models.py b/api/schemas/models.py deleted file mode 100644 index 3cd1cf5..0000000 --- a/api/schemas/models.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Pydantic Models - GENERATED FILE - -Do not edit directly. Regenerate using modelgen. -""" - -from datetime import datetime -from enum import Enum -from typing import Any, Dict, List, Optional -from uuid import UUID - -from pydantic import BaseModel, Field - -class AssetStatus(str, Enum): - PENDING = "pending" - READY = "ready" - ERROR = "error" - -class JobStatus(str, Enum): - PENDING = "pending" - PROCESSING = "processing" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - -class MediaAsset(BaseModel): - """A video/audio file registered in the system.""" - id: UUID - filename: str - file_path: str - status: AssetStatus = "AssetStatus.PENDING" - error_message: Optional[str] = None - file_size: Optional[int] = None - duration: Optional[float] = None - video_codec: Optional[str] = None - audio_codec: Optional[str] = None - width: Optional[int] = None - height: Optional[int] = None - framerate: Optional[float] = None - bitrate: Optional[int] = None - properties: Dict[str, Any] - comments: str = "" - tags: List[str] = Field(default_factory=list) - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - -class TranscodePreset(BaseModel): - """A reusable transcoding configuration (like Handbrake presets).""" - id: UUID - name: str - description: str = "" - is_builtin: bool = False - container: str = "mp4" - video_codec: str = "libx264" - video_bitrate: Optional[str] = None - video_crf: Optional[int] = None - video_preset: Optional[str] = None - resolution: Optional[str] = None - framerate: Optional[float] = None - audio_codec: str = "aac" - audio_bitrate: Optional[str] = None - audio_channels: Optional[int] = None - audio_samplerate: Optional[int] = None - extra_args: List[str] = Field(default_factory=list) - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - -class TranscodeJob(BaseModel): - """A transcoding or trimming job in the queue.""" - id: UUID - source_asset_id: UUID - preset_id: Optional[UUID] = None - preset_snapshot: Dict[str, Any] - trim_start: Optional[float] = None - trim_end: Optional[float] = None - output_filename: str = "" - output_path: Optional[str] = None - output_asset_id: Optional[UUID] = None - status: JobStatus = "JobStatus.PENDING" - progress: float = 0.0 - current_frame: Optional[int] = None - current_time: Optional[float] = None - speed: Optional[str] = None - error_message: Optional[str] = None - celery_task_id: Optional[str] = None - priority: int = 0 - created_at: Optional[datetime] = None - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None diff --git a/core/storage.py b/core/storage.py new file mode 100644 index 0000000..ac2ab65 --- /dev/null +++ b/core/storage.py @@ -0,0 +1,90 @@ +""" +S3 storage layer. + +Uses MinIO locally (S3-compatible) and real AWS S3 in production. +The only difference is S3_ENDPOINT_URL: set for MinIO, omit for AWS. +""" + +import os +import tempfile +from pathlib import Path +from typing import Optional + +import boto3 +from botocore.config import Config + +BUCKET_IN = os.environ.get("S3_BUCKET_IN", "mpr-media-in") +BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "mpr-media-out") + + +def get_s3_client(): + """Get a boto3 S3 client. Works with both MinIO and real AWS S3.""" + kwargs = { + "region_name": os.environ.get("AWS_REGION", "us-east-1"), + "config": Config(signature_version="s3v4"), + } + endpoint = os.environ.get("S3_ENDPOINT_URL") + if endpoint: + kwargs["endpoint_url"] = endpoint + kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin") + kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin") + return boto3.client("s3", **kwargs) + + +def list_objects(bucket: str, prefix: str = "", extensions: Optional[set] = None) -> list[dict]: + """List objects in an S3 bucket, optionally filtered by file extension.""" + s3 = get_s3_client() + objects = [] + kwargs = {"Bucket": bucket, "Prefix": prefix} + + while True: + response = s3.list_objects_v2(**kwargs) + for obj in response.get("Contents", []): + key = obj["Key"] + if extensions: + ext = Path(key).suffix.lower() + if ext not in extensions: + continue + objects.append({ + "key": key, + "size": obj["Size"], + "filename": Path(key).name, + }) + if not response.get("IsTruncated"): + break + kwargs["ContinuationToken"] = response["NextContinuationToken"] + + return objects + + +def download_file(bucket: str, key: str, local_path: str) -> str: + """Download a file from S3 to a local path.""" + s3 = get_s3_client() + Path(local_path).parent.mkdir(parents=True, exist_ok=True) + s3.download_file(bucket, key, local_path) + return local_path + + +def download_to_temp(bucket: str, key: str) -> str: + """Download a file from S3 to a temp file. Caller must clean up.""" + ext = Path(key).suffix + fd, tmp_path = tempfile.mkstemp(suffix=ext) + os.close(fd) + download_file(bucket, key, tmp_path) + return tmp_path + + +def upload_file(local_path: str, bucket: str, key: str) -> None: + """Upload a local file to S3.""" + s3 = get_s3_client() + s3.upload_file(local_path, bucket, key) + + +def get_presigned_url(bucket: str, key: str, expires: int = 3600) -> str: + """Generate a presigned URL for an S3 object.""" + s3 = get_s3_client() + return s3.generate_presigned_url( + "get_object", + Params={"Bucket": bucket, "Key": key}, + ExpiresIn=expires, + ) diff --git a/ctrl/.env.template b/ctrl/.env.template index 5e8c5f5..6cf3e19 100644 --- a/ctrl/.env.template +++ b/ctrl/.env.template @@ -27,9 +27,13 @@ GRPC_HOST=grpc GRPC_PORT=50051 GRPC_MAX_WORKERS=10 -# Media -MEDIA_IN=/app/media/in -MEDIA_OUT=/app/media/out +# S3 Storage (MinIO locally, real S3 on AWS) +S3_ENDPOINT_URL=http://minio:9000 +S3_BUCKET_IN=mpr-media-in +S3_BUCKET_OUT=mpr-media-out +AWS_REGION=us-east-1 +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin # Vite VITE_ALLOWED_HOSTS=your-domain.local diff --git a/ctrl/deploy.sh b/ctrl/deploy.sh index cd71847..83c197d 100755 --- a/ctrl/deploy.sh +++ b/ctrl/deploy.sh @@ -1,18 +1,17 @@ #!/bin/bash -# Deploy MPR to remote server via rsync -# Uses project .gitignore for excludes +# MPR Deploy Script # -# Usage: ./ctrl/deploy.sh [--restart] [--dry-run] +# Usage: ./ctrl/deploy.sh [options] # -# Examples: -# ./ctrl/deploy.sh # Sync files only -# ./ctrl/deploy.sh --restart # Sync and restart services -# ./ctrl/deploy.sh --dry-run # Preview sync +# Commands: +# rsync [--restart] [--dry-run] Sync to remote server via rsync +# aws Deploy AWS infrastructure (Lambda, Step Functions, S3) -set -e +set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +cd "$PROJECT_ROOT" source "$SCRIPT_DIR/.env" 2>/dev/null || true @@ -21,56 +20,268 @@ GREEN='\033[0;32m' YELLOW='\033[1;33m' NC='\033[0m' -if [ -z "$SERVER" ] || [ -z "$REMOTE_PATH" ]; then - echo -e "${RED}Error: SERVER and REMOTE_PATH must be set in ctrl/.env${NC}" - echo "Example:" - echo " SERVER=user@host" - echo " REMOTE_PATH=~/mpr" - exit 1 -fi +# ─── Rsync Deploy ───────────────────────────────────────────────────────────── -RESTART=false -DRY_RUN="" +deploy_rsync() { + if [ -z "${SERVER:-}" ] || [ -z "${REMOTE_PATH:-}" ]; then + echo -e "${RED}Error: SERVER and REMOTE_PATH must be set in ctrl/.env${NC}" + echo "Example:" + echo " SERVER=user@host" + echo " REMOTE_PATH=~/mpr" + exit 1 + fi -while [ $# -gt 0 ]; do - case "$1" in - --restart) - RESTART=true - shift - ;; - --dry-run) - DRY_RUN="--dry-run" - shift - ;; - *) - echo "Unknown option: $1" - exit 1 - ;; - esac -done + RESTART=false + DRY_RUN="" -echo -e "${GREEN}=== Deploying MPR to $SERVER:$REMOTE_PATH ===${NC}" + while [ $# -gt 0 ]; do + case "$1" in + --restart) RESTART=true; shift ;; + --dry-run) DRY_RUN="--dry-run"; shift ;; + *) echo "Unknown option: $1"; exit 1 ;; + esac + done -# Sync files using .gitignore for excludes -echo -e "${YELLOW}Syncing files...${NC}" -rsync -avz --delete $DRY_RUN \ - --filter=':- .gitignore' \ - --exclude='.git' \ - --exclude='media/*' \ - --exclude='ctrl/.env' \ - "$PROJECT_ROOT/" "$SERVER:$REMOTE_PATH/" + echo -e "${GREEN}=== Deploying MPR to $SERVER:$REMOTE_PATH ===${NC}" -if [ -n "$DRY_RUN" ]; then - echo -e "${YELLOW}Dry run - no changes made${NC}" - exit 0 -fi + echo -e "${YELLOW}Syncing files...${NC}" + rsync -avz --delete $DRY_RUN \ + --filter=':- .gitignore' \ + --exclude='.git' \ + --exclude='media/*' \ + --exclude='ctrl/.env' \ + "$PROJECT_ROOT/" "$SERVER:$REMOTE_PATH/" -# Copy env template if .env doesn't exist on remote -ssh "$SERVER" "[ -f $REMOTE_PATH/ctrl/.env ] || cp $REMOTE_PATH/ctrl/.env.template $REMOTE_PATH/ctrl/.env" + if [ -n "$DRY_RUN" ]; then + echo -e "${YELLOW}Dry run - no changes made${NC}" + exit 0 + fi -if [ "$RESTART" = true ]; then - echo -e "${YELLOW}Restarting services...${NC}" - ssh "$SERVER" "cd $REMOTE_PATH/ctrl && docker compose down && docker compose up -d --build" -fi + ssh "$SERVER" "[ -f $REMOTE_PATH/ctrl/.env ] || cp $REMOTE_PATH/ctrl/.env.template $REMOTE_PATH/ctrl/.env" -echo -e "${GREEN}Done!${NC}" + if [ "$RESTART" = true ]; then + echo -e "${YELLOW}Restarting services...${NC}" + ssh "$SERVER" "cd $REMOTE_PATH/ctrl && docker compose down && docker compose up -d --build" + fi + + echo -e "${GREEN}Done!${NC}" +} + +# ─── AWS Deploy ──────────────────────────────────────────────────────────────── + +deploy_aws() { + REGION="${AWS_REGION:-us-east-1}" + ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) + PROJECT="mpr" + + # S3 + BUCKET_IN="${S3_BUCKET_IN:-mpr-media-in}" + BUCKET_OUT="${S3_BUCKET_OUT:-mpr-media-out}" + + # ECR + ECR_REPO="${PROJECT}-transcode" + ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${ECR_REPO}" + + # Lambda + LAMBDA_NAME="${PROJECT}-transcode" + LAMBDA_TIMEOUT=900 + LAMBDA_MEMORY=2048 + + # Step Functions + SFN_NAME="${PROJECT}-transcode" + + # IAM + LAMBDA_ROLE_NAME="${PROJECT}-lambda-role" + SFN_ROLE_NAME="${PROJECT}-sfn-role" + + # Callback + CALLBACK_URL="${CALLBACK_URL:-https://mpr.mcrn.ar/api}" + CALLBACK_API_KEY="${CALLBACK_API_KEY:-changeme}" + + echo -e "${GREEN}=== Deploying MPR to AWS ($REGION, account $ACCOUNT_ID) ===${NC}" + + # ─── S3 Buckets ─────────────────────────────────────────────────────── + + echo -e "${YELLOW}Creating S3 buckets...${NC}" + for bucket in "$BUCKET_IN" "$BUCKET_OUT"; do + if ! aws s3api head-bucket --bucket "$bucket" 2>/dev/null; then + aws s3api create-bucket \ + --bucket "$bucket" \ + --region "$REGION" \ + --create-bucket-configuration LocationConstraint="$REGION" + echo " Created $bucket" + else + echo " $bucket already exists" + fi + done + + # ─── IAM Roles ──────────────────────────────────────────────────────── + + echo -e "${YELLOW}Creating IAM roles...${NC}" + + if ! aws iam get-role --role-name "$LAMBDA_ROLE_NAME" 2>/dev/null; then + aws iam create-role \ + --role-name "$LAMBDA_ROLE_NAME" \ + --assume-role-policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole" + }] + }' + aws iam attach-role-policy \ + --role-name "$LAMBDA_ROLE_NAME" \ + --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + aws iam put-role-policy \ + --role-name "$LAMBDA_ROLE_NAME" \ + --policy-name "${PROJECT}-s3-access" \ + --policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": ["s3:GetObject", "s3:PutObject"], + "Resource": [ + "arn:aws:s3:::'"$BUCKET_IN"'/*", + "arn:aws:s3:::'"$BUCKET_OUT"'/*" + ] + }] + }' + echo " Created $LAMBDA_ROLE_NAME" + echo " Waiting for role to propagate..." + sleep 10 + else + echo " $LAMBDA_ROLE_NAME already exists" + fi + LAMBDA_ROLE_ARN=$(aws iam get-role --role-name "$LAMBDA_ROLE_NAME" --query Role.Arn --output text) + + if ! aws iam get-role --role-name "$SFN_ROLE_NAME" 2>/dev/null; then + aws iam create-role \ + --role-name "$SFN_ROLE_NAME" \ + --assume-role-policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"Service": "states.amazonaws.com"}, + "Action": "sts:AssumeRole" + }] + }' + aws iam put-role-policy \ + --role-name "$SFN_ROLE_NAME" \ + --policy-name "${PROJECT}-sfn-invoke-lambda" \ + --policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "lambda:InvokeFunction", + "Resource": "arn:aws:lambda:'"$REGION"':'"$ACCOUNT_ID"':function:'"$LAMBDA_NAME"'" + }] + }' + echo " Created $SFN_ROLE_NAME" + sleep 10 + else + echo " $SFN_ROLE_NAME already exists" + fi + SFN_ROLE_ARN=$(aws iam get-role --role-name "$SFN_ROLE_NAME" --query Role.Arn --output text) + + # ─── ECR Repository ────────────────────────────────────────────────── + + echo -e "${YELLOW}Setting up ECR...${NC}" + if ! aws ecr describe-repositories --repository-names "$ECR_REPO" --region "$REGION" 2>/dev/null; then + aws ecr create-repository --repository-name "$ECR_REPO" --region "$REGION" + echo " Created ECR repo $ECR_REPO" + else + echo " ECR repo $ECR_REPO already exists" + fi + + # ─── Build & Push Lambda Image ─────────────────────────────────────── + + echo -e "${YELLOW}Building Lambda container image...${NC}" + + docker build -f ctrl/lambda/Dockerfile -t "${ECR_REPO}:latest" . + + echo -e "${YELLOW}Pushing to ECR...${NC}" + aws ecr get-login-password --region "$REGION" | \ + docker login --username AWS --password-stdin "${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com" + + docker tag "${ECR_REPO}:latest" "${ECR_URI}:latest" + docker push "${ECR_URI}:latest" + + # ─── Lambda Function ───────────────────────────────────────────────── + + echo -e "${YELLOW}Deploying Lambda function...${NC}" + LAMBDA_ARN="arn:aws:lambda:${REGION}:${ACCOUNT_ID}:function:${LAMBDA_NAME}" + + if aws lambda get-function --function-name "$LAMBDA_NAME" --region "$REGION" 2>/dev/null; then + aws lambda update-function-code \ + --function-name "$LAMBDA_NAME" \ + --image-uri "${ECR_URI}:latest" \ + --region "$REGION" + echo " Updated $LAMBDA_NAME" + else + aws lambda create-function \ + --function-name "$LAMBDA_NAME" \ + --package-type Image \ + --code ImageUri="${ECR_URI}:latest" \ + --role "$LAMBDA_ROLE_ARN" \ + --timeout "$LAMBDA_TIMEOUT" \ + --memory-size "$LAMBDA_MEMORY" \ + --environment "Variables={S3_BUCKET_IN=${BUCKET_IN},S3_BUCKET_OUT=${BUCKET_OUT},AWS_REGION=${REGION}}" \ + --region "$REGION" + echo " Created $LAMBDA_NAME" + fi + + # ─── Step Functions ─────────────────────────────────────────────────── + + echo -e "${YELLOW}Deploying Step Functions state machine...${NC}" + + SFN_DEFINITION=$(sed "s|\${TranscodeLambdaArn}|${LAMBDA_ARN}|g" ctrl/state-machine.json) + + SFN_ARN="arn:aws:states:${REGION}:${ACCOUNT_ID}:stateMachine:${SFN_NAME}" + if aws stepfunctions describe-state-machine --state-machine-arn "$SFN_ARN" --region "$REGION" 2>/dev/null; then + aws stepfunctions update-state-machine \ + --state-machine-arn "$SFN_ARN" \ + --definition "$SFN_DEFINITION" \ + --region "$REGION" + echo " Updated $SFN_NAME" + else + aws stepfunctions create-state-machine \ + --name "$SFN_NAME" \ + --definition "$SFN_DEFINITION" \ + --role-arn "$SFN_ROLE_ARN" \ + --region "$REGION" + echo " Created $SFN_NAME" + fi + + # ─── Summary ────────────────────────────────────────────────────────── + + echo "" + echo -e "${GREEN}Deployment complete!${NC}" + echo "" + echo "Add these to your .env:" + echo " MPR_EXECUTOR=lambda" + echo " STEP_FUNCTION_ARN=${SFN_ARN}" + echo " LAMBDA_FUNCTION_ARN=${LAMBDA_ARN}" + echo " S3_BUCKET_IN=${BUCKET_IN}" + echo " S3_BUCKET_OUT=${BUCKET_OUT}" + echo " CALLBACK_URL=${CALLBACK_URL}" + echo " CALLBACK_API_KEY=${CALLBACK_API_KEY}" +} + +# ─── Main ────────────────────────────────────────────────────────────────────── + +COMMAND="${1:-}" +shift || true + +case "$COMMAND" in + rsync) deploy_rsync "$@" ;; + aws) deploy_aws "$@" ;; + *) + echo "Usage: ./ctrl/deploy.sh [options]" + echo "" + echo "Commands:" + echo " rsync [--restart] [--dry-run] Sync to remote server" + echo " aws Deploy AWS infrastructure" + exit 1 + ;; +esac diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index 27db613..0cc6c44 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -5,8 +5,12 @@ x-common-env: &common-env DEBUG: 1 GRPC_HOST: grpc GRPC_PORT: 50051 - MEDIA_IN: ${MEDIA_IN:-/app/media/in} - MEDIA_OUT: ${MEDIA_OUT:-/app/media/out} + S3_ENDPOINT_URL: http://minio:9000 + S3_BUCKET_IN: mpr-media-in + S3_BUCKET_OUT: mpr-media-out + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + AWS_REGION: us-east-1 x-healthcheck-defaults: &healthcheck-defaults interval: 5s @@ -42,17 +46,46 @@ services: <<: *healthcheck-defaults test: ["CMD", "redis-cli", "ping"] + minio: + image: minio/minio + command: ["server", "/data", "--console-address", ":9001"] + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + volumes: + - minio-data:/data + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "mc", "ready", "local"] + + minio-init: + image: minio/mc + depends_on: + minio: + condition: service_healthy + entrypoint: ["/bin/sh", "-c"] + command: + - | + mc alias set local http://minio:9000 minioadmin minioadmin + mc mb --ignore-existing local/mpr-media-in + mc mb --ignore-existing local/mpr-media-out + mc anonymous set download local/mpr-media-in + mc anonymous set download local/mpr-media-out + nginx: image: nginx:alpine ports: - "80:80" volumes: - ./nginx.conf:/etc/nginx/nginx.conf:ro - - ../media:/app/media:ro depends_on: - django - fastapi - timeline + - minio # ============================================================================= # Application Services @@ -72,7 +105,6 @@ services: <<: *common-env volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -90,7 +122,6 @@ services: <<: *common-env volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -110,7 +141,6 @@ services: GRPC_MAX_WORKERS: 10 volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -127,7 +157,6 @@ services: MPR_EXECUTOR: local volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -150,6 +179,7 @@ services: volumes: postgres-data: redis-data: + minio-data: networks: default: diff --git a/ctrl/generate.sh b/ctrl/generate.sh index 5d7ce6f..3c7a253 100755 --- a/ctrl/generate.sh +++ b/ctrl/generate.sh @@ -1,40 +1,12 @@ #!/bin/bash # Model generation script for MPR -# Generates Django, Pydantic, TypeScript, and Protobuf from schema/models +# Generates all targets from schema/modelgen.json config set -e - cd "$(dirname "$0")/.." echo "Generating models from schema/models..." - -# Django ORM models: domain models + enums -python -m modelgen from-schema \ - --schema schema/models \ - --output mpr/media_assets/models.py \ - --targets django \ - --include dataclasses,enums - -# Pydantic schemas for FastAPI: domain models + enums -python -m modelgen from-schema \ - --schema schema/models \ - --output api/schemas/models.py \ - --targets pydantic \ - --include dataclasses,enums - -# TypeScript types for Timeline UI: domain models + enums + API types -python -m modelgen from-schema \ - --schema schema/models \ - --output ui/timeline/src/types.ts \ - --targets typescript \ - --include dataclasses,enums,api - -# Protobuf for gRPC: gRPC messages + service -python -m modelgen from-schema \ - --schema schema/models \ - --output rpc/protos/worker.proto \ - --targets proto \ - --include grpc +python -m modelgen generate --config schema/modelgen.json # Generate gRPC stubs from proto echo "Generating gRPC stubs..." diff --git a/ctrl/lambda/Dockerfile b/ctrl/lambda/Dockerfile new file mode 100644 index 0000000..13693aa --- /dev/null +++ b/ctrl/lambda/Dockerfile @@ -0,0 +1,21 @@ +FROM public.ecr.aws/lambda/python:3.11 + +# Install ffmpeg static binary +RUN yum install -y tar xz && \ + curl -L https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o /tmp/ffmpeg.tar.xz && \ + tar -xf /tmp/ffmpeg.tar.xz -C /tmp && \ + cp /tmp/ffmpeg-*-amd64-static/ffmpeg /usr/local/bin/ffmpeg && \ + cp /tmp/ffmpeg-*-amd64-static/ffprobe /usr/local/bin/ffprobe && \ + rm -rf /tmp/ffmpeg* && \ + yum clean all + +# Install Python dependencies +COPY ctrl/lambda/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY task/lambda_handler.py ${LAMBDA_TASK_ROOT}/task/lambda_handler.py +COPY task/__init__.py ${LAMBDA_TASK_ROOT}/task/__init__.py +COPY core/ ${LAMBDA_TASK_ROOT}/core/ + +CMD ["task.lambda_handler.handler"] diff --git a/ctrl/lambda/requirements.txt b/ctrl/lambda/requirements.txt new file mode 100644 index 0000000..4ba1e0f --- /dev/null +++ b/ctrl/lambda/requirements.txt @@ -0,0 +1,2 @@ +ffmpeg-python>=0.2.0 +requests>=2.31.0 diff --git a/ctrl/nginx.conf b/ctrl/nginx.conf index 4c252d8..34f0551 100644 --- a/ctrl/nginx.conf +++ b/ctrl/nginx.conf @@ -21,6 +21,10 @@ http { server timeline:5173; } + upstream minio { + server minio:9000; + } + server { listen 80; server_name mpr.local.ar; @@ -67,16 +71,15 @@ http { proxy_set_header Host $host; } - # Media files - input (source) - location /media/in { - alias /app/media/in; - autoindex on; + # Media files - proxied from MinIO (local) or S3 (AWS) + location /media/in/ { + proxy_pass http://minio/mpr-media-in/; + proxy_set_header Host $http_host; } - # Media files - output (transcoded) - location /media/out { - alias /app/media/out; - autoindex on; + location /media/out/ { + proxy_pass http://minio/mpr-media-out/; + proxy_set_header Host $http_host; } # Default to Timeline UI diff --git a/ctrl/state-machine.json b/ctrl/state-machine.json new file mode 100644 index 0000000..b27bf94 --- /dev/null +++ b/ctrl/state-machine.json @@ -0,0 +1,39 @@ +{ + "Comment": "MPR Transcode Job - orchestrates Lambda-based media transcoding", + "StartAt": "Transcode", + "States": { + "Transcode": { + "Type": "Task", + "Resource": "${TranscodeLambdaArn}", + "TimeoutSeconds": 900, + "Retry": [ + { + "ErrorEquals": ["States.TaskFailed", "Lambda.ServiceException"], + "IntervalSeconds": 10, + "MaxAttempts": 2, + "BackoffRate": 2.0 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "HandleError", + "ResultPath": "$.error" + } + ], + "Next": "Done" + }, + "HandleError": { + "Type": "Pass", + "Parameters": { + "status": "failed", + "job_id.$": "$.job_id", + "error.$": "$.error.Cause" + }, + "Next": "Done" + }, + "Done": { + "Type": "Succeed" + } + } +} diff --git a/docs/architecture/01-system-overview.dot b/docs/architecture/01-system-overview.dot index 2ee63e0..16b4bec 100644 --- a/docs/architecture/01-system-overview.dot +++ b/docs/architecture/01-system-overview.dot @@ -3,13 +3,11 @@ digraph system_overview { node [shape=box, style=rounded, fontname="Helvetica"] edge [fontname="Helvetica", fontsize=10] - // Title labelloc="t" label="MPR - System Overview" fontsize=16 fontname="Helvetica-Bold" - // Styling graph [splines=ortho, nodesep=0.8, ranksep=0.8] // External @@ -18,7 +16,7 @@ digraph system_overview { style=dashed color=gray - browser [label="Browser\nmpr.local.ar", shape=ellipse] + browser [label="Browser\nmpr.local.ar / mpr.mcrn.ar", shape=ellipse] } // Nginx reverse proxy @@ -37,7 +35,7 @@ digraph system_overview { fillcolor="#f0f8e8" django [label="Django\n/admin\nport 8701"] - fastapi [label="FastAPI\n/api\nport 8702"] + fastapi [label="FastAPI\n/api + /graphql\nport 8702"] timeline [label="Timeline UI\n/ui\nport 5173"] } @@ -48,8 +46,17 @@ digraph system_overview { fillcolor="#fff8e8" grpc_server [label="gRPC Server\nport 50051"] - celery [label="Celery Worker\n(local)"] - lambda [label="Lambda\n(cloud)", style="dashed,rounded"] + celery [label="Celery Worker\n(local mode)"] + } + + // AWS layer + subgraph cluster_aws { + label="AWS (lambda mode)" + style=filled + fillcolor="#fde8d0" + + step_functions [label="Step Functions\nstate machine"] + lambda [label="Lambda\nFFmpeg container"] } // Data layer @@ -58,48 +65,50 @@ digraph system_overview { style=filled fillcolor="#f8e8f0" - postgres [label="PostgreSQL\nport 5433", shape=cylinder] - redis [label="Redis\nport 6380", shape=cylinder] - sqs [label="SQS\n(cloud)", shape=cylinder, style=dashed] + postgres [label="PostgreSQL\nport 5436", shape=cylinder] + redis [label="Redis\nport 6381", shape=cylinder] } // Storage subgraph cluster_storage { - label="File Storage" + label="S3 Storage" style=filled fillcolor="#f0f0f0" - local_fs [label="Local FS\n/media", shape=folder] - s3 [label="S3\n(cloud)", shape=folder, style=dashed] + minio [label="MinIO (local)\nport 9000", shape=folder] + s3 [label="AWS S3 (cloud)", shape=folder, style="dashed,rounded"] + bucket_in [label="mpr-media-in", shape=note] + bucket_out [label="mpr-media-out", shape=note] } // Connections browser -> nginx - nginx -> django [label="/admin"] - nginx -> fastapi [label="/api"] - nginx -> timeline [label="/ui"] + nginx -> django [xlabel="/admin"] + nginx -> fastapi [xlabel="/api, /graphql"] + nginx -> timeline [xlabel="/ui"] + nginx -> minio [xlabel="/media/*"] - // Django uses FastAPI for operations (single API gateway) - django -> fastapi [label="job operations"] - django -> postgres [label="CRUD only"] + timeline -> fastapi [xlabel="REST API"] - // Timeline UI uses FastAPI - timeline -> fastapi [label="REST API"] - - // FastAPI is the single API gateway fastapi -> postgres - fastapi -> redis [label="job status"] - fastapi -> grpc_server [label="gRPC\nprogress streaming"] + fastapi -> grpc_server [xlabel="gRPC\nprogress"] - // Worker layer - grpc_server -> celery [label="task dispatch"] - celery -> redis [label="queue"] - celery -> postgres [label="job updates"] - celery -> grpc_server [label="progress\ncallbacks", style=dotted] - celery -> local_fs [label="read/write"] + // Local mode + grpc_server -> celery [xlabel="task dispatch"] + celery -> redis [xlabel="queue"] + celery -> postgres [xlabel="job updates"] + celery -> minio [xlabel="S3 API\ndownload/upload"] - // Cloud (future) - lambda -> sqs [label="queue", style=dashed] - lambda -> s3 [label="read/write", style=dashed] + // Lambda mode + fastapi -> step_functions [xlabel="boto3\nstart_execution", style=dashed] + step_functions -> lambda [style=dashed] + lambda -> s3 [xlabel="download/upload", style=dashed] + lambda -> fastapi [xlabel="callback\nPOST /jobs/{id}/callback", style=dashed] + + // Storage details + minio -> bucket_in [style=dotted, arrowhead=none] + minio -> bucket_out [style=dotted, arrowhead=none] + s3 -> bucket_in [style=dotted, arrowhead=none] + s3 -> bucket_out [style=dotted, arrowhead=none] } diff --git a/docs/architecture/01-system-overview.svg b/docs/architecture/01-system-overview.svg index abe50ac..6742f6e 100644 --- a/docs/architecture/01-system-overview.svg +++ b/docs/architecture/01-system-overview.svg @@ -1,260 +1,293 @@ - - - + + system_overview - -MPR - System Overview + +MPR - System Overview cluster_external - -External + +External cluster_proxy - -Reverse Proxy + +Reverse Proxy cluster_apps - -Application Layer + +Application Layer cluster_workers - -Worker Layer + +Worker Layer -cluster_data - -Data Layer +cluster_aws + +AWS (lambda mode) +cluster_data + +Data Layer + + cluster_storage - -File Storage + +S3 Storage browser - -Browser -mpr.local.ar + +Browser +mpr.local.ar / mpr.mcrn.ar nginx - -nginx -port 80 + +nginx +port 80 browser->nginx - - + + django - -Django -/admin -port 8701 + +Django +/admin +port 8701 nginx->django - - -/admin + + +/admin fastapi - -FastAPI -/api -port 8702 + +FastAPI +/api + /graphql +port 8702 nginx->fastapi - - -/api + + +/api, /graphql timeline - -Timeline UI -/ui -port 5173 + +Timeline UI +/ui +port 5173 nginx->timeline - - -/ui + + +/ui - + + +minio + +MinIO (local) +port 9000 + + -django->fastapi - - -job operations - - - -postgres - - -PostgreSQL -port 5433 - - - -django->postgres - - -CRUD only +nginx->minio + + +/media/* grpc_server - -gRPC Server -port 50051 + +gRPC Server +port 50051 - + fastapi->grpc_server - - -gRPC -progress streaming + + +gRPC +progress + + + +step_functions + +Step Functions +state machine + + + +fastapi->step_functions + + +boto3 +start_execution + + + +postgres + + +PostgreSQL +port 5436 - + fastapi->postgres - - - - - -redis - - -Redis -port 6380 - - - -fastapi->redis - - -job status + + - + timeline->fastapi - - -REST API + + +REST API celery - -Celery Worker -(local) + +Celery Worker +(local mode) - + grpc_server->celery - - -task dispatch - - - -celery->grpc_server - - -progress -callbacks + + +task dispatch - + celery->postgres - - -job updates + + +job updates + + + +redis + + +Redis +port 6381 - + celery->redis - - -queue + + +queue - - -local_fs - -Local FS -/media - - - -celery->local_fs - - -read/write + + +celery->minio + + +S3 API +download/upload - + lambda - -Lambda -(cloud) + +Lambda +FFmpeg container - - -sqs - - -SQS -(cloud) + + +step_functions->lambda + + - + -lambda->sqs - - -queue +lambda->fastapi + + +callback +POST /jobs/{id}/callback s3 - -S3 -(cloud) + +AWS S3 (cloud) - + lambda->s3 - - -read/write + + +download/upload + + + +bucket_in + + + +mpr-media-in + + + +minio->bucket_in + + + + +bucket_out + + + +mpr-media-out + + + +minio->bucket_out + + + + +s3->bucket_in + + + + +s3->bucket_out + diff --git a/docs/architecture/02-data-model.dot b/docs/architecture/02-data-model.dot index b0348d1..37fefe8 100644 --- a/docs/architecture/02-data-model.dot +++ b/docs/architecture/02-data-model.dot @@ -10,13 +10,13 @@ digraph data_model { graph [splines=ortho, nodesep=0.6, ranksep=1.2] - MediaAsset [label="{MediaAsset|id: UUID (PK)\lfilename: str\lfile_path: str\lfile_size: int?\lstatus: pending/ready/error\lerror_message: str?\l|duration: float?\lvideo_codec: str?\laudio_codec: str?\lwidth: int?\lheight: int?\lframerate: float?\lbitrate: int?\lproperties: JSON\l|comments: str\ltags: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] + MediaAsset [label="{MediaAsset|id: UUID (PK)\lfilename: str\lfile_path: str (S3 key)\lfile_size: int?\lstatus: pending/ready/error\lerror_message: str?\l|duration: float?\lvideo_codec: str?\laudio_codec: str?\lwidth: int?\lheight: int?\lframerate: float?\lbitrate: int?\lproperties: JSON\l|comments: str\ltags: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] TranscodePreset [label="{TranscodePreset|id: UUID (PK)\lname: str (unique)\ldescription: str\lis_builtin: bool\l|container: str\l|video_codec: str\lvideo_bitrate: str?\lvideo_crf: int?\lvideo_preset: str?\lresolution: str?\lframerate: float?\l|audio_codec: str\laudio_bitrate: str?\laudio_channels: int?\laudio_samplerate: int?\l|extra_args: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] - TranscodeJob [label="{TranscodeJob|id: UUID (PK)\l|source_asset_id: UUID (FK)\l|preset_id: UUID? (FK)\lpreset_snapshot: JSON\l|trim_start: float?\ltrim_end: float?\l|output_filename: str\loutput_path: str?\loutput_asset_id: UUID? (FK)\l|status: pending/processing/...\lprogress: float (0-100)\lcurrent_frame: int?\lcurrent_time: float?\lspeed: str?\lerror_message: str?\l|celery_task_id: str?\lpriority: int\l|created_at: datetime\lstarted_at: datetime?\lcompleted_at: datetime?\l}"] + TranscodeJob [label="{TranscodeJob|id: UUID (PK)\l|source_asset_id: UUID (FK)\l|preset_id: UUID? (FK)\lpreset_snapshot: JSON\l|trim_start: float?\ltrim_end: float?\l|output_filename: str\loutput_path: str? (S3 key)\loutput_asset_id: UUID? (FK)\l|status: pending/processing/...\lprogress: float (0-100)\lcurrent_frame: int?\lcurrent_time: float?\lspeed: str?\lerror_message: str?\l|celery_task_id: str?\lexecution_arn: str?\lpriority: int\l|created_at: datetime\lstarted_at: datetime?\lcompleted_at: datetime?\l}"] - MediaAsset -> TranscodeJob [label="1:N source_asset"] - TranscodePreset -> TranscodeJob [label="1:N preset"] - TranscodeJob -> MediaAsset [label="1:1 output_asset", style=dashed] + MediaAsset -> TranscodeJob [xlabel="1:N source_asset"] + TranscodePreset -> TranscodeJob [xlabel="1:N preset"] + TranscodeJob -> MediaAsset [xlabel="1:1 output_asset", style=dashed] } diff --git a/docs/architecture/02-data-model.svg b/docs/architecture/02-data-model.svg index 4fdd5ed..8eae3a9 100644 --- a/docs/architecture/02-data-model.svg +++ b/docs/architecture/02-data-model.svg @@ -1,15 +1,15 @@ - - + data_model - -MPR - Data Model + +MPR - Data Model MediaAsset @@ -18,7 +18,7 @@ id: UUID (PK) filename: str -file_path: str +file_path: str (S3 key) file_size: int? status: pending/ready/error error_message: str? @@ -41,43 +41,44 @@ TranscodeJob - -TranscodeJob - -id: UUID (PK) - -source_asset_id: UUID (FK) - -preset_id: UUID? (FK) -preset_snapshot: JSON - -trim_start: float? -trim_end: float? - -output_filename: str -output_path: str? -output_asset_id: UUID? (FK) - -status: pending/processing/... -progress: float (0-100) -current_frame: int? -current_time: float? -speed: str? -error_message: str? - -celery_task_id: str? -priority: int - -created_at: datetime -started_at: datetime? -completed_at: datetime? + +TranscodeJob + +id: UUID (PK) + +source_asset_id: UUID (FK) + +preset_id: UUID? (FK) +preset_snapshot: JSON + +trim_start: float? +trim_end: float? + +output_filename: str +output_path: str? (S3 key) +output_asset_id: UUID? (FK) + +status: pending/processing/... +progress: float (0-100) +current_frame: int? +current_time: float? +speed: str? +error_message: str? + +celery_task_id: str? +execution_arn: str? +priority: int + +created_at: datetime +started_at: datetime? +completed_at: datetime? MediaAsset->TranscodeJob - - -1:N source_asset + + +1:N source_asset @@ -112,16 +113,16 @@ TranscodePreset->TranscodeJob - - -1:N preset + + +1:N preset TranscodeJob->MediaAsset - - -1:1 output_asset + + +1:1 output_asset diff --git a/docs/architecture/03-job-flow.dot b/docs/architecture/03-job-flow.dot index 813da0c..0ae5b6e 100644 --- a/docs/architecture/03-job-flow.dot +++ b/docs/architecture/03-job-flow.dot @@ -3,7 +3,6 @@ digraph job_flow { node [shape=box, style=rounded, fontname="Helvetica"] edge [fontname="Helvetica", fontsize=10] - // Title labelloc="t" label="MPR - Job Flow" fontsize=16 @@ -11,7 +10,19 @@ digraph job_flow { graph [splines=ortho, nodesep=0.6, ranksep=0.6] - // States + // API entry points + subgraph cluster_api { + label="API Entry Points" + style=dashed + color=gray + + rest_create [label="POST /api/jobs/", shape=ellipse] + gql_create [label="mutation createJob", shape=ellipse] + rest_cancel [label="POST /api/jobs/{id}/cancel", shape=ellipse] + rest_callback [label="POST /api/jobs/{id}/callback", shape=ellipse] + } + + // Job states subgraph cluster_states { label="Job States" style=filled @@ -24,78 +35,70 @@ digraph job_flow { cancelled [label="CANCELLED", fillcolor="#6c757d", style="filled,rounded", fontcolor=white] } - // Transitions - pending -> processing [label="worker picks up"] - processing -> completed [label="success"] - processing -> failed [label="error"] - pending -> cancelled [label="user cancels"] - processing -> cancelled [label="user cancels"] - failed -> pending [label="retry"] + // State transitions + pending -> processing [xlabel="worker picks up"] + processing -> completed [xlabel="success"] + processing -> failed [xlabel="error"] + pending -> cancelled [xlabel="user cancels"] + processing -> cancelled [xlabel="user cancels"] + failed -> pending [xlabel="retry"] - // API actions - subgraph cluster_api { - label="API Actions" - style=dashed - color=gray + rest_create -> pending + gql_create -> pending + rest_cancel -> cancelled [style=dashed] - create_job [label="POST /jobs/", shape=ellipse] - cancel_job [label="POST /jobs/{id}/cancel", shape=ellipse] - retry_job [label="POST /jobs/{id}/retry", shape=ellipse] - } - - create_job -> pending - cancel_job -> cancelled [style=dashed] - retry_job -> pending [style=dashed] - - // Executor layer - subgraph cluster_executor { - label="Executor Layer" + // Executor dispatch + subgraph cluster_dispatch { + label="Executor Dispatch" style=filled fillcolor="#fff8e8" - executor [label="Executor\n(abstract)", shape=diamond] - local [label="LocalExecutor\nCelery + FFmpeg"] - lambda_exec [label="LambdaExecutor\nSQS + Lambda"] + dispatch [label="MPR_EXECUTOR", shape=diamond] } - processing -> executor - executor -> local [label="MPR_EXECUTOR=local"] - executor -> lambda_exec [label="MPR_EXECUTOR=lambda", style=dashed] + pending -> dispatch - // FFmpeg operations - subgraph cluster_ffmpeg { - label="FFmpeg Operations" + // Local path + subgraph cluster_local { + label="Local Mode (Celery)" style=filled fillcolor="#e8f4e8" - transcode [label="Transcode\n(with preset)"] - trim [label="Trim\n(-c:v copy -c:a copy)"] + celery_task [label="Celery Task\n(transcode queue)"] + s3_download [label="S3 Download\n(MinIO)"] + ffmpeg_local [label="FFmpeg\ntranscode/trim"] + s3_upload [label="S3 Upload\n(MinIO)"] + db_update [label="DB Update\n(update_job_progress)"] } - local -> transcode - local -> trim + dispatch -> celery_task [xlabel="local"] + celery_task -> s3_download + s3_download -> ffmpeg_local + ffmpeg_local -> s3_upload + s3_upload -> db_update + db_update -> completed [style=dotted] - // gRPC streaming - subgraph cluster_grpc { - label="gRPC Communication" + // Lambda path + subgraph cluster_lambda { + label="Lambda Mode (AWS)" style=filled - fillcolor="#e8e8f8" + fillcolor="#fde8d0" - grpc_stream [label="StreamProgress\n(server streaming)", shape=parallelogram] - grpc_submit [label="SubmitJob\n(unary)", shape=parallelogram] - grpc_cancel [label="CancelJob\n(unary)", shape=parallelogram] + sfn_start [label="Step Functions\nstart_execution"] + lambda_fn [label="Lambda\nFFmpeg container"] + s3_dl_aws [label="S3 Download\n(AWS)"] + ffmpeg_aws [label="FFmpeg\ntranscode/trim"] + s3_ul_aws [label="S3 Upload\n(AWS)"] + callback [label="HTTP Callback\nPOST /jobs/{id}/callback"] } - // Progress tracking via gRPC - progress [label="Progress Updates\n(gRPC → Redis → DB)", shape=note] - transcode -> progress [style=dotted] - trim -> progress [style=dotted] - progress -> grpc_stream [style=dotted, label="stream to client"] - grpc_stream -> processing [style=dotted, label="update status"] + dispatch -> sfn_start [xlabel="lambda"] + sfn_start -> lambda_fn + lambda_fn -> s3_dl_aws + s3_dl_aws -> ffmpeg_aws + ffmpeg_aws -> s3_ul_aws + s3_ul_aws -> callback + callback -> completed [style=dotted] - // gRPC job control - create_job -> grpc_submit [label="via gRPC"] - grpc_submit -> pending [style=dashed] - cancel_job -> grpc_cancel [label="via gRPC"] - grpc_cancel -> cancelled [style=dashed] + rest_callback -> completed [style=dashed, xlabel="Lambda reports"] } diff --git a/docs/architecture/03-job-flow.svg b/docs/architecture/03-job-flow.svg index 36a21bc..cb09ad1 100644 --- a/docs/architecture/03-job-flow.svg +++ b/docs/architecture/03-job-flow.svg @@ -1,296 +1,329 @@ - - - + + job_flow - -MPR - Job Flow + +MPR - Job Flow -cluster_states - -Job States +cluster_api + +API Entry Points -cluster_api - -API Actions +cluster_states + +Job States -cluster_executor - -Executor Layer +cluster_dispatch + +Executor Dispatch -cluster_ffmpeg - -FFmpeg Operations +cluster_local + +Local Mode (Celery) -cluster_grpc - -gRPC Communication +cluster_lambda + +Lambda Mode (AWS) + + + +rest_create + +POST /api/jobs/ - + pending - -PENDING + +PENDING + + + +rest_create->pending + + + + + +gql_create + +mutation createJob + + + +gql_create->pending + + + + + +rest_cancel + +POST /api/jobs/{id}/cancel + + + +cancelled + +CANCELLED + + + +rest_cancel->cancelled + + + + + +rest_callback + +POST /api/jobs/{id}/callback + + + +completed + +COMPLETED + + + +rest_callback->completed + + +Lambda reports - + processing - -PROCESSING + +PROCESSING pending->processing - - -worker picks up - - - -cancelled - -CANCELLED + + +worker picks up pending->cancelled - - -user cancels + + +user cancels - - -completed - -COMPLETED + + +dispatch + +MPR_EXECUTOR + + + +pending->dispatch + + processing->completed - - -success + + +success - + failed - -FAILED + +FAILED processing->failed - - -error + + +error processing->cancelled - - -user cancels - - - -executor - -Executor -(abstract) - - - -processing->executor - - + + +user cancels failed->pending - - -retry + + +retry - - -create_job - -POST /jobs/ - - - -create_job->pending - - - - - -grpc_submit - -SubmitJob -(unary) - - - -create_job->grpc_submit - - -via gRPC - - - -cancel_job - -POST /jobs/{id}/cancel - - - -cancel_job->cancelled - - - - - -grpc_cancel - -CancelJob -(unary) - - - -cancel_job->grpc_cancel - - -via gRPC - - - -retry_job - -POST /jobs/{id}/retry - - - -retry_job->pending - - - - - -local - -LocalExecutor -Celery + FFmpeg - - - -executor->local - - -MPR_EXECUTOR=local - - + -lambda_exec - -LambdaExecutor -SQS + Lambda +celery_task + +Celery Task +(transcode queue) - - -executor->lambda_exec - - -MPR_EXECUTOR=lambda + + +dispatch->celery_task + + +local - - -transcode - -Transcode -(with preset) + + +sfn_start + +Step Functions +start_execution - - -local->transcode - - - - - -trim - -Trim -(-c:v copy -c:a copy) - - - -local->trim - - - - - -progress - - - -Progress Updates -(gRPC → Redis → DB) - - - -transcode->progress - - - - - -trim->progress - - - - - -grpc_stream - -StreamProgress -(server streaming) - - - -grpc_stream->processing - - -update status - - - -grpc_submit->pending - - - - - -grpc_cancel->cancelled - - - - + -progress->grpc_stream - - -stream to client +dispatch->sfn_start + + +lambda + + + +s3_download + +S3 Download +(MinIO) + + + +celery_task->s3_download + + + + + +ffmpeg_local + +FFmpeg +transcode/trim + + + +s3_download->ffmpeg_local + + + + + +s3_upload + +S3 Upload +(MinIO) + + + +ffmpeg_local->s3_upload + + + + + +db_update + +DB Update +(update_job_progress) + + + +s3_upload->db_update + + + + + +db_update->completed + + + + + +lambda_fn + +Lambda +FFmpeg container + + + +sfn_start->lambda_fn + + + + + +s3_dl_aws + +S3 Download +(AWS) + + + +lambda_fn->s3_dl_aws + + + + + +ffmpeg_aws + +FFmpeg +transcode/trim + + + +s3_dl_aws->ffmpeg_aws + + + + + +s3_ul_aws + +S3 Upload +(AWS) + + + +ffmpeg_aws->s3_ul_aws + + + + + +callback + +HTTP Callback +POST /jobs/{id}/callback + + + +s3_ul_aws->callback + + + + + +callback->completed + + diff --git a/docs/architecture/04-media-storage.md b/docs/architecture/04-media-storage.md index 011f463..a410c6b 100644 --- a/docs/architecture/04-media-storage.md +++ b/docs/architecture/04-media-storage.md @@ -2,147 +2,119 @@ ## Overview -MPR separates media into **input** and **output** paths, each independently configurable. File paths are stored **relative to their respective root** to ensure portability between local development and cloud deployments (AWS S3, etc.). +MPR uses **S3-compatible storage** everywhere. Locally via MinIO, in production via AWS S3. The same boto3 code and S3 keys work in both environments - the only difference is the `S3_ENDPOINT_URL` env var. ## Storage Strategy -### Input / Output Separation +### S3 Buckets -| Path | Env Var | Purpose | -|------|---------|---------| -| `MEDIA_IN` | `/app/media/in` | Source media files to process | -| `MEDIA_OUT` | `/app/media/out` | Transcoded/trimmed output files | +| Bucket | Env Var | Purpose | +|--------|---------|---------| +| `mpr-media-in` | `S3_BUCKET_IN` | Source media files | +| `mpr-media-out` | `S3_BUCKET_OUT` | Transcoded/trimmed output | -These can point to different locations or even different servers/buckets in production. +### S3 Keys as File Paths +- **Database**: Stores S3 object keys (e.g., `video1.mp4`, `subfolder/video3.mp4`) +- **Local dev**: MinIO serves these via S3 API on port 9000 +- **AWS**: Real S3, same keys, different endpoint -### File Path Storage -- **Database**: Stores only the relative path (e.g., `videos/sample.mp4`) -- **Input Root**: Configurable via `MEDIA_IN` env var -- **Output Root**: Configurable via `MEDIA_OUT` env var -- **Serving**: Base URL configurable via `MEDIA_BASE_URL` env var +### Why S3 Everywhere? +1. **Identical code paths** - no branching between local and cloud +2. **Seamless executor switching** - Celery and Lambda both use boto3 +3. **Cloud-native** - ready for production without refactoring -### Why Relative Paths? -1. **Portability**: Same database works locally and in cloud -2. **Flexibility**: Easy to switch between storage backends -3. **Simplicity**: No need to update paths when migrating - -## Local Development +## Local Development (MinIO) ### Configuration ```bash -MEDIA_IN=/app/media/in -MEDIA_OUT=/app/media/out +S3_ENDPOINT_URL=http://minio:9000 +S3_BUCKET_IN=mpr-media-in +S3_BUCKET_OUT=mpr-media-out +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin ``` -### File Structure -``` -/app/media/ -├── in/ # Source files -│ ├── video1.mp4 -│ ├── video2.mp4 -│ └── subfolder/ -│ └── video3.mp4 -└── out/ # Transcoded output - ├── video1_h264.mp4 - └── video2_trimmed.mp4 -``` +### How It Works +- MinIO runs as a Docker container (port 9000 API, port 9001 console) +- `minio-init` container creates buckets and sets public read access on startup +- Nginx proxies `/media/in/` and `/media/out/` to MinIO buckets +- Upload files via MinIO Console (http://localhost:9001) or `mc` CLI -### Database Storage -``` -# Source assets (scanned from media/in) -filename: video1.mp4 -file_path: video1.mp4 - -filename: video3.mp4 -file_path: subfolder/video3.mp4 -``` - -### URL Serving -- Nginx serves input via `location /media/in { alias /app/media/in; }` -- Nginx serves output via `location /media/out { alias /app/media/out; }` -- Frontend accesses: `http://mpr.local.ar/media/in/video1.mp4` -- Video player: `` - -## AWS/Cloud Deployment - -### S3 Configuration +### Upload Files to MinIO ```bash -# Input and output can be different buckets/paths -MEDIA_IN=s3://source-bucket/media/ -MEDIA_OUT=s3://output-bucket/transcoded/ -MEDIA_BASE_URL=https://source-bucket.s3.amazonaws.com/media/ +# Using mc CLI +mc alias set local http://localhost:9000 minioadmin minioadmin +mc cp video.mp4 local/mpr-media-in/ + +# Using aws CLI with endpoint override +aws --endpoint-url http://localhost:9000 s3 cp video.mp4 s3://mpr-media-in/ ``` -### S3 Structure -``` -s3://source-bucket/media/ -├── video1.mp4 -└── subfolder/ - └── video3.mp4 +## AWS Production (S3) -s3://output-bucket/transcoded/ -├── video1_h264.mp4 -└── video2_trimmed.mp4 +### Configuration +```bash +# No S3_ENDPOINT_URL = uses real AWS S3 +S3_BUCKET_IN=mpr-media-in +S3_BUCKET_OUT=mpr-media-out +AWS_REGION=us-east-1 +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= ``` -### Database Storage (Same!) +### Upload Files to S3 +```bash +aws s3 cp video.mp4 s3://mpr-media-in/ +aws s3 sync /local/media/ s3://mpr-media-in/ ``` -filename: video1.mp4 -file_path: video1.mp4 -filename: video3.mp4 -file_path: subfolder/video3.mp4 +## Storage Module + +`core/storage.py` provides all S3 operations: + +```python +from core.storage import ( + get_s3_client, # boto3 client (MinIO or AWS) + list_objects, # List bucket contents, filter by extension + download_file, # Download S3 object to local path + download_to_temp, # Download to temp file (caller cleans up) + upload_file, # Upload local file to S3 + get_presigned_url, # Generate presigned URL + BUCKET_IN, # Input bucket name + BUCKET_OUT, # Output bucket name +) ``` ## API Endpoints -### Scan Media Folder +### Scan Media (REST) ```http POST /api/assets/scan ``` +Lists objects in `S3_BUCKET_IN`, registers new media files. -**Behavior:** -1. Recursively scans `MEDIA_IN` directory -2. Finds all video/audio files (mp4, mkv, avi, mov, mp3, wav, etc.) -3. Stores paths **relative to MEDIA_IN** -4. Skips already-registered files (by filename) -5. Returns summary: `{ found, registered, skipped, files }` - -### Create Job -```http -POST /api/jobs/ -Content-Type: application/json - -{ - "source_asset_id": "uuid", - "preset_id": "uuid", - "trim_start": 10.0, - "trim_end": 30.0 -} +### Scan Media (GraphQL) +```graphql +mutation { scanMediaFolder { found registered skipped files } } ``` -**Behavior:** -- Server sets `output_path` using `MEDIA_OUT` + generated filename -- Output goes to the output directory, not alongside source files +## Job Flow with S3 -## Migration Guide +### Local Mode (Celery) +1. Celery task receives `source_key` and `output_key` +2. Downloads source from `S3_BUCKET_IN` to temp file +3. Runs FFmpeg locally +4. Uploads result to `S3_BUCKET_OUT` +5. Cleans up temp files -### Moving from Local to S3 +### Lambda Mode (AWS) +1. Step Functions invokes Lambda with S3 keys +2. Lambda downloads source from `S3_BUCKET_IN` to `/tmp` +3. Runs FFmpeg in container +4. Uploads result to `S3_BUCKET_OUT` +5. Calls back to API with result -1. **Upload source files to S3:** - ```bash - aws s3 sync /app/media/in/ s3://source-bucket/media/ - aws s3 sync /app/media/out/ s3://output-bucket/transcoded/ - ``` - -2. **Update environment variables:** - ```bash - MEDIA_IN=s3://source-bucket/media/ - MEDIA_OUT=s3://output-bucket/transcoded/ - MEDIA_BASE_URL=https://source-bucket.s3.amazonaws.com/media/ - ``` - -3. **Database paths remain unchanged** (already relative) +Both paths use the same S3 buckets and key structure. ## Supported File Types diff --git a/docs/architecture/index.html b/docs/architecture/index.html index b913f2d..e9d2dc5 100644 --- a/docs/architecture/index.html +++ b/docs/architecture/index.html @@ -9,7 +9,8 @@

MPR - Media Processor

- A web-based media transcoding tool with professional architecture. + Media transcoding platform with dual execution modes: local (Celery + + MinIO) and cloud (AWS Step Functions + Lambda + S3).