From e642908abb74edf93619ed246ad14e6d11556529 Mon Sep 17 00:00:00 2001
From: buenosairesam
Date: Fri, 6 Feb 2026 18:25:42 -0300
Subject: [PATCH] shoehorning graphql, step functions and lambdas. aws deployment scripts

---
 api/graphql.py                           | 251 +++++++++++
 api/main.py                              |   5 +
 api/routes/assets.py                     | 101 ++---
 api/routes/jobs.py                       | 109 +++--
 api/schemas/graphql_types.py             | 129 ++++++
 api/schemas/models.py                    |   1 +
 core/storage.py                          |  90 ++++
 ctrl/.env.template                       |  10 +-
 ctrl/deploy.sh                           | 317 +++++++++++---
 ctrl/docker-compose.yml                  |  44 +-
 ctrl/generate.sh                         |   7 +
 ctrl/lambda/Dockerfile                   |  21 +
 ctrl/lambda/requirements.txt             |   2 +
 ctrl/nginx.conf                          |  19 +-
 ctrl/state-machine.json                  |  39 ++
 docs/architecture/01-system-overview.dot |  75 ++--
 docs/architecture/01-system-overview.svg | 347 ++++++++-------
 docs/architecture/02-data-model.dot      |  10 +-
 docs/architecture/02-data-model.svg      |  91 ++--
 docs/architecture/03-job-flow.dot        | 117 +++---
 docs/architecture/03-job-flow.svg        | 509 ++++++++++++-----------
 docs/architecture/index.html             | 248 +++++++----
 docs/media-storage.md                    | 186 ++++-----
 modelgen/generator/__init__.py           |   3 +
 modelgen/generator/graphene.py           | 236 +++++++++++
 modelgen/types.py                        |  33 ++
 mpr/media_assets/models.py               |   1 +
 requirements.txt                         |   7 +
 schema/models/__init__.py                |   5 +-
 schema/models/api.py                     |  15 +-
 schema/models/jobs.py                    |   1 +
 task/executor.py                         |  43 +-
 task/lambda_handler.py                   | 148 +++++++
 task/tasks.py                            |  55 ++-
 ui/timeline/src/types.ts                 |   9 +
 35 files changed, 2354 insertions(+), 930 deletions(-)
 create mode 100644 api/graphql.py
 create mode 100644 api/schemas/graphql_types.py
 create mode 100644 core/storage.py
 create mode 100644 ctrl/lambda/Dockerfile
 create mode 100644 ctrl/lambda/requirements.txt
 create mode 100644 ctrl/state-machine.json
 create mode 100644 modelgen/generator/graphene.py
 create mode 100644 task/lambda_handler.py

diff --git a/api/graphql.py b/api/graphql.py
new file mode 100644
index 0000000..3adb176
--- /dev/null
+++ b/api/graphql.py
@@ -0,0 +1,251 @@
+"""
+GraphQL API using graphene, mounted on FastAPI/Starlette.
+
+Provides the same data as the REST API but via GraphQL queries and mutations.
+Uses Django ORM directly for data access.
+Types are generated from schema/ via modelgen — see api/schemas/graphql_types.py.
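+
+Example query (a sketch; graphene camel-cases snake_case field names by
+default, so output_filename is queried as outputFilename):
+
+    query {
+      assets(status: "ready") { id filename duration }
+      jobs(status: "processing") { id progress outputFilename }
+    }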
+""" + +import os + +import graphene + +from api.schemas.graphql_types import ( + CreateJobInput, + MediaAssetType, + ScanResultType, + SystemStatusType, + TranscodeJobType, + TranscodePresetType, +) +from core.storage import BUCKET_IN, list_objects + +# Media extensions (same as assets route) +VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} +AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} +MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS + + +# --------------------------------------------------------------------------- +# Queries +# --------------------------------------------------------------------------- + + +class Query(graphene.ObjectType): + assets = graphene.List( + MediaAssetType, + status=graphene.String(), + search=graphene.String(), + ) + asset = graphene.Field(MediaAssetType, id=graphene.UUID(required=True)) + jobs = graphene.List( + TranscodeJobType, + status=graphene.String(), + source_asset_id=graphene.UUID(), + ) + job = graphene.Field(TranscodeJobType, id=graphene.UUID(required=True)) + presets = graphene.List(TranscodePresetType) + system_status = graphene.Field(SystemStatusType) + + def resolve_assets(self, info, status=None, search=None): + from mpr.media_assets.models import MediaAsset + + qs = MediaAsset.objects.all() + if status: + qs = qs.filter(status=status) + if search: + qs = qs.filter(filename__icontains=search) + return qs + + def resolve_asset(self, info, id): + from mpr.media_assets.models import MediaAsset + + try: + return MediaAsset.objects.get(id=id) + except MediaAsset.DoesNotExist: + return None + + def resolve_jobs(self, info, status=None, source_asset_id=None): + from mpr.media_assets.models import TranscodeJob + + qs = TranscodeJob.objects.all() + if status: + qs = qs.filter(status=status) + if source_asset_id: + qs = qs.filter(source_asset_id=source_asset_id) + return qs + + def resolve_job(self, info, id): + from mpr.media_assets.models import TranscodeJob + + try: + return TranscodeJob.objects.get(id=id) + except TranscodeJob.DoesNotExist: + return None + + def resolve_presets(self, info): + from mpr.media_assets.models import TranscodePreset + + return TranscodePreset.objects.all() + + def resolve_system_status(self, info): + return {"status": "ok", "version": "0.1.0"} + + +# --------------------------------------------------------------------------- +# Mutations +# --------------------------------------------------------------------------- + + +class ScanMediaFolder(graphene.Mutation): + class Arguments: + pass + + Output = ScanResultType + + def mutate(self, info): + from mpr.media_assets.models import MediaAsset + + objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) + existing = set(MediaAsset.objects.values_list("filename", flat=True)) + + registered = [] + skipped = [] + + for obj in objects: + if obj["filename"] in existing: + skipped.append(obj["filename"]) + continue + try: + MediaAsset.objects.create( + filename=obj["filename"], + file_path=obj["key"], + file_size=obj["size"], + ) + registered.append(obj["filename"]) + except Exception: + pass + + return ScanResultType( + found=len(objects), + registered=len(registered), + skipped=len(skipped), + files=registered, + ) + + +class CreateJob(graphene.Mutation): + class Arguments: + input = CreateJobInput(required=True) + + Output = TranscodeJobType + + def mutate(self, info, input): + from pathlib import Path + + from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset + + try: + source = 
MediaAsset.objects.get(id=input.source_asset_id) + except MediaAsset.DoesNotExist: + raise Exception("Source asset not found") + + preset = None + preset_snapshot = {} + if input.preset_id: + try: + preset = TranscodePreset.objects.get(id=input.preset_id) + preset_snapshot = { + "name": preset.name, + "container": preset.container, + "video_codec": preset.video_codec, + "audio_codec": preset.audio_codec, + } + except TranscodePreset.DoesNotExist: + raise Exception("Preset not found") + + if not preset and not input.trim_start and not input.trim_end: + raise Exception("Must specify preset_id or trim_start/trim_end") + + output_filename = input.output_filename + if not output_filename: + stem = Path(source.filename).stem + ext = preset_snapshot.get("container", "mp4") if preset else "mp4" + output_filename = f"{stem}_output.{ext}" + + job = TranscodeJob.objects.create( + source_asset_id=source.id, + preset_id=preset.id if preset else None, + preset_snapshot=preset_snapshot, + trim_start=input.trim_start, + trim_end=input.trim_end, + output_filename=output_filename, + output_path=output_filename, + priority=input.priority or 0, + ) + + # Dispatch + executor_mode = os.environ.get("MPR_EXECUTOR", "local") + if executor_mode == "lambda": + from task.executor import get_executor + + get_executor().run( + job_id=str(job.id), + source_path=source.file_path, + output_path=output_filename, + preset=preset_snapshot or None, + trim_start=input.trim_start, + trim_end=input.trim_end, + duration=source.duration, + ) + else: + from task.tasks import run_transcode_job + + result = run_transcode_job.delay( + job_id=str(job.id), + source_key=source.file_path, + output_key=output_filename, + preset=preset_snapshot or None, + trim_start=input.trim_start, + trim_end=input.trim_end, + duration=source.duration, + ) + job.celery_task_id = result.id + job.save(update_fields=["celery_task_id"]) + + return job + + +class CancelJob(graphene.Mutation): + class Arguments: + id = graphene.UUID(required=True) + + Output = TranscodeJobType + + def mutate(self, info, id): + from mpr.media_assets.models import TranscodeJob + + try: + job = TranscodeJob.objects.get(id=id) + except TranscodeJob.DoesNotExist: + raise Exception("Job not found") + + if job.status not in ("pending", "processing"): + raise Exception(f"Cannot cancel job with status: {job.status}") + + job.status = "cancelled" + job.save(update_fields=["status"]) + return job + + +class Mutation(graphene.ObjectType): + scan_media_folder = ScanMediaFolder.Field() + create_job = CreateJob.Field() + cancel_job = CancelJob.Field() + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +schema = graphene.Schema(query=Query, mutation=Mutation) diff --git a/api/main.py b/api/main.py index ed8b74f..c323064 100644 --- a/api/main.py +++ b/api/main.py @@ -20,7 +20,9 @@ django.setup() from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from api.graphql import schema as graphql_schema from api.routes import assets_router, jobs_router, presets_router, system_router +from starlette_graphene3 import GraphQLApp, make_graphiql_handler app = FastAPI( title="MPR API", @@ -45,6 +47,9 @@ app.include_router(assets_router, prefix="/api") app.include_router(presets_router, prefix="/api") app.include_router(jobs_router, prefix="/api") +# GraphQL +app.mount("/graphql", GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler())) + @app.get("/") 
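+# A minimal smoke test for the GraphQL mount (a sketch, not part of the app;
+# assumes the API is reachable on port 8702 as in the compose setup and that
+# the requests package is installed):
+#
+#   import requests
+#   q = '{ presets { id name container } }'
+#   r = requests.post("http://localhost:8702/graphql", json={"query": q})
+#   print(r.json()["data"]["presets"])
+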
def root(): diff --git a/api/routes/assets.py b/api/routes/assets.py index 5f333c2..272f3c2 100644 --- a/api/routes/assets.py +++ b/api/routes/assets.py @@ -9,45 +9,26 @@ from fastapi import APIRouter, Depends, HTTPException, Query from api.deps import get_asset from api.schemas import AssetCreate, AssetResponse, AssetUpdate +from core.storage import BUCKET_IN, list_objects router = APIRouter(prefix="/assets", tags=["assets"]) +# Supported media extensions +VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} +AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} +MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS + @router.post("/", response_model=AssetResponse, status_code=201) def create_asset(data: AssetCreate): - """ - Register a media file as an asset. - - The file must exist on disk. A probe task will be queued - to extract metadata asynchronously. - """ - from pathlib import Path - + """Register a media file as an asset.""" from mpr.media_assets.models import MediaAsset - # Validate file exists - path = Path(data.file_path) - if not path.exists(): - raise HTTPException(status_code=400, detail="File not found") - - # Store path relative to media root - import os - - media_root = Path(os.environ.get("MEDIA_IN", "/app/media/in")) - try: - rel_path = str(path.relative_to(media_root)) - except ValueError: - rel_path = path.name - - # Create asset asset = MediaAsset.objects.create( - filename=data.filename or path.name, - file_path=rel_path, - file_size=path.stat().st_size, + filename=data.filename or data.file_path.split("/")[-1], + file_path=data.file_path, + file_size=data.file_size, ) - - # TODO: Queue probe task via gRPC/Celery - return asset @@ -61,10 +42,8 @@ def list_assets( from mpr.media_assets.models import MediaAsset qs = MediaAsset.objects.all() - if status: qs = qs.filter(status=status) - return list(qs[offset : offset + limit]) @@ -102,62 +81,36 @@ def delete_asset(asset_id: UUID, asset=Depends(get_asset)): @router.post("/scan", response_model=dict) def scan_media_folder(): """ - Scan the media folder for new video/audio files and register them as assets. - - Returns a summary of files found and registered. + Scan the S3 media-in bucket for new video/audio files and register them as assets. 
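+
+    Example response (a sketch with made-up counts):
+
+        {"found": 3, "registered": 2, "skipped": 1, "files": ["a.mp4", "b.mkv"]}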
""" - import os - from pathlib import Path - from mpr.media_assets.models import MediaAsset - # Get media input folder from environment - media_root = os.environ.get("MEDIA_IN", "/app/media/in") - media_path = Path(media_root) - - if not media_path.exists(): - raise HTTPException( - status_code=500, detail=f"Media folder not found: {media_root}" - ) - - # Supported video/audio extensions - video_exts = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} - audio_exts = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} - supported_exts = video_exts | audio_exts + # List objects from S3 bucket + objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) # Get existing filenames to avoid duplicates existing_filenames = set(MediaAsset.objects.values_list("filename", flat=True)) - # Scan for media files - found_files = [] registered_files = [] skipped_files = [] - for file_path in media_path.rglob("*"): - if file_path.is_file() and file_path.suffix.lower() in supported_exts: - found_files.append(str(file_path)) + for obj in objects: + if obj["filename"] in existing_filenames: + skipped_files.append(obj["filename"]) + continue - # Skip if already registered - if file_path.name in existing_filenames: - skipped_files.append(file_path.name) - continue - - # Register new asset with path relative to media root - rel_path = str(file_path.relative_to(media_path)) - try: - asset = MediaAsset.objects.create( - filename=file_path.name, - file_path=rel_path, - file_size=file_path.stat().st_size, - ) - registered_files.append(file_path.name) - - # TODO: Queue probe task to extract metadata - except Exception as e: - print(f"Error registering {file_path.name}: {e}") + try: + MediaAsset.objects.create( + filename=obj["filename"], + file_path=obj["key"], + file_size=obj["size"], + ) + registered_files.append(obj["filename"]) + except Exception as e: + print(f"Error registering {obj['filename']}: {e}") return { - "found": len(found_files), + "found": len(objects), "registered": len(registered_files), "skipped": len(skipped_files), "files": registered_files, diff --git a/api/routes/jobs.py b/api/routes/jobs.py index 31c948b..e4b6e5d 100644 --- a/api/routes/jobs.py +++ b/api/routes/jobs.py @@ -2,17 +2,20 @@ Job endpoints - transcode/trim job management. 
""" -import json +import os +from pathlib import Path from typing import Optional from uuid import UUID -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, Header, HTTPException, Query from api.deps import get_asset, get_job, get_preset from api.schemas import JobCreate, JobResponse router = APIRouter(prefix="/jobs", tags=["jobs"]) +CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "") + @router.post("/", response_model=JobResponse, status_code=201) def create_job(data: JobCreate): @@ -36,7 +39,6 @@ def create_job(data: JobCreate): if data.preset_id: try: preset = TranscodePreset.objects.get(id=data.preset_id) - # Snapshot preset at job creation time preset_snapshot = { "name": preset.name, "container": preset.container, @@ -61,22 +63,13 @@ def create_job(data: JobCreate): status_code=400, detail="Must specify preset_id or trim_start/trim_end" ) - # Generate output filename and path - import os - from pathlib import Path - + # Generate output filename - stored as S3 key in output bucket output_filename = data.output_filename if not output_filename: stem = Path(source.filename).stem ext = preset_snapshot.get("container", "mp4") if preset else "mp4" output_filename = f"{stem}_output.{ext}" - media_out = os.environ.get("MEDIA_OUT", "/app/media/out") - output_path = str(Path(media_out) / output_filename) - - media_in = os.environ.get("MEDIA_IN", "/app/media/in") - source_path = str(Path(media_in) / source.file_path) - # Create job job = TranscodeJob.objects.create( source_asset_id=source.id, @@ -85,26 +78,95 @@ def create_job(data: JobCreate): trim_start=data.trim_start, trim_end=data.trim_end, output_filename=output_filename, - output_path=output_path, + output_path=output_filename, # S3 key in output bucket priority=data.priority or 0, ) - # Dispatch to Celery + # Dispatch based on executor mode + executor_mode = os.environ.get("MPR_EXECUTOR", "local") + + if executor_mode == "lambda": + _dispatch_lambda(job, source, preset_snapshot) + else: + _dispatch_celery(job, source, preset_snapshot) + + return job + + +def _dispatch_celery(job, source, preset_snapshot): + """Dispatch job to Celery worker.""" from task.tasks import run_transcode_job result = run_transcode_job.delay( job_id=str(job.id), - source_path=source_path, - output_path=output_path, + source_key=source.file_path, + output_key=job.output_filename, preset=preset_snapshot or None, - trim_start=data.trim_start, - trim_end=data.trim_end, + trim_start=job.trim_start, + trim_end=job.trim_end, duration=source.duration, ) job.celery_task_id = result.id job.save(update_fields=["celery_task_id"]) - return job + +def _dispatch_lambda(job, source, preset_snapshot): + """Dispatch job to AWS Step Functions.""" + from task.executor import get_executor + + executor = get_executor() + executor.run( + job_id=str(job.id), + source_path=source.file_path, + output_path=job.output_filename, + preset=preset_snapshot or None, + trim_start=job.trim_start, + trim_end=job.trim_end, + duration=source.duration, + ) + + +@router.post("/{job_id}/callback") +def job_callback( + job_id: UUID, + payload: dict, + x_api_key: Optional[str] = Header(None), +): + """ + Callback endpoint for Lambda to report job completion. + Protected by API key. 
+    """
+    if CALLBACK_API_KEY and x_api_key != CALLBACK_API_KEY:
+        raise HTTPException(status_code=403, detail="Invalid API key")
+
+    from django.utils import timezone
+
+    from mpr.media_assets.models import TranscodeJob
+
+    try:
+        job = TranscodeJob.objects.get(id=job_id)
+    except TranscodeJob.DoesNotExist:
+        raise HTTPException(status_code=404, detail="Job not found")
+
+    status = payload.get("status", "failed")
+    job.status = status
+    job.progress = 100.0 if status == "completed" else job.progress
+    update_fields = ["status", "progress"]
+
+    if payload.get("error"):
+        job.error_message = payload["error"]
+        update_fields.append("error_message")
+
+    if status in ("completed", "failed"):
+        job.completed_at = timezone.now()
+        update_fields.append("completed_at")
+
+    job.save(update_fields=update_fields)
+
+    return {"ok": True}


 @router.get("/", response_model=list[JobResponse])
 def list_jobs(
@@ -118,12 +180,10 @@ def list_jobs(
     from mpr.media_assets.models import TranscodeJob
 
     qs = TranscodeJob.objects.all()
-
     if status:
         qs = qs.filter(status=status)
     if source_asset_id:
         qs = qs.filter(source_asset_id=source_asset_id)
-
     return list(qs[offset : offset + limit])
@@ -154,11 +214,8 @@ def cancel_job(job_id: UUID, job=Depends(get_job)):
             status_code=400, detail=f"Cannot cancel job with status: {job.status}"
         )
 
-    # TODO: Cancel via gRPC
-
     job.status = "cancelled"
     job.save(update_fields=["status"])
-
     return job
@@ -173,6 +230,4 @@ def retry_job(job_id: UUID, job=Depends(get_job)):
     job.error_message = None
     job.save(update_fields=["status", "progress", "error_message"])
 
-    # TODO: Resubmit via gRPC
-
     return job

diff --git a/api/schemas/graphql_types.py b/api/schemas/graphql_types.py
new file mode 100644
index 0000000..7f7017d
--- /dev/null
+++ b/api/schemas/graphql_types.py
@@ -0,0 +1,129 @@
+"""
+Graphene Types - GENERATED FILE
+
+Do not edit directly. Regenerate using modelgen.
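+
+Regenerate with (mirrors ctrl/generate.sh):
+
+    python -m modelgen from-schema \
+        --schema schema/models \
+        --output api/schemas/graphql_types.py \
+        --targets graphene \
+        --include dataclasses,enums,api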
+""" + +import graphene + + +class AssetStatus(graphene.Enum): + PENDING = "pending" + READY = "ready" + ERROR = "error" + + +class JobStatus(graphene.Enum): + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class MediaAssetType(graphene.ObjectType): + """A video/audio file registered in the system.""" + + id = graphene.UUID() + filename = graphene.String() + file_path = graphene.String() + status = graphene.String() + error_message = graphene.String() + file_size = graphene.Int() + duration = graphene.Float() + video_codec = graphene.String() + audio_codec = graphene.String() + width = graphene.Int() + height = graphene.Int() + framerate = graphene.Float() + bitrate = graphene.Int() + properties = graphene.JSONString() + comments = graphene.String() + tags = graphene.List(graphene.String) + created_at = graphene.DateTime() + updated_at = graphene.DateTime() + + +class TranscodePresetType(graphene.ObjectType): + """A reusable transcoding configuration (like Handbrake presets).""" + + id = graphene.UUID() + name = graphene.String() + description = graphene.String() + is_builtin = graphene.Boolean() + container = graphene.String() + video_codec = graphene.String() + video_bitrate = graphene.String() + video_crf = graphene.Int() + video_preset = graphene.String() + resolution = graphene.String() + framerate = graphene.Float() + audio_codec = graphene.String() + audio_bitrate = graphene.String() + audio_channels = graphene.Int() + audio_samplerate = graphene.Int() + extra_args = graphene.List(graphene.String) + created_at = graphene.DateTime() + updated_at = graphene.DateTime() + + +class TranscodeJobType(graphene.ObjectType): + """A transcoding or trimming job in the queue.""" + + id = graphene.UUID() + source_asset_id = graphene.UUID() + preset_id = graphene.UUID() + preset_snapshot = graphene.JSONString() + trim_start = graphene.Float() + trim_end = graphene.Float() + output_filename = graphene.String() + output_path = graphene.String() + output_asset_id = graphene.UUID() + status = graphene.String() + progress = graphene.Float() + current_frame = graphene.Int() + current_time = graphene.Float() + speed = graphene.String() + error_message = graphene.String() + celery_task_id = graphene.String() + execution_arn = graphene.String() + priority = graphene.Int() + created_at = graphene.DateTime() + started_at = graphene.DateTime() + completed_at = graphene.DateTime() + + +class CreateJobInput(graphene.InputObjectType): + """Request body for creating a transcode/trim job.""" + + source_asset_id = graphene.UUID(required=True) + preset_id = graphene.UUID() + trim_start = graphene.Float() + trim_end = graphene.Float() + output_filename = graphene.String() + priority = graphene.Int(default_value=0) + + +class SystemStatusType(graphene.ObjectType): + """System status response.""" + + status = graphene.String() + version = graphene.String() + + +class ScanResultType(graphene.ObjectType): + """Result of scanning the media input bucket.""" + + found = graphene.Int() + registered = graphene.Int() + skipped = graphene.Int() + files = graphene.List(graphene.String) + + +class WorkerStatusType(graphene.ObjectType): + """Worker health and capabilities.""" + + available = graphene.Boolean() + active_jobs = graphene.Int() + supported_codecs = graphene.List(graphene.String) + gpu_available = graphene.Boolean() diff --git a/api/schemas/models.py b/api/schemas/models.py index 3cd1cf5..3408641 100644 --- a/api/schemas/models.py +++ 
b/api/schemas/models.py @@ -83,6 +83,7 @@ class TranscodeJob(BaseModel): speed: Optional[str] = None error_message: Optional[str] = None celery_task_id: Optional[str] = None + execution_arn: Optional[str] = None priority: int = 0 created_at: Optional[datetime] = None started_at: Optional[datetime] = None diff --git a/core/storage.py b/core/storage.py new file mode 100644 index 0000000..ac2ab65 --- /dev/null +++ b/core/storage.py @@ -0,0 +1,90 @@ +""" +S3 storage layer. + +Uses MinIO locally (S3-compatible) and real AWS S3 in production. +The only difference is S3_ENDPOINT_URL: set for MinIO, omit for AWS. +""" + +import os +import tempfile +from pathlib import Path +from typing import Optional + +import boto3 +from botocore.config import Config + +BUCKET_IN = os.environ.get("S3_BUCKET_IN", "mpr-media-in") +BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "mpr-media-out") + + +def get_s3_client(): + """Get a boto3 S3 client. Works with both MinIO and real AWS S3.""" + kwargs = { + "region_name": os.environ.get("AWS_REGION", "us-east-1"), + "config": Config(signature_version="s3v4"), + } + endpoint = os.environ.get("S3_ENDPOINT_URL") + if endpoint: + kwargs["endpoint_url"] = endpoint + kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin") + kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin") + return boto3.client("s3", **kwargs) + + +def list_objects(bucket: str, prefix: str = "", extensions: Optional[set] = None) -> list[dict]: + """List objects in an S3 bucket, optionally filtered by file extension.""" + s3 = get_s3_client() + objects = [] + kwargs = {"Bucket": bucket, "Prefix": prefix} + + while True: + response = s3.list_objects_v2(**kwargs) + for obj in response.get("Contents", []): + key = obj["Key"] + if extensions: + ext = Path(key).suffix.lower() + if ext not in extensions: + continue + objects.append({ + "key": key, + "size": obj["Size"], + "filename": Path(key).name, + }) + if not response.get("IsTruncated"): + break + kwargs["ContinuationToken"] = response["NextContinuationToken"] + + return objects + + +def download_file(bucket: str, key: str, local_path: str) -> str: + """Download a file from S3 to a local path.""" + s3 = get_s3_client() + Path(local_path).parent.mkdir(parents=True, exist_ok=True) + s3.download_file(bucket, key, local_path) + return local_path + + +def download_to_temp(bucket: str, key: str) -> str: + """Download a file from S3 to a temp file. 
Caller must clean up.""" + ext = Path(key).suffix + fd, tmp_path = tempfile.mkstemp(suffix=ext) + os.close(fd) + download_file(bucket, key, tmp_path) + return tmp_path + + +def upload_file(local_path: str, bucket: str, key: str) -> None: + """Upload a local file to S3.""" + s3 = get_s3_client() + s3.upload_file(local_path, bucket, key) + + +def get_presigned_url(bucket: str, key: str, expires: int = 3600) -> str: + """Generate a presigned URL for an S3 object.""" + s3 = get_s3_client() + return s3.generate_presigned_url( + "get_object", + Params={"Bucket": bucket, "Key": key}, + ExpiresIn=expires, + ) diff --git a/ctrl/.env.template b/ctrl/.env.template index 5e8c5f5..6cf3e19 100644 --- a/ctrl/.env.template +++ b/ctrl/.env.template @@ -27,9 +27,13 @@ GRPC_HOST=grpc GRPC_PORT=50051 GRPC_MAX_WORKERS=10 -# Media -MEDIA_IN=/app/media/in -MEDIA_OUT=/app/media/out +# S3 Storage (MinIO locally, real S3 on AWS) +S3_ENDPOINT_URL=http://minio:9000 +S3_BUCKET_IN=mpr-media-in +S3_BUCKET_OUT=mpr-media-out +AWS_REGION=us-east-1 +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin # Vite VITE_ALLOWED_HOSTS=your-domain.local diff --git a/ctrl/deploy.sh b/ctrl/deploy.sh index cd71847..83c197d 100755 --- a/ctrl/deploy.sh +++ b/ctrl/deploy.sh @@ -1,18 +1,17 @@ #!/bin/bash -# Deploy MPR to remote server via rsync -# Uses project .gitignore for excludes +# MPR Deploy Script # -# Usage: ./ctrl/deploy.sh [--restart] [--dry-run] +# Usage: ./ctrl/deploy.sh [options] # -# Examples: -# ./ctrl/deploy.sh # Sync files only -# ./ctrl/deploy.sh --restart # Sync and restart services -# ./ctrl/deploy.sh --dry-run # Preview sync +# Commands: +# rsync [--restart] [--dry-run] Sync to remote server via rsync +# aws Deploy AWS infrastructure (Lambda, Step Functions, S3) -set -e +set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +cd "$PROJECT_ROOT" source "$SCRIPT_DIR/.env" 2>/dev/null || true @@ -21,56 +20,268 @@ GREEN='\033[0;32m' YELLOW='\033[1;33m' NC='\033[0m' -if [ -z "$SERVER" ] || [ -z "$REMOTE_PATH" ]; then - echo -e "${RED}Error: SERVER and REMOTE_PATH must be set in ctrl/.env${NC}" - echo "Example:" - echo " SERVER=user@host" - echo " REMOTE_PATH=~/mpr" - exit 1 -fi +# ─── Rsync Deploy ───────────────────────────────────────────────────────────── -RESTART=false -DRY_RUN="" +deploy_rsync() { + if [ -z "${SERVER:-}" ] || [ -z "${REMOTE_PATH:-}" ]; then + echo -e "${RED}Error: SERVER and REMOTE_PATH must be set in ctrl/.env${NC}" + echo "Example:" + echo " SERVER=user@host" + echo " REMOTE_PATH=~/mpr" + exit 1 + fi -while [ $# -gt 0 ]; do - case "$1" in - --restart) - RESTART=true - shift - ;; - --dry-run) - DRY_RUN="--dry-run" - shift - ;; - *) - echo "Unknown option: $1" - exit 1 - ;; - esac -done + RESTART=false + DRY_RUN="" -echo -e "${GREEN}=== Deploying MPR to $SERVER:$REMOTE_PATH ===${NC}" + while [ $# -gt 0 ]; do + case "$1" in + --restart) RESTART=true; shift ;; + --dry-run) DRY_RUN="--dry-run"; shift ;; + *) echo "Unknown option: $1"; exit 1 ;; + esac + done -# Sync files using .gitignore for excludes -echo -e "${YELLOW}Syncing files...${NC}" -rsync -avz --delete $DRY_RUN \ - --filter=':- .gitignore' \ - --exclude='.git' \ - --exclude='media/*' \ - --exclude='ctrl/.env' \ - "$PROJECT_ROOT/" "$SERVER:$REMOTE_PATH/" + echo -e "${GREEN}=== Deploying MPR to $SERVER:$REMOTE_PATH ===${NC}" -if [ -n "$DRY_RUN" ]; then - echo -e "${YELLOW}Dry run - no changes made${NC}" - exit 0 -fi + echo -e "${YELLOW}Syncing files...${NC}" + rsync -avz --delete $DRY_RUN \ + --filter=':- .gitignore' \ + --exclude='.git' \ + --exclude='media/*' \ + --exclude='ctrl/.env' \ + "$PROJECT_ROOT/" "$SERVER:$REMOTE_PATH/" -# Copy env template if .env doesn't exist on remote -ssh "$SERVER" "[ -f $REMOTE_PATH/ctrl/.env ] || cp $REMOTE_PATH/ctrl/.env.template $REMOTE_PATH/ctrl/.env" + if [ -n "$DRY_RUN" ]; then + echo -e "${YELLOW}Dry run - no changes made${NC}" + exit 0 + fi -if [ "$RESTART" = true ]; then - echo -e "${YELLOW}Restarting services...${NC}" - ssh "$SERVER" "cd $REMOTE_PATH/ctrl && docker compose down && docker compose up -d --build" -fi + ssh "$SERVER" "[ -f $REMOTE_PATH/ctrl/.env ] || cp $REMOTE_PATH/ctrl/.env.template $REMOTE_PATH/ctrl/.env" -echo -e "${GREEN}Done!${NC}" + if [ "$RESTART" = true ]; then + echo -e "${YELLOW}Restarting services...${NC}" + ssh "$SERVER" "cd $REMOTE_PATH/ctrl && docker compose down && docker compose up -d --build" + fi + + echo -e "${GREEN}Done!${NC}" +} + +# ─── AWS Deploy ──────────────────────────────────────────────────────────────── + +deploy_aws() { + REGION="${AWS_REGION:-us-east-1}" + ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) + PROJECT="mpr" + + # S3 + BUCKET_IN="${S3_BUCKET_IN:-mpr-media-in}" + BUCKET_OUT="${S3_BUCKET_OUT:-mpr-media-out}" + + # ECR + ECR_REPO="${PROJECT}-transcode" + ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${ECR_REPO}" + + # Lambda + LAMBDA_NAME="${PROJECT}-transcode" + LAMBDA_TIMEOUT=900 + LAMBDA_MEMORY=2048 + + # Step Functions + SFN_NAME="${PROJECT}-transcode" + + # IAM + LAMBDA_ROLE_NAME="${PROJECT}-lambda-role" + SFN_ROLE_NAME="${PROJECT}-sfn-role" + + # Callback + CALLBACK_URL="${CALLBACK_URL:-https://mpr.mcrn.ar/api}" + CALLBACK_API_KEY="${CALLBACK_API_KEY:-changeme}" + + echo -e "${GREEN}=== Deploying MPR to AWS ($REGION, account $ACCOUNT_ID) ===${NC}" + + # ─── S3 Buckets 
───────────────────────────────────────────────────────
+
+    echo -e "${YELLOW}Creating S3 buckets...${NC}"
+    for bucket in "$BUCKET_IN" "$BUCKET_OUT"; do
+        if ! aws s3api head-bucket --bucket "$bucket" 2>/dev/null; then
+            # us-east-1 is the default region and rejects an explicit
+            # LocationConstraint, so only pass one for other regions
+            if [ "$REGION" = "us-east-1" ]; then
+                aws s3api create-bucket --bucket "$bucket" --region "$REGION"
+            else
+                aws s3api create-bucket \
+                    --bucket "$bucket" \
+                    --region "$REGION" \
+                    --create-bucket-configuration LocationConstraint="$REGION"
+            fi
+            echo "  Created $bucket"
+        else
+            echo "  $bucket already exists"
+        fi
+    done
+
+    # ─── IAM Roles ────────────────────────────────────────────────────────
+
+    echo -e "${YELLOW}Creating IAM roles...${NC}"
+
+    if ! aws iam get-role --role-name "$LAMBDA_ROLE_NAME" 2>/dev/null; then
+        aws iam create-role \
+            --role-name "$LAMBDA_ROLE_NAME" \
+            --assume-role-policy-document '{
+                "Version": "2012-10-17",
+                "Statement": [{
+                    "Effect": "Allow",
+                    "Principal": {"Service": "lambda.amazonaws.com"},
+                    "Action": "sts:AssumeRole"
+                }]
+            }'
+        aws iam attach-role-policy \
+            --role-name "$LAMBDA_ROLE_NAME" \
+            --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
+        aws iam put-role-policy \
+            --role-name "$LAMBDA_ROLE_NAME" \
+            --policy-name "${PROJECT}-s3-access" \
+            --policy-document '{
+                "Version": "2012-10-17",
+                "Statement": [{
+                    "Effect": "Allow",
+                    "Action": ["s3:GetObject", "s3:PutObject"],
+                    "Resource": [
+                        "arn:aws:s3:::'"$BUCKET_IN"'/*",
+                        "arn:aws:s3:::'"$BUCKET_OUT"'/*"
+                    ]
+                }]
+            }'
+        echo "  Created $LAMBDA_ROLE_NAME"
+        echo "  Waiting for role to propagate..."
+        sleep 10
+    else
+        echo "  $LAMBDA_ROLE_NAME already exists"
+    fi
+    LAMBDA_ROLE_ARN=$(aws iam get-role --role-name "$LAMBDA_ROLE_NAME" --query Role.Arn --output text)
+
+    if ! aws iam get-role --role-name "$SFN_ROLE_NAME" 2>/dev/null; then
+        aws iam create-role \
+            --role-name "$SFN_ROLE_NAME" \
+            --assume-role-policy-document '{
+                "Version": "2012-10-17",
+                "Statement": [{
+                    "Effect": "Allow",
+                    "Principal": {"Service": "states.amazonaws.com"},
+                    "Action": "sts:AssumeRole"
+                }]
+            }'
+        aws iam put-role-policy \
+            --role-name "$SFN_ROLE_NAME" \
+            --policy-name "${PROJECT}-sfn-invoke-lambda" \
+            --policy-document '{
+                "Version": "2012-10-17",
+                "Statement": [{
+                    "Effect": "Allow",
+                    "Action": "lambda:InvokeFunction",
+                    "Resource": "arn:aws:lambda:'"$REGION"':'"$ACCOUNT_ID"':function:'"$LAMBDA_NAME"'"
+                }]
+            }'
+        echo "  Created $SFN_ROLE_NAME"
+        sleep 10
+    else
+        echo "  $SFN_ROLE_NAME already exists"
+    fi
+    SFN_ROLE_ARN=$(aws iam get-role --role-name "$SFN_ROLE_NAME" --query Role.Arn --output text)
+
+    # ─── ECR Repository ───────────────────────────────────────────────────
+
+    echo -e "${YELLOW}Setting up ECR...${NC}"
+    if ! aws ecr describe-repositories --repository-names "$ECR_REPO" --region "$REGION" 2>/dev/null; then
+        aws ecr create-repository --repository-name "$ECR_REPO" --region "$REGION"
+        echo "  Created ECR repo $ECR_REPO"
+    else
+        echo "  ECR repo $ECR_REPO already exists"
+    fi
+
+    # ─── Build & Push Lambda Image ────────────────────────────────────────
+
+    echo -e "${YELLOW}Building Lambda container image...${NC}"
+
+    docker build -f ctrl/lambda/Dockerfile -t "${ECR_REPO}:latest" .
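+
+    # Optional local smoke test before pushing: the AWS Lambda Python base
+    # image bundles the runtime interface emulator, so the handler can be
+    # invoked locally (a sketch; host port is arbitrary):
+    #   docker run --rm -p 9010:8080 "${ECR_REPO}:latest"
+    #   curl -d '{"job_id": "test"}' \
+    #     http://localhost:9010/2015-03-31/functions/function/invocations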
+
+    echo -e "${YELLOW}Pushing to ECR...${NC}"
+    aws ecr get-login-password --region "$REGION" | \
+        docker login --username AWS --password-stdin "${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com"
+
+    docker tag "${ECR_REPO}:latest" "${ECR_URI}:latest"
+    docker push "${ECR_URI}:latest"
+
+    # ─── Lambda Function ──────────────────────────────────────────────────
+
+    echo -e "${YELLOW}Deploying Lambda function...${NC}"
+    LAMBDA_ARN="arn:aws:lambda:${REGION}:${ACCOUNT_ID}:function:${LAMBDA_NAME}"
+
+    if aws lambda get-function --function-name "$LAMBDA_NAME" --region "$REGION" 2>/dev/null; then
+        aws lambda update-function-code \
+            --function-name "$LAMBDA_NAME" \
+            --image-uri "${ECR_URI}:latest" \
+            --region "$REGION"
+        echo "  Updated $LAMBDA_NAME"
+    else
+        # AWS_REGION is a reserved Lambda environment key (the runtime sets it
+        # automatically), so it is not passed in --environment
+        aws lambda create-function \
+            --function-name "$LAMBDA_NAME" \
+            --package-type Image \
+            --code ImageUri="${ECR_URI}:latest" \
+            --role "$LAMBDA_ROLE_ARN" \
+            --timeout "$LAMBDA_TIMEOUT" \
+            --memory-size "$LAMBDA_MEMORY" \
+            --environment "Variables={S3_BUCKET_IN=${BUCKET_IN},S3_BUCKET_OUT=${BUCKET_OUT}}" \
+            --region "$REGION"
+        echo "  Created $LAMBDA_NAME"
+    fi
+
+    # ─── Step Functions ───────────────────────────────────────────────────
+
+    echo -e "${YELLOW}Deploying Step Functions state machine...${NC}"
+
+    SFN_DEFINITION=$(sed "s|\${TranscodeLambdaArn}|${LAMBDA_ARN}|g" ctrl/state-machine.json)
+
+    SFN_ARN="arn:aws:states:${REGION}:${ACCOUNT_ID}:stateMachine:${SFN_NAME}"
+    if aws stepfunctions describe-state-machine --state-machine-arn "$SFN_ARN" --region "$REGION" 2>/dev/null; then
+        aws stepfunctions update-state-machine \
+            --state-machine-arn "$SFN_ARN" \
+            --definition "$SFN_DEFINITION" \
+            --region "$REGION"
+        echo "  Updated $SFN_NAME"
+    else
+        aws stepfunctions create-state-machine \
+            --name "$SFN_NAME" \
+            --definition "$SFN_DEFINITION" \
+            --role-arn "$SFN_ROLE_ARN" \
+            --region "$REGION"
+        echo "  Created $SFN_NAME"
+    fi
+
+    # ─── Summary ──────────────────────────────────────────────────────────
+
+    echo ""
+    echo -e "${GREEN}Deployment complete!${NC}"
+    echo ""
+    echo "Add these to your .env:"
+    echo "  MPR_EXECUTOR=lambda"
+    echo "  STEP_FUNCTION_ARN=${SFN_ARN}"
+    echo "  LAMBDA_FUNCTION_ARN=${LAMBDA_ARN}"
+    echo "  S3_BUCKET_IN=${BUCKET_IN}"
+    echo "  S3_BUCKET_OUT=${BUCKET_OUT}"
+    echo "  CALLBACK_URL=${CALLBACK_URL}"
+    echo "  CALLBACK_API_KEY=${CALLBACK_API_KEY}"
+}
+
+# ─── Main ────────────────────────────────────────────────────────────────────
+
+COMMAND="${1:-}"
+shift || true
+
+case "$COMMAND" in
+    rsync) deploy_rsync "$@" ;;
+    aws) deploy_aws "$@" ;;
+    *)
+        echo "Usage: ./ctrl/deploy.sh <command> [options]"
+        echo ""
+        echo "Commands:"
+        echo "  rsync [--restart] [--dry-run]  Sync to remote server"
+        echo "  aws                            Deploy AWS infrastructure"
+        exit 1
+        ;;
+esac
diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml
index 27db613..0cc6c44 100644
--- a/ctrl/docker-compose.yml
+++ b/ctrl/docker-compose.yml
@@ -5,8 +5,12 @@ x-common-env: &common-env
   DEBUG: 1
   GRPC_HOST: grpc
   GRPC_PORT: 50051
-  MEDIA_IN: ${MEDIA_IN:-/app/media/in}
-  MEDIA_OUT: ${MEDIA_OUT:-/app/media/out}
+  S3_ENDPOINT_URL: http://minio:9000
+  S3_BUCKET_IN: mpr-media-in
+  S3_BUCKET_OUT: mpr-media-out
+  AWS_ACCESS_KEY_ID: minioadmin
+  AWS_SECRET_ACCESS_KEY: minioadmin
+  AWS_REGION: us-east-1
 
 x-healthcheck-defaults: &healthcheck-defaults
   interval: 5s
@@ -42,17 +46,46 @@ services:
       <<: *healthcheck-defaults
       test: ["CMD", "redis-cli", "ping"]
 
+  minio:
+    image: minio/minio
+    command: ["server", "/data", "--console-address", ":9001"]
+    ports:
+      - 
"9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + volumes: + - minio-data:/data + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "mc", "ready", "local"] + + minio-init: + image: minio/mc + depends_on: + minio: + condition: service_healthy + entrypoint: ["/bin/sh", "-c"] + command: + - | + mc alias set local http://minio:9000 minioadmin minioadmin + mc mb --ignore-existing local/mpr-media-in + mc mb --ignore-existing local/mpr-media-out + mc anonymous set download local/mpr-media-in + mc anonymous set download local/mpr-media-out + nginx: image: nginx:alpine ports: - "80:80" volumes: - ./nginx.conf:/etc/nginx/nginx.conf:ro - - ../media:/app/media:ro depends_on: - django - fastapi - timeline + - minio # ============================================================================= # Application Services @@ -72,7 +105,6 @@ services: <<: *common-env volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -90,7 +122,6 @@ services: <<: *common-env volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -110,7 +141,6 @@ services: GRPC_MAX_WORKERS: 10 volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -127,7 +157,6 @@ services: MPR_EXECUTOR: local volumes: - ..:/app - - ../media:/app/media depends_on: postgres: condition: service_healthy @@ -150,6 +179,7 @@ services: volumes: postgres-data: redis-data: + minio-data: networks: default: diff --git a/ctrl/generate.sh b/ctrl/generate.sh index 5d7ce6f..dc8f689 100755 --- a/ctrl/generate.sh +++ b/ctrl/generate.sh @@ -29,6 +29,13 @@ python -m modelgen from-schema \ --targets typescript \ --include dataclasses,enums,api +# Graphene types for GraphQL: domain models + enums + API types +python -m modelgen from-schema \ + --schema schema/models \ + --output api/schemas/graphql_types.py \ + --targets graphene \ + --include dataclasses,enums,api + # Protobuf for gRPC: gRPC messages + service python -m modelgen from-schema \ --schema schema/models \ diff --git a/ctrl/lambda/Dockerfile b/ctrl/lambda/Dockerfile new file mode 100644 index 0000000..13693aa --- /dev/null +++ b/ctrl/lambda/Dockerfile @@ -0,0 +1,21 @@ +FROM public.ecr.aws/lambda/python:3.11 + +# Install ffmpeg static binary +RUN yum install -y tar xz && \ + curl -L https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o /tmp/ffmpeg.tar.xz && \ + tar -xf /tmp/ffmpeg.tar.xz -C /tmp && \ + cp /tmp/ffmpeg-*-amd64-static/ffmpeg /usr/local/bin/ffmpeg && \ + cp /tmp/ffmpeg-*-amd64-static/ffprobe /usr/local/bin/ffprobe && \ + rm -rf /tmp/ffmpeg* && \ + yum clean all + +# Install Python dependencies +COPY ctrl/lambda/requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY task/lambda_handler.py ${LAMBDA_TASK_ROOT}/task/lambda_handler.py +COPY task/__init__.py ${LAMBDA_TASK_ROOT}/task/__init__.py +COPY core/ ${LAMBDA_TASK_ROOT}/core/ + +CMD ["task.lambda_handler.handler"] diff --git a/ctrl/lambda/requirements.txt b/ctrl/lambda/requirements.txt new file mode 100644 index 0000000..4ba1e0f --- /dev/null +++ b/ctrl/lambda/requirements.txt @@ -0,0 +1,2 @@ +ffmpeg-python>=0.2.0 +requests>=2.31.0 diff --git a/ctrl/nginx.conf b/ctrl/nginx.conf index 4c252d8..34f0551 100644 --- a/ctrl/nginx.conf +++ b/ctrl/nginx.conf @@ -21,6 +21,10 @@ http { server timeline:5173; } + upstream minio { + server minio:9000; + } + server { listen 80; server_name mpr.local.ar; @@ -67,16 +71,15 @@ http { proxy_set_header Host $host; } - # Media files - input (source) - location /media/in { - alias /app/media/in; - autoindex on; + # Media files - proxied from MinIO (local) or S3 (AWS) + location /media/in/ { + proxy_pass http://minio/mpr-media-in/; + proxy_set_header Host $http_host; } - # Media files - output (transcoded) - location /media/out { - alias /app/media/out; - autoindex on; + location /media/out/ { + proxy_pass http://minio/mpr-media-out/; + proxy_set_header Host $http_host; } # Default to Timeline UI diff --git a/ctrl/state-machine.json b/ctrl/state-machine.json new file mode 100644 index 0000000..b27bf94 --- /dev/null +++ b/ctrl/state-machine.json @@ -0,0 +1,39 @@ +{ + "Comment": "MPR Transcode Job - orchestrates Lambda-based media transcoding", + "StartAt": "Transcode", + "States": { + "Transcode": { + "Type": "Task", + "Resource": "${TranscodeLambdaArn}", + "TimeoutSeconds": 900, + "Retry": [ + { + "ErrorEquals": ["States.TaskFailed", "Lambda.ServiceException"], + "IntervalSeconds": 10, + "MaxAttempts": 2, + "BackoffRate": 2.0 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "HandleError", + "ResultPath": "$.error" + } + ], + "Next": "Done" + }, + "HandleError": { + "Type": "Pass", + "Parameters": { + "status": "failed", + "job_id.$": "$.job_id", + "error.$": "$.error.Cause" + }, + "Next": "Done" + }, + "Done": { + "Type": "Succeed" + } + } +} diff --git a/docs/architecture/01-system-overview.dot b/docs/architecture/01-system-overview.dot index 2ee63e0..16b4bec 100644 --- a/docs/architecture/01-system-overview.dot +++ b/docs/architecture/01-system-overview.dot @@ -3,13 +3,11 @@ digraph system_overview { node [shape=box, style=rounded, fontname="Helvetica"] edge [fontname="Helvetica", fontsize=10] - // Title labelloc="t" label="MPR - System Overview" fontsize=16 fontname="Helvetica-Bold" - // Styling graph [splines=ortho, nodesep=0.8, ranksep=0.8] // External @@ -18,7 +16,7 @@ digraph system_overview { style=dashed color=gray - browser [label="Browser\nmpr.local.ar", shape=ellipse] + browser [label="Browser\nmpr.local.ar / mpr.mcrn.ar", shape=ellipse] } // Nginx reverse proxy @@ -37,7 +35,7 @@ digraph system_overview { fillcolor="#f0f8e8" django [label="Django\n/admin\nport 8701"] - fastapi [label="FastAPI\n/api\nport 8702"] + fastapi [label="FastAPI\n/api + /graphql\nport 8702"] timeline [label="Timeline UI\n/ui\nport 5173"] } @@ -48,8 +46,17 @@ digraph system_overview { fillcolor="#fff8e8" grpc_server [label="gRPC Server\nport 50051"] - celery [label="Celery Worker\n(local)"] - lambda [label="Lambda\n(cloud)", style="dashed,rounded"] + celery [label="Celery Worker\n(local mode)"] + } + + // AWS layer + subgraph cluster_aws { + label="AWS (lambda mode)" + style=filled + 
fillcolor="#fde8d0" + + step_functions [label="Step Functions\nstate machine"] + lambda [label="Lambda\nFFmpeg container"] } // Data layer @@ -58,48 +65,50 @@ digraph system_overview { style=filled fillcolor="#f8e8f0" - postgres [label="PostgreSQL\nport 5433", shape=cylinder] - redis [label="Redis\nport 6380", shape=cylinder] - sqs [label="SQS\n(cloud)", shape=cylinder, style=dashed] + postgres [label="PostgreSQL\nport 5436", shape=cylinder] + redis [label="Redis\nport 6381", shape=cylinder] } // Storage subgraph cluster_storage { - label="File Storage" + label="S3 Storage" style=filled fillcolor="#f0f0f0" - local_fs [label="Local FS\n/media", shape=folder] - s3 [label="S3\n(cloud)", shape=folder, style=dashed] + minio [label="MinIO (local)\nport 9000", shape=folder] + s3 [label="AWS S3 (cloud)", shape=folder, style="dashed,rounded"] + bucket_in [label="mpr-media-in", shape=note] + bucket_out [label="mpr-media-out", shape=note] } // Connections browser -> nginx - nginx -> django [label="/admin"] - nginx -> fastapi [label="/api"] - nginx -> timeline [label="/ui"] + nginx -> django [xlabel="/admin"] + nginx -> fastapi [xlabel="/api, /graphql"] + nginx -> timeline [xlabel="/ui"] + nginx -> minio [xlabel="/media/*"] - // Django uses FastAPI for operations (single API gateway) - django -> fastapi [label="job operations"] - django -> postgres [label="CRUD only"] + timeline -> fastapi [xlabel="REST API"] - // Timeline UI uses FastAPI - timeline -> fastapi [label="REST API"] - - // FastAPI is the single API gateway fastapi -> postgres - fastapi -> redis [label="job status"] - fastapi -> grpc_server [label="gRPC\nprogress streaming"] + fastapi -> grpc_server [xlabel="gRPC\nprogress"] - // Worker layer - grpc_server -> celery [label="task dispatch"] - celery -> redis [label="queue"] - celery -> postgres [label="job updates"] - celery -> grpc_server [label="progress\ncallbacks", style=dotted] - celery -> local_fs [label="read/write"] + // Local mode + grpc_server -> celery [xlabel="task dispatch"] + celery -> redis [xlabel="queue"] + celery -> postgres [xlabel="job updates"] + celery -> minio [xlabel="S3 API\ndownload/upload"] - // Cloud (future) - lambda -> sqs [label="queue", style=dashed] - lambda -> s3 [label="read/write", style=dashed] + // Lambda mode + fastapi -> step_functions [xlabel="boto3\nstart_execution", style=dashed] + step_functions -> lambda [style=dashed] + lambda -> s3 [xlabel="download/upload", style=dashed] + lambda -> fastapi [xlabel="callback\nPOST /jobs/{id}/callback", style=dashed] + + // Storage details + minio -> bucket_in [style=dotted, arrowhead=none] + minio -> bucket_out [style=dotted, arrowhead=none] + s3 -> bucket_in [style=dotted, arrowhead=none] + s3 -> bucket_out [style=dotted, arrowhead=none] } diff --git a/docs/architecture/01-system-overview.svg b/docs/architecture/01-system-overview.svg index abe50ac..6742f6e 100644 --- a/docs/architecture/01-system-overview.svg +++ b/docs/architecture/01-system-overview.svg @@ -1,260 +1,293 @@ - - - + + system_overview - -MPR - System Overview + +MPR - System Overview cluster_external - -External + +External cluster_proxy - -Reverse Proxy + +Reverse Proxy cluster_apps - -Application Layer + +Application Layer cluster_workers - -Worker Layer + +Worker Layer -cluster_data - -Data Layer +cluster_aws + +AWS (lambda mode) +cluster_data + +Data Layer + + cluster_storage - -File Storage + +S3 Storage browser - -Browser -mpr.local.ar + +Browser +mpr.local.ar / mpr.mcrn.ar nginx - -nginx -port 80 + +nginx +port 80 browser->nginx - - + 
+ django - -Django -/admin -port 8701 + +Django +/admin +port 8701 nginx->django - - -/admin + + +/admin fastapi - -FastAPI -/api -port 8702 + +FastAPI +/api + /graphql +port 8702 nginx->fastapi - - -/api + + +/api, /graphql timeline - -Timeline UI -/ui -port 5173 + +Timeline UI +/ui +port 5173 nginx->timeline - - -/ui + + +/ui - + + +minio + +MinIO (local) +port 9000 + + -django->fastapi - - -job operations - - - -postgres - - -PostgreSQL -port 5433 - - - -django->postgres - - -CRUD only +nginx->minio + + +/media/* grpc_server - -gRPC Server -port 50051 + +gRPC Server +port 50051 - + fastapi->grpc_server - - -gRPC -progress streaming + + +gRPC +progress + + + +step_functions + +Step Functions +state machine + + + +fastapi->step_functions + + +boto3 +start_execution + + + +postgres + + +PostgreSQL +port 5436 - + fastapi->postgres - - - - - -redis - - -Redis -port 6380 - - - -fastapi->redis - - -job status + + - + timeline->fastapi - - -REST API + + +REST API celery - -Celery Worker -(local) + +Celery Worker +(local mode) - + grpc_server->celery - - -task dispatch - - - -celery->grpc_server - - -progress -callbacks + + +task dispatch - + celery->postgres - - -job updates + + +job updates + + + +redis + + +Redis +port 6381 - + celery->redis - - -queue + + +queue - - -local_fs - -Local FS -/media - - - -celery->local_fs - - -read/write + + +celery->minio + + +S3 API +download/upload - + lambda - -Lambda -(cloud) + +Lambda +FFmpeg container - - -sqs - - -SQS -(cloud) + + +step_functions->lambda + + - + -lambda->sqs - - -queue +lambda->fastapi + + +callback +POST /jobs/{id}/callback s3 - -S3 -(cloud) + +AWS S3 (cloud) - + lambda->s3 - - -read/write + + +download/upload + + + +bucket_in + + + +mpr-media-in + + + +minio->bucket_in + + + + +bucket_out + + + +mpr-media-out + + + +minio->bucket_out + + + + +s3->bucket_in + + + + +s3->bucket_out + diff --git a/docs/architecture/02-data-model.dot b/docs/architecture/02-data-model.dot index b0348d1..37fefe8 100644 --- a/docs/architecture/02-data-model.dot +++ b/docs/architecture/02-data-model.dot @@ -10,13 +10,13 @@ digraph data_model { graph [splines=ortho, nodesep=0.6, ranksep=1.2] - MediaAsset [label="{MediaAsset|id: UUID (PK)\lfilename: str\lfile_path: str\lfile_size: int?\lstatus: pending/ready/error\lerror_message: str?\l|duration: float?\lvideo_codec: str?\laudio_codec: str?\lwidth: int?\lheight: int?\lframerate: float?\lbitrate: int?\lproperties: JSON\l|comments: str\ltags: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] + MediaAsset [label="{MediaAsset|id: UUID (PK)\lfilename: str\lfile_path: str (S3 key)\lfile_size: int?\lstatus: pending/ready/error\lerror_message: str?\l|duration: float?\lvideo_codec: str?\laudio_codec: str?\lwidth: int?\lheight: int?\lframerate: float?\lbitrate: int?\lproperties: JSON\l|comments: str\ltags: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] TranscodePreset [label="{TranscodePreset|id: UUID (PK)\lname: str (unique)\ldescription: str\lis_builtin: bool\l|container: str\l|video_codec: str\lvideo_bitrate: str?\lvideo_crf: int?\lvideo_preset: str?\lresolution: str?\lframerate: float?\l|audio_codec: str\laudio_bitrate: str?\laudio_channels: int?\laudio_samplerate: int?\l|extra_args: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] - TranscodeJob [label="{TranscodeJob|id: UUID (PK)\l|source_asset_id: UUID (FK)\l|preset_id: UUID? (FK)\lpreset_snapshot: JSON\l|trim_start: float?\ltrim_end: float?\l|output_filename: str\loutput_path: str?\loutput_asset_id: UUID? 
(FK)\l|status: pending/processing/...\lprogress: float (0-100)\lcurrent_frame: int?\lcurrent_time: float?\lspeed: str?\lerror_message: str?\l|celery_task_id: str?\lpriority: int\l|created_at: datetime\lstarted_at: datetime?\lcompleted_at: datetime?\l}"] + TranscodeJob [label="{TranscodeJob|id: UUID (PK)\l|source_asset_id: UUID (FK)\l|preset_id: UUID? (FK)\lpreset_snapshot: JSON\l|trim_start: float?\ltrim_end: float?\l|output_filename: str\loutput_path: str? (S3 key)\loutput_asset_id: UUID? (FK)\l|status: pending/processing/...\lprogress: float (0-100)\lcurrent_frame: int?\lcurrent_time: float?\lspeed: str?\lerror_message: str?\l|celery_task_id: str?\lexecution_arn: str?\lpriority: int\l|created_at: datetime\lstarted_at: datetime?\lcompleted_at: datetime?\l}"] - MediaAsset -> TranscodeJob [label="1:N source_asset"] - TranscodePreset -> TranscodeJob [label="1:N preset"] - TranscodeJob -> MediaAsset [label="1:1 output_asset", style=dashed] + MediaAsset -> TranscodeJob [xlabel="1:N source_asset"] + TranscodePreset -> TranscodeJob [xlabel="1:N preset"] + TranscodeJob -> MediaAsset [xlabel="1:1 output_asset", style=dashed] } diff --git a/docs/architecture/02-data-model.svg b/docs/architecture/02-data-model.svg index 4fdd5ed..8eae3a9 100644 --- a/docs/architecture/02-data-model.svg +++ b/docs/architecture/02-data-model.svg @@ -1,15 +1,15 @@ - - + data_model - -MPR - Data Model + +MPR - Data Model MediaAsset @@ -18,7 +18,7 @@ id: UUID (PK) filename: str -file_path: str +file_path: str (S3 key) file_size: int? status: pending/ready/error error_message: str? @@ -41,43 +41,44 @@ TranscodeJob - -TranscodeJob - -id: UUID (PK) - -source_asset_id: UUID (FK) - -preset_id: UUID? (FK) -preset_snapshot: JSON - -trim_start: float? -trim_end: float? - -output_filename: str -output_path: str? -output_asset_id: UUID? (FK) - -status: pending/processing/... -progress: float (0-100) -current_frame: int? -current_time: float? -speed: str? -error_message: str? - -celery_task_id: str? -priority: int - -created_at: datetime -started_at: datetime? -completed_at: datetime? + +TranscodeJob + +id: UUID (PK) + +source_asset_id: UUID (FK) + +preset_id: UUID? (FK) +preset_snapshot: JSON + +trim_start: float? +trim_end: float? + +output_filename: str +output_path: str? (S3 key) +output_asset_id: UUID? (FK) + +status: pending/processing/... +progress: float (0-100) +current_frame: int? +current_time: float? +speed: str? +error_message: str? + +celery_task_id: str? +execution_arn: str? +priority: int + +created_at: datetime +started_at: datetime? +completed_at: datetime? 
[02-data-model.svg: generated output, regenerated from 02-data-model.dot; edge labels unchanged (MediaAsset->TranscodeJob "1:N source_asset", TranscodePreset->TranscodeJob "1:N preset", TranscodeJob->MediaAsset "1:1 output_asset")]

diff --git a/docs/architecture/03-job-flow.dot b/docs/architecture/03-job-flow.dot
index 813da0c..0ae5b6e 100644
--- a/docs/architecture/03-job-flow.dot
+++ b/docs/architecture/03-job-flow.dot
@@ -3,7 +3,6 @@ digraph job_flow {
     node [shape=box, style=rounded, fontname="Helvetica"]
     edge [fontname="Helvetica", fontsize=10]
 
-    // Title
     labelloc="t"
     label="MPR - Job Flow"
     fontsize=16
@@ -11,7 +10,19 @@ digraph job_flow {
 
     graph [splines=ortho, nodesep=0.6, ranksep=0.6]
 
-    // States
+    // API entry points
+    subgraph cluster_api {
+        label="API Entry Points"
+        style=dashed
+        color=gray
+
+        rest_create [label="POST /api/jobs/", shape=ellipse]
+        gql_create [label="mutation createJob", shape=ellipse]
+        rest_cancel [label="POST /api/jobs/{id}/cancel", shape=ellipse]
+        rest_callback [label="POST /api/jobs/{id}/callback", shape=ellipse]
+    }
+
+    // Job states
     subgraph cluster_states {
         label="Job States"
         style=filled
@@ -24,78 +35,70 @@
         cancelled [label="CANCELLED", fillcolor="#6c757d", style="filled,rounded", fontcolor=white]
     }
 
-    // Transitions
-    pending -> processing [label="worker picks up"]
-    processing -> completed [label="success"]
-    processing -> failed [label="error"]
-    pending -> cancelled [label="user cancels"]
-    processing -> cancelled [label="user cancels"]
-    failed -> pending [label="retry"]
+    // State transitions
+    pending -> processing [xlabel="worker picks up"]
+    processing -> completed [xlabel="success"]
+    processing -> failed [xlabel="error"]
+    pending -> cancelled [xlabel="user cancels"]
+    processing -> cancelled [xlabel="user cancels"]
+    failed -> pending [xlabel="retry"]
 
-    // API actions
-    subgraph cluster_api {
-        label="API Actions"
-        style=dashed
-        color=gray
+    rest_create -> pending
+    gql_create -> pending
+    rest_cancel -> cancelled [style=dashed]
 
-        create_job [label="POST /jobs/", shape=ellipse]
-        cancel_job [label="POST /jobs/{id}/cancel", shape=ellipse]
-        retry_job [label="POST /jobs/{id}/retry", shape=ellipse]
-    }
-
-    create_job -> pending
-    cancel_job -> cancelled [style=dashed]
-    retry_job -> pending [style=dashed]
-
-    // Executor layer
-    subgraph cluster_executor {
-        label="Executor Layer"
+    // Executor dispatch
+    subgraph cluster_dispatch {
+        label="Executor Dispatch"
         style=filled
         fillcolor="#fff8e8"
 
-        executor [label="Executor\n(abstract)", shape=diamond]
-        local [label="LocalExecutor\nCelery + FFmpeg"]
-        lambda_exec [label="LambdaExecutor\nSQS + Lambda"]
+        dispatch [label="MPR_EXECUTOR", shape=diamond]
     }
 
-    processing -> executor
-    executor -> local [label="MPR_EXECUTOR=local"]
-    executor -> lambda_exec [label="MPR_EXECUTOR=lambda", style=dashed]
+    pending -> dispatch
 
-    // FFmpeg operations
-    subgraph cluster_ffmpeg {
-        label="FFmpeg Operations"
+    // Local path
+    subgraph cluster_local {
+        label="Local Mode (Celery)"
         style=filled
         fillcolor="#e8f4e8"
 
-        transcode [label="Transcode\n(with preset)"]
-        trim [label="Trim\n(-c:v copy -c:a copy)"]
+        celery_task [label="Celery Task\n(transcode queue)"]
+        s3_download [label="S3 Download\n(MinIO)"]
+        ffmpeg_local [label="FFmpeg\ntranscode/trim"]
+        s3_upload [label="S3 Upload\n(MinIO)"]
+        db_update [label="DB Update\n(update_job_progress)"]
     }
 
-    local -> transcode
-    local -> trim
+    dispatch -> celery_task [xlabel="local"]
+    celery_task -> s3_download
+    s3_download -> ffmpeg_local
+    ffmpeg_local -> s3_upload
+    s3_upload -> db_update
+    db_update -> completed [style=dotted]
 
-    // gRPC streaming
-    subgraph cluster_grpc {
-        label="gRPC Communication"
+    // Lambda path
+    subgraph cluster_lambda {
+        label="Lambda Mode (AWS)"
         style=filled
-        fillcolor="#e8e8f8"
+        fillcolor="#fde8d0"
 
-        grpc_stream [label="StreamProgress\n(server streaming)", shape=parallelogram]
-        grpc_submit [label="SubmitJob\n(unary)", shape=parallelogram]
-        grpc_cancel [label="CancelJob\n(unary)", shape=parallelogram]
+        sfn_start [label="Step Functions\nstart_execution"]
+        lambda_fn [label="Lambda\nFFmpeg container"]
+        s3_dl_aws [label="S3 Download\n(AWS)"]
+        ffmpeg_aws [label="FFmpeg\ntranscode/trim"]
+        s3_ul_aws [label="S3 Upload\n(AWS)"]
+        callback [label="HTTP Callback\nPOST /jobs/{id}/callback"]
     }
 
-    // Progress tracking via gRPC
-    progress [label="Progress Updates\n(gRPC → Redis → DB)", shape=note]
-    transcode -> progress [style=dotted]
-    trim -> progress [style=dotted]
-    progress -> grpc_stream [style=dotted, label="stream to client"]
-    grpc_stream -> processing [style=dotted, label="update status"]
+    dispatch -> sfn_start [xlabel="lambda"]
+    sfn_start -> lambda_fn
+    lambda_fn -> s3_dl_aws
+    s3_dl_aws -> ffmpeg_aws
+    ffmpeg_aws -> s3_ul_aws
+    s3_ul_aws -> callback
+    callback -> completed [style=dotted]
 
-    // gRPC job control
-    create_job -> grpc_submit [label="via gRPC"]
-    grpc_submit -> pending [style=dashed]
-    cancel_job -> grpc_cancel [label="via gRPC"]
-    grpc_cancel -> cancelled [style=dashed]
+    rest_callback -> completed [style=dashed, xlabel="Lambda reports"]
 }

diff --git a/docs/architecture/03-job-flow.svg b/docs/architecture/03-job-flow.svg
index 36a21bc..cb09ad1 100644
--- a/docs/architecture/03-job-flow.svg
+++ b/docs/architecture/03-job-flow.svg
[generated output, regenerated from 03-job-flow.dot above]

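In lambda mode, the dispatch step in the flow above starts one Step Functions execution per job and keeps the returned ARN on the job row. A minimal sketch of that call, assuming a `STATE_MACHINE_ARN` env var and an illustrative input payload (the real wiring lives in task/executor.py and ctrl/state-machine.json):

```python
# Sketch: the "dispatch -> sfn_start" edge from the flow above.
# STATE_MACHINE_ARN and the payload fields are illustrative assumptions.
import json
import os

import boto3


def start_lambda_execution(job_id: str, source_key: str, output_key: str) -> str:
    sfn = boto3.client("stepfunctions")
    response = sfn.start_execution(
        stateMachineArn=os.environ["STATE_MACHINE_ARN"],
        name=f"transcode-{job_id}",  # execution names must be unique per state machine
        input=json.dumps(
            {"job_id": job_id, "source_key": source_key, "output_key": output_key}
        ),
    )
    return response["executionArn"]  # e.g. persisted as TranscodeJob.execution_arn
```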
diff --git a/docs/architecture/index.html b/docs/architecture/index.html
index 5ed7e13..37bb7d3 100644
--- a/docs/architecture/index.html
+++ b/docs/architecture/index.html
@@ -1,101 +1,177 @@
[HTML markup not recoverable; text-level changes follow]
 MPR - Media Processor
-A web-based media transcoding tool with professional architecture.
+Media transcoding platform with dual execution modes: local (Celery +
+MinIO) and cloud (AWS Step Functions + Lambda + S3).
 System Overview
 Architecture: 01-system-overview.svg (Open full size)
 Components
-• Reverse Proxy (nginx)
-• Application Layer (Django, FastAPI, UI)
-• Worker Layer (Celery, Lambda)
-• Data Layer (PostgreSQL, Redis, SQS)
-• Storage (Local FS, S3)
+• Reverse Proxy (nginx)
+• Application Layer (Django Admin, FastAPI + GraphQL, Timeline UI)
+• Worker Layer (Celery local mode)
+• AWS (Step Functions, Lambda - cloud mode)
+• Data Layer (PostgreSQL, Redis)
+• S3 Storage (MinIO local / AWS S3 cloud)

 Data Model
 Entity Relationships: 02-data-model.svg (Open full size)

 Entities
-• MediaAsset - Video/audio files with metadata
+• MediaAsset - Video/audio files (S3 keys as paths)
 • TranscodePreset - Encoding configurations
-• TranscodeJob - Processing queue items
+• TranscodeJob - Processing queue (celery_task_id or execution_arn)

 Job Flow
 Job Lifecycle: 03-job-flow.svg (Open full size)

 Job States
 • PENDING - Waiting in queue
 • PROCESSING - Worker executing
 • COMPLETED - Success
 • FAILED - Error occurred
 • CANCELLED - User cancelled

+Execution Modes
+• Local: Celery + MinIO (S3 API) + FFmpeg
+• Lambda: Step Functions + Lambda + AWS S3

+API Interfaces
+# REST API
+http://mpr.local.ar/api/docs      - Swagger UI
+POST /api/assets/scan             - Scan S3 bucket for media
+POST /api/jobs/                   - Create transcode job
+POST /api/jobs/{id}/callback      - Lambda completion callback
+
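The callback route closes the loop for lambda mode: the Lambda container reports completion over HTTP instead of touching the database directly. A sketch of the POST it might send (the field names are illustrative assumptions; the accepted schema is whatever the callback route in api/routes/jobs.py validates):

```python
# Sketch: completion callback from the Lambda handler back to the API.
# The payload fields are assumptions for illustration only.
import requests


def report_completion(api_base: str, job_id: str, output_key: str) -> None:
    resp = requests.post(
        f"{api_base}/api/jobs/{job_id}/callback",
        json={"status": "completed", "output_key": output_key},
        timeout=10,
    )
    resp.raise_for_status()
```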
+# GraphQL (GraphiQL)
+http://mpr.local.ar/graphql       - GraphiQL IDE
+query { assets { id filename } }
+mutation { createJob(input: {...}) { id status } }
+mutation { scanMediaFolder { found registered } }
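GraphiQL is only a convenience; any HTTP client can issue the same queries as JSON POSTs against /graphql. For example, the assets query above sent with plain requests:

```python
# Sketch: the GraphQL endpoint is a regular JSON-over-HTTP POST.
import requests

resp = requests.post(
    "http://mpr.local.ar/graphql",
    json={"query": "query { assets { id filename } }"},
    timeout=10,
)
resp.raise_for_status()
for asset in resp.json()["data"]["assets"]:
    print(asset["id"], asset["filename"])
```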

Access Points

+
+# Local development
 127.0.0.1 mpr.local.ar
+http://mpr.local.ar/admin         - Django Admin
+http://mpr.local.ar/api/docs      - FastAPI Swagger
+http://mpr.local.ar/graphql       - GraphiQL
+http://mpr.local.ar/              - Timeline UI
+http://localhost:9001             - MinIO Console
 
-# URLs
-http://mpr.local.ar/admin  - Django Admin
-http://mpr.local.ar/api    - FastAPI (docs at /api/docs)
-http://mpr.local.ar/ui     - Timeline UI
+
+# AWS deployment
+https://mpr.mcrn.ar/              - Production

Quick Reference

+
# Render SVGs from DOT files
+for f in *.dot; do dot -Tsvg "$f" -o "${f%.dot}.svg"; done
+
+# Switch executor mode
+MPR_EXECUTOR=local    # Celery + MinIO
+MPR_EXECUTOR=lambda   # Step Functions + Lambda + S3
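A sketch of how the flag gates dispatch; the helper names are hypothetical stand-ins (the project's actual selection logic lives in task/executor.py):

```python
# Sketch: choose the execution backend from MPR_EXECUTOR.
import os


def submit_celery_task(job_id: str) -> None:
    ...  # hypothetical: enqueue on the Celery transcode queue (local mode)


def start_step_function(job_id: str) -> None:
    ...  # hypothetical: Step Functions start_execution (lambda mode)


def dispatch(job_id: str) -> None:
    # "local" assumed as the default when the variable is unset.
    if os.environ.get("MPR_EXECUTOR", "local") == "lambda":
        start_step_function(job_id)
    else:
        submit_celery_task(job_id)
```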
diff --git a/docs/media-storage.md b/docs/media-storage.md
index 011f463..a410c6b 100644
--- a/docs/media-storage.md
+++ b/docs/media-storage.md
@@ -2,147 +2,119 @@
 
 ## Overview
 
-MPR separates media into **input** and **output** paths, each independently configurable. File paths are stored **relative to their respective root** to ensure portability between local development and cloud deployments (AWS S3, etc.).
+MPR uses **S3-compatible storage** everywhere: local development uses MinIO, production uses AWS S3. The same boto3 code and S3 keys work in both environments - the only difference is the `S3_ENDPOINT_URL` env var.
 
 ## Storage Strategy
 
-### Input / Output Separation
+### S3 Buckets
 
-| Path | Env Var | Purpose |
-|------|---------|---------|
-| `MEDIA_IN` | `/app/media/in` | Source media files to process |
-| `MEDIA_OUT` | `/app/media/out` | Transcoded/trimmed output files |
+| Bucket | Env Var | Purpose |
+|--------|---------|---------|
+| `mpr-media-in` | `S3_BUCKET_IN` | Source media files |
+| `mpr-media-out` | `S3_BUCKET_OUT` | Transcoded/trimmed output |
 
-These can point to different locations or even different servers/buckets in production.
+### S3 Keys as File Paths
+- **Database**: stores S3 object keys (e.g., `video1.mp4`, `subfolder/video3.mp4`)
+- **Local dev**: MinIO serves these via the S3 API on port 9000
+- **AWS**: real S3, same keys, different endpoint
 
-### File Path Storage
-- **Database**: Stores only the relative path (e.g., `videos/sample.mp4`)
-- **Input Root**: Configurable via `MEDIA_IN` env var
-- **Output Root**: Configurable via `MEDIA_OUT` env var
-- **Serving**: Base URL configurable via `MEDIA_BASE_URL` env var
+### Why S3 Everywhere?
+1. **Identical code paths** - no branching between local and cloud
+2. **Seamless executor switching** - Celery and Lambda both use boto3
+3. **Cloud-native** - ready for production without refactoring
 
-### Why Relative Paths?
-1. **Portability**: Same database works locally and in cloud
-2. **Flexibility**: Easy to switch between storage backends
-3. **Simplicity**: No need to update paths when migrating
-
-## Local Development
+## Local Development (MinIO)
 
 ### Configuration
 ```bash
-MEDIA_IN=/app/media/in
-MEDIA_OUT=/app/media/out
+S3_ENDPOINT_URL=http://minio:9000
+S3_BUCKET_IN=mpr-media-in
+S3_BUCKET_OUT=mpr-media-out
+AWS_ACCESS_KEY_ID=minioadmin
+AWS_SECRET_ACCESS_KEY=minioadmin
 ```
 
-### File Structure
-```
-/app/media/
-├── in/                  # Source files
-│   ├── video1.mp4
-│   ├── video2.mp4
-│   └── subfolder/
-│       └── video3.mp4
-└── out/                 # Transcoded output
-    ├── video1_h264.mp4
-    └── video2_trimmed.mp4
-```
+### How It Works
+- MinIO runs as a Docker container (port 9000 API, port 9001 console)
+- A `minio-init` container creates the buckets and sets public read access on startup
+- Nginx proxies `/media/in/` and `/media/out/` to the MinIO buckets
+- Upload files via the MinIO Console (http://localhost:9001) or the `mc` CLI
 
-### Database Storage
-```
-# Source assets (scanned from media/in)
-filename: video1.mp4
-file_path: video1.mp4
-
-filename: video3.mp4
-file_path: subfolder/video3.mp4
-```
-
-### URL Serving
-- Nginx serves input via `location /media/in { alias /app/media/in; }`
-- Nginx serves output via `location /media/out { alias /app/media/out; }`
-- Frontend accesses: `http://mpr.local.ar/media/in/video1.mp4`
-- Video player: `
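A sketch of the single-code-path pattern described above; the bucket and endpoint values come from the env vars in the table, and the same pattern underlies the project's storage helpers in core/storage.py:

```python
# Sketch: with S3_ENDPOINT_URL set, boto3 talks to the local MinIO
# container; with it unset, the identical client talks to AWS S3.
import os

import boto3

s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL"))

# Same call against MinIO and AWS: list source media in the input bucket.
resp = s3.list_objects_v2(Bucket=os.environ.get("S3_BUCKET_IN", "mpr-media-in"))
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```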